-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: Made ServerImpl.internalClose thread-safe. #11864
base: master
Are you sure you want to change the base?
Changes from 9 commits
b1c980a
fd19633
eac7ef7
9cf7cb5
c246e75
a19ba4c
036e241
0d59d54
d185b47
1dbd353
4d72fcd
327085a
efa740c
2e67222
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -503,7 +503,7 @@ private void streamCreatedInternal( | |
|
||
final JumpToApplicationThreadServerStreamListener jumpListener | ||
= new JumpToApplicationThreadServerStreamListener( | ||
wrappedExecutor, executor, stream, context, tag); | ||
wrappedExecutor, executor, stream, context, tag, headers); | ||
stream.setListener(jumpListener); | ||
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create(); | ||
// Run in serializing executor so jumpListener.setListener() is called before any callbacks | ||
|
@@ -778,14 +778,30 @@ static final class JumpToApplicationThreadServerStreamListener implements Server | |
private final Tag tag; | ||
// Only accessed from callExecutor. | ||
private ServerStreamListener listener; | ||
|
||
public JumpToApplicationThreadServerStreamListener(Executor executor, | ||
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { | ||
private Metadata trailers; | ||
|
||
public JumpToApplicationThreadServerStreamListener( | ||
Executor executor, | ||
Executor cancelExecutor, | ||
ServerStream stream, | ||
Context.CancellableContext context, | ||
Tag tag) { | ||
this(executor, cancelExecutor, stream, context, tag, new Metadata()); | ||
} | ||
|
||
public JumpToApplicationThreadServerStreamListener( | ||
Executor executor, | ||
Executor cancelExecutor, | ||
ServerStream stream, | ||
Context.CancellableContext context, | ||
Tag tag, | ||
Metadata trailers) { | ||
this.callExecutor = executor; | ||
this.cancelExecutor = cancelExecutor; | ||
this.stream = stream; | ||
this.context = context; | ||
this.tag = tag; | ||
this.trailers = trailers; | ||
} | ||
|
||
/** | ||
|
@@ -808,10 +824,9 @@ void setListener(ServerStreamListener listener) { | |
/** | ||
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. | ||
*/ | ||
private void internalClose(Throwable t) { | ||
// TODO(ejona86): this is not thread-safe :) | ||
private synchronized void internalClose(Throwable t) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Throwing synchronized on it doesn't make it thread-safe. You'd have to synchronize most calls to the stream, and make sure to stop calling the stream after closing it. And we don't want to synchronize most calls to the stream. We will need help from the stream to implement this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since all 3 callers of ServerImpl.closeInternal viz., onReady, messagesAvailable and halfClosed run serialized on the callExecutor, what makes the stream.close call in internalClose non-thread safe? Is the race with other methods on the stream? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. onReady, messagesAvailable, and halfClosed are callbacks and don't generally call into This is the "three threads" we had talked about for RPCs: application thread, transport thread, callback thread. |
||
String description = "Application error processing RPC"; | ||
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); | ||
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), this.trailers); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to create the trailers with metadata from the exception caught. See example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that using these
headers
is very wrong, as echoing back the client's request headers is harmful. Although, in fact, the original code was fine and it should just be a new set of Metadata. None of the callers ofinternalClose()
has metadata attached, and even if they do, we'd need to understand why a bit better because sending RST_STREAM was a valid way to handle this, which won't have metadata.