Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: remember last pick status in no real stream #11851

Merged
merged 9 commits into from
Feb 14, 2025
23 changes: 19 additions & 4 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ public final ClientStream newStream(
if (state.shutdownStatus != null) {
return new FailingClientStream(state.shutdownStatus, tracers);
}
PickResult pickResult = null;
if (state.lastPicker != null) {
PickResult pickResult = state.lastPicker.pickSubchannel(args);
pickResult = state.lastPicker.pickSubchannel(args);
callOptions = args.getCallOptions();
// User code provided authority takes precedence over the LB provided one.
if (callOptions.getAuthority() == null
Expand All @@ -156,7 +157,7 @@ public final ClientStream newStream(
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers);
return createPendingStream(args, tracers, pickResult);
}
state = newerState;
}
Expand All @@ -171,9 +172,12 @@ public final ClientStream newStream(
* schedule tasks on syncContext.
*/
@GuardedBy("lock")
private PendingStream createPendingStream(
PickSubchannelArgs args, ClientStreamTracer[] tracers) {
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus = pickResult.getStatus();
}
pendingStreams.add(pendingStream);
if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse);
Expand Down Expand Up @@ -293,6 +297,13 @@ final void reprocess(@Nullable SubchannelPicker picker) {
for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
if (callOptions.isWaitForReady() && pickResult.hasResult()) {
stream.lastPickStatus = pickResult.getStatus();
}
// User code provided authority takes precedence over the LB provided one.
if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
stream.setAuthority(pickResult.getAuthorityOverride());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did these lines come about (looks like an incorrect merge)? This undoes the change done in https://github.com/grpc/grpc-java/pull/11862/files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're right. Somehow merge branch master into this branch caused it to happen.

}
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Expand Down Expand Up @@ -349,6 +360,7 @@ private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args;
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private volatile Status lastPickStatus;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
this.args = args;
Expand Down Expand Up @@ -405,6 +417,9 @@ protected void onEarlyCancellation(Status reason) {
public void appendTimeoutInsight(InsightBuilder insight) {
if (args.getCallOptions().isWaitForReady()) {
insight.append("wait_for_ready");
if (lastPickStatus != null && !lastPickStatus.isOk()) {
insight.appendKeyValue("Last Pick Failure", lastPickStatus);
}
}
super.appendTimeoutInsight(insight);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,23 @@ public void pendingStream_appendTimeoutInsight_waitForReady() {
.matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]");
}

@Test
public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure() {
ClientStream stream = delayedTransport.newStream(
method, headers, callOptions.withWaitForReady(), tracers);
stream.start(streamListener);
SubchannelPicker picker = mock(SubchannelPicker.class);
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withError(Status.PERMISSION_DENIED));
delayedTransport.reprocess(picker);
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString())
.matches("\\[wait_for_ready, "
+ "Last Pick Failure=Status\\{code=PERMISSION_DENIED, description=null, cause=null\\},"
+ " buffered_nanos=[0-9]+, waiting_for_connection]");
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
Expand Down