From 65bdd9b0dceaeede63dffdb9ec434061a9ea51ca Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Thu, 23 Jan 2025 08:32:00 +0000 Subject: [PATCH 1/7] core: remember last pick status in no real stream --- .../src/main/java/io/grpc/internal/DelayedClientTransport.java | 3 +++ core/src/main/java/io/grpc/internal/DelayedStream.java | 2 ++ .../test/java/io/grpc/internal/DelayedClientTransportTest.java | 2 +- core/src/test/java/io/grpc/internal/DelayedStreamTest.java | 3 ++- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index ae173f4ac26..94cb211d907 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -286,6 +286,9 @@ final void reprocess(@Nullable SubchannelPicker picker) { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); + if (!pickResult.hasResult()) { + stream.lastPickStatus = pickResult.getStatus(); + } CallOptions callOptions = stream.args.getCallOptions(); // User code provided authority takes precedence over the LB provided one. if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 5f14f24cfe5..f2b9084e864 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -63,6 +63,7 @@ class DelayedStream implements ClientStream { private long streamSetTimeNanos; // No need to synchronize; start() synchronization provides a happens-before private List preStartPendingCalls = new ArrayList<>(); + protected Status lastPickStatus; @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -109,6 +110,7 @@ public void appendTimeoutInsight(InsightBuilder insight) { } else { insight.appendKeyValue("buffered_nanos", System.nanoTime() - startTimeNanos); insight.append("waiting_for_connection"); + insight.append(lastPickStatus); } } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index a5160552a9e..c3b530f5afb 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -756,7 +756,7 @@ public void pendingStream_appendTimeoutInsight_waitForReady() { InsightBuilder insight = new InsightBuilder(); stream.appendTimeoutInsight(insight); assertThat(insight.toString()) - .matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]"); + .matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection, null]"); } private static TransportProvider newTransportProvider(final ClientTransport transport) { diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index a47bea9f4ab..5e9ab9216f3 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -450,7 +450,8 @@ public void appendTimeoutInsight_realStreamNotSet() { InsightBuilder insight = new InsightBuilder(); stream.start(listener); stream.appendTimeoutInsight(insight); - assertThat(insight.toString()).matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection]"); + assertThat(insight.toString()) + .matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection, null]"); } @Test From 76b763bae2479e9f182a532f00d29532fe5f4c18 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Sat, 25 Jan 2025 16:02:48 +0000 Subject: [PATCH 2/7] Address comments --- .../grpc/internal/DelayedClientTransport.java | 6 +++++- .../java/io/grpc/internal/DelayedStream.java | 2 -- .../internal/DelayedClientTransportTest.java | 19 ++++++++++++++++++- .../io/grpc/internal/DelayedStreamTest.java | 3 +-- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 94cb211d907..6956e20dba7 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -286,7 +286,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); - if (!pickResult.hasResult()) { + if (pickResult.hasResult()) { stream.lastPickStatus = pickResult.getStatus(); } CallOptions callOptions = stream.args.getCallOptions(); @@ -350,6 +350,7 @@ private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); private final ClientStreamTracer[] tracers; + private Status lastPickStatus; private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { this.args = args; @@ -399,6 +400,9 @@ protected void onEarlyCancellation(Status reason) { public void appendTimeoutInsight(InsightBuilder insight) { if (args.getCallOptions().isWaitForReady()) { insight.append("wait_for_ready"); + if (lastPickStatus != null && !lastPickStatus.isOk()) { + insight.appendKeyValue("Last Pick Failure", lastPickStatus); + } } super.appendTimeoutInsight(insight); } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index f2b9084e864..5f14f24cfe5 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -63,7 +63,6 @@ class DelayedStream implements ClientStream { private long streamSetTimeNanos; // No need to synchronize; start() synchronization provides a happens-before private List preStartPendingCalls = new ArrayList<>(); - protected Status lastPickStatus; @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -110,7 +109,6 @@ public void appendTimeoutInsight(InsightBuilder insight) { } else { insight.appendKeyValue("buffered_nanos", System.nanoTime() - startTimeNanos); insight.append("waiting_for_connection"); - insight.append(lastPickStatus); } } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index c3b530f5afb..571701b1685 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -756,7 +756,24 @@ public void pendingStream_appendTimeoutInsight_waitForReady() { InsightBuilder insight = new InsightBuilder(); stream.appendTimeoutInsight(insight); assertThat(insight.toString()) - .matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection, null]"); + .matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]"); + } + + @Test + public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure() { + ClientStream stream = delayedTransport.newStream( + method, headers, callOptions.withWaitForReady(), tracers); + stream.start(streamListener); + SubchannelPicker picker = mock(SubchannelPicker.class); + when(picker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withError(Status.PERMISSION_DENIED)); + delayedTransport.reprocess(picker); + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + assertThat(insight.toString()) + .matches("\\[wait_for_ready, " + + "Last Pick Failure=Status\\{code=PERMISSION_DENIED, description=null, cause=null\\}," + + " buffered_nanos=[0-9]+, waiting_for_connection]"); } private static TransportProvider newTransportProvider(final ClientTransport transport) { diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 5e9ab9216f3..a47bea9f4ab 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -450,8 +450,7 @@ public void appendTimeoutInsight_realStreamNotSet() { InsightBuilder insight = new InsightBuilder(); stream.start(listener); stream.appendTimeoutInsight(insight); - assertThat(insight.toString()) - .matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection, null]"); + assertThat(insight.toString()).matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection]"); } @Test From 5fbe2dd2094e15627358e477f494cf77e0ad7d29 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Thu, 30 Jan 2025 19:24:40 +0000 Subject: [PATCH 3/7] Addressed comments --- .../grpc/internal/DelayedClientTransport.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 6956e20dba7..444f99c6982 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -129,8 +129,9 @@ public final ClientStream newStream( if (state.shutdownStatus != null) { return new FailingClientStream(state.shutdownStatus, tracers); } + PickResult pickResult = null; if (state.lastPicker != null) { - PickResult pickResult = state.lastPicker.pickSubchannel(args); + pickResult = state.lastPicker.pickSubchannel(args); callOptions = args.getCallOptions(); // User code provided authority takes precedence over the LB provided one. if (callOptions.getAuthority() == null @@ -150,7 +151,7 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers); + return createPendingStream(args, tracers, pickResult); } state = newerState; } @@ -166,8 +167,11 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream( - PickSubchannelArgs args, ClientStreamTracer[] tracers) { + PickSubchannelArgs args, ClientStreamTracer[] tracers, PickResult pickResult) { PendingStream pendingStream = new PendingStream(args, tracers); + if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { + pendingStream.lastPickStatus = pickResult.getStatus(); + } pendingStreams.add(pendingStream); if (getPendingStreamsCount() == 1) { syncContext.executeLater(reportTransportInUse); @@ -286,10 +290,10 @@ final void reprocess(@Nullable SubchannelPicker picker) { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); - if (pickResult.hasResult()) { + CallOptions callOptions = stream.args.getCallOptions(); + if (callOptions.isWaitForReady() && pickResult.hasResult()) { stream.lastPickStatus = pickResult.getStatus(); } - CallOptions callOptions = stream.args.getCallOptions(); // User code provided authority takes precedence over the LB provided one. if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { stream.setAuthority(pickResult.getAuthorityOverride()); @@ -400,8 +404,10 @@ protected void onEarlyCancellation(Status reason) { public void appendTimeoutInsight(InsightBuilder insight) { if (args.getCallOptions().isWaitForReady()) { insight.append("wait_for_ready"); - if (lastPickStatus != null && !lastPickStatus.isOk()) { - insight.appendKeyValue("Last Pick Failure", lastPickStatus); + synchronized (this) { + if (lastPickStatus != null && !lastPickStatus.isOk()) { + insight.appendKeyValue("Last Pick Failure", lastPickStatus); + } } } super.appendTimeoutInsight(insight); From dd2fb04e20d9c832f4bd651e24ddc1eb27191900 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Fri, 31 Jan 2025 10:43:48 +0000 Subject: [PATCH 4/7] Used Atomic reference to store Status --- .../grpc/internal/DelayedClientTransport.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 444f99c6982..bcde104a1c2 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -170,7 +171,7 @@ private PendingStream createPendingStream( PickSubchannelArgs args, ClientStreamTracer[] tracers, PickResult pickResult) { PendingStream pendingStream = new PendingStream(args, tracers); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { - pendingStream.lastPickStatus = pickResult.getStatus(); + pendingStream.lastPickStatus.set(pickResult.getStatus()); } pendingStreams.add(pendingStream); if (getPendingStreamsCount() == 1) { @@ -291,8 +292,10 @@ final void reprocess(@Nullable SubchannelPicker picker) { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); CallOptions callOptions = stream.args.getCallOptions(); - if (callOptions.isWaitForReady() && pickResult.hasResult()) { - stream.lastPickStatus = pickResult.getStatus(); + synchronized (lock) { + if (callOptions.isWaitForReady() && pickResult.hasResult()) { + stream.lastPickStatus.set(pickResult.getStatus()); + } } // User code provided authority takes precedence over the LB provided one. if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { @@ -354,7 +357,7 @@ private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); private final ClientStreamTracer[] tracers; - private Status lastPickStatus; + private final AtomicReference lastPickStatus = new AtomicReference<>(null); private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { this.args = args; @@ -404,10 +407,9 @@ protected void onEarlyCancellation(Status reason) { public void appendTimeoutInsight(InsightBuilder insight) { if (args.getCallOptions().isWaitForReady()) { insight.append("wait_for_ready"); - synchronized (this) { - if (lastPickStatus != null && !lastPickStatus.isOk()) { - insight.appendKeyValue("Last Pick Failure", lastPickStatus); - } + Status status = lastPickStatus.get(); + if (status != null && !status.isOk()) { + insight.appendKeyValue("Last Pick Failure", status); } } super.appendTimeoutInsight(insight); From e12f8eaaf4df4bdbb1fe90cfd0c598025558312e Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Wed, 12 Feb 2025 06:51:42 +0000 Subject: [PATCH 5/7] Use volatile --- .../grpc/internal/DelayedClientTransport.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index f0ad32c7e4e..328771bf105 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -38,7 +38,6 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -173,11 +172,11 @@ public final ClientStream newStream( * schedule tasks on syncContext. */ @GuardedBy("lock") - private PendingStream createPendingStream( - PickSubchannelArgs args, ClientStreamTracer[] tracers, PickResult pickResult) { + private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + PickResult pickResult) { PendingStream pendingStream = new PendingStream(args, tracers); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { - pendingStream.lastPickStatus.set(pickResult.getStatus()); + pendingStream.lastPickStatus = pickResult.getStatus(); } pendingStreams.add(pendingStream); if (getPendingStreamsCount() == 1) { @@ -298,10 +297,8 @@ final void reprocess(@Nullable SubchannelPicker picker) { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); CallOptions callOptions = stream.args.getCallOptions(); - synchronized (lock) { - if (callOptions.isWaitForReady() && pickResult.hasResult()) { - stream.lastPickStatus.set(pickResult.getStatus()); - } + if (callOptions.isWaitForReady() && pickResult.hasResult()) { + stream.lastPickStatus = pickResult.getStatus(); } // User code provided authority takes precedence over the LB provided one. if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { @@ -363,7 +360,7 @@ private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); private final ClientStreamTracer[] tracers; - private final AtomicReference lastPickStatus = new AtomicReference<>(null); + private volatile Status lastPickStatus; private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { this.args = args; @@ -420,9 +417,8 @@ protected void onEarlyCancellation(Status reason) { public void appendTimeoutInsight(InsightBuilder insight) { if (args.getCallOptions().isWaitForReady()) { insight.append("wait_for_ready"); - Status status = lastPickStatus.get(); - if (status != null && !status.isOk()) { - insight.appendKeyValue("Last Pick Failure", status); + if (lastPickStatus != null && !lastPickStatus.isOk()) { + insight.appendKeyValue("Last Pick Failure", lastPickStatus); } } super.appendTimeoutInsight(insight); From 71eba4e7d896f176188222318d39587043abfc35 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Wed, 12 Feb 2025 09:32:13 +0000 Subject: [PATCH 6/7] cleanup --- .../main/java/io/grpc/internal/DelayedClientTransport.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 328771bf105..9fe43285656 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -300,10 +300,6 @@ final void reprocess(@Nullable SubchannelPicker picker) { if (callOptions.isWaitForReady() && pickResult.hasResult()) { stream.lastPickStatus = pickResult.getStatus(); } - // User code provided authority takes precedence over the LB provided one. - if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { - stream.setAuthority(pickResult.getAuthorityOverride()); - } final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { From 31c405a7ab89f969b83294efa7ce7444d6f1011b Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Thu, 13 Feb 2025 06:33:18 +0000 Subject: [PATCH 7/7] read through local variable --- .../main/java/io/grpc/internal/DelayedClientTransport.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 9fe43285656..8ff755af3eb 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -413,8 +413,9 @@ protected void onEarlyCancellation(Status reason) { public void appendTimeoutInsight(InsightBuilder insight) { if (args.getCallOptions().isWaitForReady()) { insight.append("wait_for_ready"); - if (lastPickStatus != null && !lastPickStatus.isOk()) { - insight.appendKeyValue("Last Pick Failure", lastPickStatus); + Status status = lastPickStatus; + if (status != null && !status.isOk()) { + insight.appendKeyValue("Last Pick Failure", status); } } super.appendTimeoutInsight(insight);