Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: remember last pick status in no real stream #11851

Merged
merged 9 commits into from
Feb 14, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ final void reprocess(@Nullable SubchannelPicker picker) {

for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
if (pickResult.hasResult()) {
stream.lastPickStatus = pickResult.getStatus();
}
CallOptions callOptions = stream.args.getCallOptions();
// User code provided authority takes precedence over the LB provided one.
if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
Expand Down Expand Up @@ -347,6 +350,7 @@ private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args;
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private Status lastPickStatus;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is accessed from multiple threads (pick attempts are single-threaded, but appendTimeoutInsight is a separate thread). I see the parent class DelayedStream is using this as a lock, and PendingStream could do the same for this variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still has synchronization problems. One assignment uses DelayedClientTransport.lock, another uses no lock, and a read uses PendingStream.this. To be useful, you have to use the same lock in all reads/writes.

Copy link
Member Author

@shivaspeaks shivaspeaks Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exploring ways that we can do:

I definitely cannot synchronize on local variable having references. That doesn't work. I cannot define a new lock at the top because then that will be blocking every stream (If we want this, we can definitely lock on DelayedClientTransport anyways). We need lock per pending stream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ejona86 I believe AtomicReference to store Status is viable here. I made a commit, take a look.

Copy link
Contributor

@kannanjgithub kannanjgithub Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before worrying about the synchronization problem, there is a bigger problem. The PR doesn't help with the case of transport not established at all, and control will not go beyond GrpcUtil.getTransportFromPickResult != null.
Even if we didn't gate on the condition on transport != null ,pick result won't capture the transport status returned to the listener so (last) pick result will just be OK even when the picked sub channel fails to connect.

Copy link
Contributor

@kannanjgithub kannanjgithub Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to set up the pending stream as a listener to InternalSubChannel of the picker result instead of using the picker status itself, in newStream and reprocess(). InternalSubChannel will maintain a list of listeners and update them all in transportShutdown with the status. Whenever the picker result changes, reprocess should make each pending stream un-listen to the previous InternalSubChannel and listen to the new InternalSubChannel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current rough approach seems fine to me. What you may not notice is getTransportFromPickResult() is wait-for-ready-aware and queues if the RPC is wait-for-ready. What we want to do here is see the error that the RPC would have failed with if it wasn't wait-for-ready.

Since we're dealing with RPCs here, there must be no involvement with subchannel listeners. The LBs are responsible for that, and we don't know which subchannel is relevant. We need the LB to tell us if there is an error, which it does via the pick result.

When reviewing this earlier I saw that the flows can be simplified now that getTransportFromPickResult() is only used in one class (prior to 8844cf7 ManagedChannelImpl also used it). But I wanted to wait until this PR was merged before making such changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get it now, the pick result is in fact the status of the failed transport. With multiple updates to pick result it seems I only noticed the first status update of OK when connecting but not the later one during transport shutdown. This is the same status that I was trying to surface it in other ways but that is redundant and is already available.

About lock for the pending stream's pick result status, is there any issue if appendTimeoutInsight accesses this value outside any lock at all? Like, Status cannot have some garbage value just because the read is interleaved with a locked write. There is the other issue of a non locked variable cached in the thread's register but here the pick result status accessed through object references can't be cached like that.


private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
this.args = args;
Expand Down Expand Up @@ -396,6 +400,9 @@ protected void onEarlyCancellation(Status reason) {
public void appendTimeoutInsight(InsightBuilder insight) {
if (args.getCallOptions().isWaitForReady()) {
insight.append("wait_for_ready");
if (lastPickStatus != null && !lastPickStatus.isOk()) {
insight.appendKeyValue("Last Pick Failure", lastPickStatus);
}
}
super.appendTimeoutInsight(insight);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,23 @@ public void pendingStream_appendTimeoutInsight_waitForReady() {
.matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]");
}

@Test
public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure() {
ClientStream stream = delayedTransport.newStream(
method, headers, callOptions.withWaitForReady(), tracers);
stream.start(streamListener);
SubchannelPicker picker = mock(SubchannelPicker.class);
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withError(Status.PERMISSION_DENIED));
delayedTransport.reprocess(picker);
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString())
.matches("\\[wait_for_ready, "
+ "Last Pick Failure=Status\\{code=PERMISSION_DENIED, description=null, cause=null\\},"
+ " buffered_nanos=[0-9]+, waiting_for_connection]");
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
Expand Down
Loading