Skip to content

Commit

Permalink
core: Added changes to make ServerImpl.internalClose() thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
vinodhabib committed Feb 26, 2025
1 parent cdab410 commit bd1749b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
11 changes: 7 additions & 4 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,8 @@ static final class JumpToApplicationThreadServerStreamListener implements Server
// Only accessed from callExecutor.
private ServerStreamListener listener;

public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor,
ServerStream stream, Context.CancellableContext context, Tag tag) {
this.callExecutor = executor;
this.cancelExecutor = cancelExecutor;
this.stream = stream;
Expand Down Expand Up @@ -809,9 +809,12 @@ void setListener(ServerStreamListener listener) {
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
String description = "Application error processing RPC";
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
Metadata metadata = Status.trailersFromThrowable(t);
if (metadata == null) {
metadata = new Metadata();
}
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.inprocess;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;

Expand Down Expand Up @@ -414,6 +415,7 @@ private class InProcessServerStream implements ServerStream {
private boolean closed;
@GuardedBy("this")
private int outboundSeqNo;
private boolean closeCalled;

InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
statsTraceCtx = StatsTraceContext.newServerContext(
Expand All @@ -431,6 +433,7 @@ public void setListener(ServerStreamListener serverStreamListener) {

@Override
public void request(int numMessages) {
checkState(!closeCalled, "call already closed");
boolean onReady = clientStream.serverRequested(numMessages);
if (onReady) {
synchronized (this) {
Expand Down Expand Up @@ -487,6 +490,7 @@ private void clientCancelled(Status status) {

@Override
public void writeMessage(InputStream message) {
checkState(!closeCalled, "call already closed");
long messageLength = 0;
if (isEnabledSupportTracingMessageSizes) {
try {
Expand Down Expand Up @@ -546,6 +550,7 @@ public synchronized boolean isReady() {

@Override
public void writeHeaders(Metadata headers, boolean flush) {
checkState(!closeCalled, "call already closed");
if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
int metadataSize = metadataSize(headers);
if (metadataSize > clientMaxInboundMetadataSize) {
Expand Down

0 comments on commit bd1749b

Please sign in to comment.