Skip to content

Commit

Permalink
core: Resolved review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
harshagoo94 committed Feb 21, 2025
1 parent 327085a commit efa740c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
Expand Down Expand Up @@ -1598,8 +1599,10 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));

// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
assertThrows(Exception.class, () ->
server.stream.writeHeaders(new Metadata(), true));
assertThrows(Exception.class, () ->
server.stream.writeMessage(methodDescriptor.streamResponse("response")));
server.stream.close(Status.INTERNAL, new Metadata());

// Make sure new streams still work properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.inprocess;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;

Expand Down Expand Up @@ -414,6 +415,7 @@ private class InProcessServerStream implements ServerStream {
private boolean closed;
@GuardedBy("this")
private int outboundSeqNo;
private boolean closeCalled;

InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
statsTraceCtx = StatsTraceContext.newServerContext(
Expand All @@ -431,6 +433,7 @@ public void setListener(ServerStreamListener serverStreamListener) {

@Override
public void request(int numMessages) {
checkState(!closeCalled, "call already closed");
boolean onReady = clientStream.serverRequested(numMessages);
if (onReady) {
synchronized (this) {
Expand Down Expand Up @@ -487,6 +490,7 @@ private void clientCancelled(Status status) {

@Override
public void writeMessage(InputStream message) {
checkState(!closeCalled, "call already closed");
long messageLength = 0;
if (isEnabledSupportTracingMessageSizes) {
try {
Expand Down Expand Up @@ -546,6 +550,7 @@ public synchronized boolean isReady() {

@Override
public void writeHeaders(Metadata headers, boolean flush) {
checkState(!closeCalled, "call already closed");
if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
int metadataSize = metadataSize(headers);
if (metadataSize > clientMaxInboundMetadataSize) {
Expand Down Expand Up @@ -581,6 +586,7 @@ public void close(Status status, Metadata trailers) {
// clientStreamListener.closed can trigger clientStream.cancel (see code in
// ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
// calling internalCancel().
closeCalled = true;
clientStream.serverClosed(Status.OK, status);

if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
Expand Down

0 comments on commit efa740c

Please sign in to comment.