From efa740c7e1eeed3de8ba6e66e8ad4da1199810d9 Mon Sep 17 00:00:00 2001 From: harshagoo94 Date: Fri, 21 Feb 2025 11:24:06 +0000 Subject: [PATCH] core: Resolved review comments. --- .../java/io/grpc/internal/AbstractTransportTest.java | 7 +++++-- .../main/java/io/grpc/inprocess/InProcessTransport.java | 6 ++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index aea7ff49032..d85c54a31e3 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -1598,8 +1599,10 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // Ensure that for a closed ServerStream, interactions are noops - server.stream.writeHeaders(new Metadata(), true); - server.stream.writeMessage(methodDescriptor.streamResponse("response")); + assertThrows(Exception.class, () -> + server.stream.writeHeaders(new Metadata(), true)); + assertThrows(Exception.class, () -> + server.stream.writeMessage(methodDescriptor.streamResponse("response"))); server.stream.close(Status.INTERNAL, new Metadata()); // Make sure new streams still work properly diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index eacf46ca4a2..0fea117a9bf 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; @@ -414,6 +415,7 @@ private class InProcessServerStream implements ServerStream { private boolean closed; @GuardedBy("this") private int outboundSeqNo; + private boolean closeCalled; InProcessServerStream(MethodDescriptor method, Metadata headers) { statsTraceCtx = StatsTraceContext.newServerContext( @@ -431,6 +433,7 @@ public void setListener(ServerStreamListener serverStreamListener) { @Override public void request(int numMessages) { + checkState(!closeCalled, "call already closed"); boolean onReady = clientStream.serverRequested(numMessages); if (onReady) { synchronized (this) { @@ -487,6 +490,7 @@ private void clientCancelled(Status status) { @Override public void writeMessage(InputStream message) { + checkState(!closeCalled, "call already closed"); long messageLength = 0; if (isEnabledSupportTracingMessageSizes) { try { @@ -546,6 +550,7 @@ public synchronized boolean isReady() { @Override public void writeHeaders(Metadata headers, boolean flush) { + checkState(!closeCalled, "call already closed"); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { int metadataSize = metadataSize(headers); if (metadataSize > clientMaxInboundMetadataSize) { @@ -581,6 +586,7 @@ public void close(Status status, Metadata trailers) { // clientStreamListener.closed can trigger clientStream.cancel (see code in // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are // calling internalCancel(). + closeCalled = true; clientStream.serverClosed(Status.OK, status); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {