diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a82ca770093..91007c392d1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -24,7 +24,9 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Ticker; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; @@ -38,8 +40,6 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.lookup.v1.RouteLookupServiceGrpc; @@ -54,7 +54,6 @@ import io.grpc.rls.RlsProtoData.RouteLookupConfig; import io.grpc.rls.RlsProtoData.RouteLookupRequest; import io.grpc.rls.RlsProtoData.RouteLookupResponse; -import io.grpc.rls.Throttler.ThrottledException; import io.grpc.stub.StreamObserver; import io.grpc.util.ForwardingLoadBalancerHelper; import java.net.URI; @@ -62,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.CheckReturnValue; @@ -96,7 +96,6 @@ final class CachingRlsLbClient { @GuardedBy("lock") private final Map pendingCallCache = new HashMap<>(); - private final SynchronizationContext synchronizationContext; private final ScheduledExecutorService scheduledExecutorService; private final Ticker ticker; private final Throttler throttler; @@ -118,7 +117,6 @@ final class CachingRlsLbClient { private CachingRlsLbClient(Builder builder) { helper = new RlsLbHelper(checkNotNull(builder.helper, "helper")); scheduledExecutorService = helper.getScheduledExecutorService(); - synchronizationContext = helper.getSynchronizationContext(); lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig"); RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig(); maxAgeNanos = rlsConfig.maxAgeInNanos(); @@ -129,10 +127,11 @@ private CachingRlsLbClient(Builder builder) { linkedHashLruCache = new RlsAsyncLruCache( rlsConfig.cacheSizeBytes(), - builder.evictionListener, + new AutoCleaningEvictionListener(builder.evictionListener), scheduledExecutorService, ticker, - lock); + lock, + helper); logger = helper.getChannelLogger(); String serverHost = null; try { @@ -193,15 +192,19 @@ static Status convertRlsServerStatus(Status status, String serverName) { serverName, status.getCode(), status.getDescription())); } - @CheckReturnValue - private ListenableFuture asyncRlsCall(RouteLookupRequest request) { + /** Populates async cache entry for new request. */ + @GuardedBy("lock") + private CachedRouteLookupResponse asyncRlsCall( + RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) { logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS"); - final SettableFuture response = SettableFuture.create(); if (throttler.shouldThrottle()) { logger.log(ChannelLogLevel.DEBUG, "Request is throttled"); - response.setException(new ThrottledException()); - return response; + // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting + // on this result + return CachedRouteLookupResponse.backoffEntry(createBackOffEntry( + request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy)); } + final SettableFuture response = SettableFuture.create(); io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request); logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest); rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS) @@ -219,7 +222,6 @@ public void onError(Throwable t) { logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t); response.setException(t); throttler.registerBackendResponse(true); - helper.propagateRlsError(); } @Override @@ -228,7 +230,8 @@ public void onCompleted() { throttler.registerBackendResponse(false); } }); - return response; + return CachedRouteLookupResponse.pendingResponse( + createPendingEntry(request, response, backoffPolicy)); } /** @@ -245,7 +248,11 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) { cacheEntry = linkedHashLruCache.read(request); if (cacheEntry == null) { logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request"); - return handleNewRequest(request); + PendingCacheEntry pendingEntry = pendingCallCache.get(request); + if (pendingEntry != null) { + return CachedRouteLookupResponse.pendingResponse(pendingEntry); + } + return asyncRlsCall(request, /* backoffPolicy= */ null); } if (cacheEntry instanceof DataCacheEntry) { @@ -276,46 +283,86 @@ void close() { } } - /** - * Populates async cache entry for new request. This is only methods directly modifies the cache, - * any status change is happening via event (async request finished, timed out, etc) in {@link - * PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}. - */ - private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) { + void requestConnection() { + rlsChannel.getState(true); + } + + @GuardedBy("lock") + private PendingCacheEntry createPendingEntry( + RouteLookupRequest request, + ListenableFuture pendingCall, + @Nullable BackoffPolicy backoffPolicy) { + PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy); + // Add the entry to the map before adding the Listener, because the listener removes the + // entry from the map + pendingCallCache.put(request, entry); + // Beware that the listener can run immediately on the current thread + pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor()); + return entry; + } + + private void pendingRpcComplete(PendingCacheEntry entry) { synchronized (lock) { - PendingCacheEntry pendingEntry = pendingCallCache.get(request); - if (pendingEntry != null) { - return CachedRouteLookupResponse.pendingResponse(pendingEntry); + boolean clientClosed = pendingCallCache.remove(entry.request) == null; + if (clientClosed) { + return; } - ListenableFuture asyncCall = asyncRlsCall(request); - if (!asyncCall.isDone()) { - pendingEntry = new PendingCacheEntry(request, asyncCall); - // Add the entry to the map before adding the Listener, because the listener removes the - // entry from the map - pendingCallCache.put(request, pendingEntry); - // Beware that the listener can run immediately on the current thread - asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext); - return CachedRouteLookupResponse.pendingResponse(pendingEntry); - } else { - // async call returned finished future is most likely throttled - try { - RouteLookupResponse response = asyncCall.get(); - DataCacheEntry dataEntry = new DataCacheEntry(request, response); - linkedHashLruCache.cacheAndClean(request, dataEntry); - return CachedRouteLookupResponse.dataEntry(dataEntry); - } catch (Exception e) { - BackoffCacheEntry backoffEntry = - new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get()); - linkedHashLruCache.cacheAndClean(request, backoffEntry); - return CachedRouteLookupResponse.backoffEntry(backoffEntry); - } + try { + createDataEntry(entry.request, Futures.getDone(entry.pendingCall)); + // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to + // reattempt picks when the child LB is done connecting + } catch (Exception e) { + createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy); + // Cache updated. updateBalancingState() to reattempt picks + helper.propagateRlsError(); } } } - void requestConnection() { - rlsChannel.getState(true); + @GuardedBy("lock") + private DataCacheEntry createDataEntry( + RouteLookupRequest request, RouteLookupResponse routeLookupResponse) { + logger.log( + ChannelLogLevel.DEBUG, + "Transition to data cache: routeLookupResponse={0}", + routeLookupResponse); + DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse); + // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until + // this cache update because the lock is held + linkedHashLruCache.cacheAndClean(request, entry); + return entry; + } + + @GuardedBy("lock") + private BackoffCacheEntry createBackOffEntry( + RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) { + logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status); + if (backoffPolicy == null) { + backoffPolicy = backoffProvider.get(); + } + long delayNanos = backoffPolicy.nextBackoffNanos(); + BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy); + // Lock is held, so the task can't execute before the assignment + entry.scheduledFuture = scheduledExecutorService.schedule( + () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS); + linkedHashLruCache.cacheAndClean(request, entry); + logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos", + delayNanos); + return entry; + } + + private void refreshBackoffEntry(BackoffCacheEntry entry) { + synchronized (lock) { + // This checks whether the task has been cancelled and prevents a second execution. + if (!entry.scheduledFuture.cancel(false)) { + // Future was previously cancelled + return; + } + logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending"); + linkedHashLruCache.invalidate(entry.request); + asyncRlsCall(entry.request, entry.backoffPolicy); + } } private static final class RlsLbHelper extends ForwardingLoadBalancerHelper { @@ -353,7 +400,8 @@ public void run() { } void triggerPendingRpcProcessing() { - super.updateBalancingState(state, picker); + helper.getSynchronizationContext().execute( + () -> super.updateBalancingState(state, picker)); } } @@ -455,60 +503,19 @@ public String toString() { } /** A pending cache entry when the async RouteLookup RPC is still on the fly. */ - final class PendingCacheEntry { + static final class PendingCacheEntry { private final ListenableFuture pendingCall; private final RouteLookupRequest request; + @Nullable private final BackoffPolicy backoffPolicy; - PendingCacheEntry( - RouteLookupRequest request, ListenableFuture pendingCall) { - this(request, pendingCall, null); - } - PendingCacheEntry( RouteLookupRequest request, ListenableFuture pendingCall, @Nullable BackoffPolicy backoffPolicy) { this.request = checkNotNull(request, "request"); - this.pendingCall = pendingCall; - this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy; - } - - void handleDoneFuture() { - synchronized (lock) { - pendingCallCache.remove(request); - if (pendingCall.isCancelled()) { - return; - } - - try { - transitionToDataEntry(pendingCall.get()); - } catch (Exception e) { - if (e instanceof ThrottledException) { - transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e)); - } else { - transitionToBackOff(Status.fromThrowable(e)); - } - } - } - } - - private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) { - synchronized (lock) { - logger.log( - ChannelLogLevel.DEBUG, - "Transition to data cache: routeLookupResponse={0}", - routeLookupResponse); - linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse)); - } - } - - private void transitionToBackOff(Status status) { - synchronized (lock) { - logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status); - linkedHashLruCache.cacheAndClean(request, - new BackoffCacheEntry(request, status, backoffPolicy)); - } + this.pendingCall = checkNotNull(pendingCall, "pendingCall"); + this.backoffPolicy = backoffPolicy; } @Override @@ -541,10 +548,6 @@ final boolean isExpired() { protected long getMinEvictionTime() { return 0L; } - - protected void triggerPendingRpcProcessing() { - helper.triggerPendingRpcProcessing(); - } } /** Implementation of {@link CacheEntry} contains valid data. */ @@ -584,38 +587,14 @@ final class DataCacheEntry extends CacheEntry { * */ void maybeRefresh() { - logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry"); - synchronized (lock) { - logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired"); + synchronized (lock) { // Lock is already held, but ErrorProne can't tell if (pendingCallCache.containsKey(request)) { // pending already requested logger.log(ChannelLogLevel.DEBUG, "A pending refresh request already created, no need to proceed with refresh"); return; } - final ListenableFuture asyncCall = asyncRlsCall(request); - if (!asyncCall.isDone()) { - logger.log(ChannelLogLevel.DEBUG, - "Async call to rls not yet complete, adding a pending cache entry"); - PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall); - pendingCallCache.put(request, pendingEntry); - asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext); - } else { - // async call returned finished future is most likely throttled - try { - logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return"); - RouteLookupResponse response = asyncCall.get(); - logger.log(ChannelLogLevel.DEBUG, "RLS call to returned"); - linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e); - BackoffCacheEntry backoffEntry = - new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get()); - linkedHashLruCache.cacheAndClean(request, backoffEntry); - } - } + asyncRlsCall(request, /* backoffPolicy= */ null); } } @@ -701,75 +680,13 @@ public String toString() { private final class BackoffCacheEntry extends CacheEntry { private final Status status; - private final ScheduledHandle scheduledHandle; private final BackoffPolicy backoffPolicy; - private final long expireNanos; - private boolean shutdown = false; + private Future scheduledFuture; BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) { super(request); this.status = checkNotNull(status, "status"); this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy"); - long delayNanos = backoffPolicy.nextBackoffNanos(); - this.expireNanos = ticker.read() + delayNanos; - this.scheduledHandle = - synchronizationContext.schedule( - new Runnable() { - @Override - public void run() { - transitionToPending(); - } - }, - delayNanos, - TimeUnit.NANOSECONDS, - scheduledExecutorService); - logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos", - delayNanos); - } - - /** Forcefully refreshes cache entry by ignoring the backoff timer. */ - void forceRefresh() { - logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry"); - if (scheduledHandle.isPending()) { - scheduledHandle.cancel(); - transitionToPending(); - } - } - - private void transitionToPending() { - logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending"); - synchronized (lock) { - logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending"); - if (shutdown) { - logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending"); - return; - } - logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending"); - ListenableFuture call = asyncRlsCall(request); - if (!call.isDone()) { - logger.log(ChannelLogLevel.DEBUG, - "Transition to pending RLS call not done, adding a pending cache entry"); - linkedHashLruCache.invalidate(request); - PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy); - pendingCallCache.put(request, pendingEntry); - call.addListener(pendingEntry::handleDoneFuture, synchronizationContext); - } else { - try { - logger.log(ChannelLogLevel.DEBUG, - "Waiting for transition to pending RLS call response"); - RouteLookupResponse response = call.get(); - linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - logger.log(ChannelLogLevel.DEBUG, - "Transition to pending RLS call failed, creating a backoff entry", e); - linkedHashLruCache.cacheAndClean( - request, - new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy)); - } - } - } } Status getStatus() { @@ -783,18 +700,12 @@ int getSizeBytes() { @Override boolean isExpired(long now) { - return expireNanos - now <= 0; + return scheduledFuture.isDone(); } @Override void cleanup() { - if (shutdown) { - return; - } - shutdown = true; - if (!scheduledHandle.isPending()) { - scheduledHandle.cancel(); - } + scheduledFuture.cancel(false); } @Override @@ -911,18 +822,20 @@ public void registerBackendResponse(boolean throttled) { /** Implementation of {@link LinkedHashLruCache} for RLS. */ private static final class RlsAsyncLruCache extends LinkedHashLruCache { + private final RlsLbHelper helper; RlsAsyncLruCache(long maxEstimatedSizeBytes, @Nullable EvictionListener evictionListener, - ScheduledExecutorService ses, Ticker ticker, Object lock) { + ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) { super( maxEstimatedSizeBytes, - new AutoCleaningEvictionListener(evictionListener), + evictionListener, 1, TimeUnit.MINUTES, ses, ticker, lock); + this.helper = checkNotNull(helper, "helper"); } @Override @@ -951,7 +864,7 @@ public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) { // force cleanup if new entry pushed cache over max size (in bytes) if (fitToLimit()) { - value.triggerPendingRpcProcessing(); + helper.triggerPendingRpcProcessing(); } return newEntry; } @@ -977,7 +890,7 @@ public void onStatusChanged(ConnectivityState newState) { logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries"); for (CacheEntry value : linkedHashLruCache.values()) { if (value instanceof BackoffCacheEntry) { - ((BackoffCacheEntry) value).forceRefresh(); + refreshBackoffEntry((BackoffCacheEntry) value); } } } @@ -1077,9 +990,11 @@ private void startFallbackChildPolicy() { // GuardedBy CachingRlsLbClient.lock void close() { - logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker"); - if (fallbackChildPolicyWrapper != null) { - refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper); + synchronized (lock) { // Lock is already held, but ErrorProne can't tell + logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker"); + if (fallbackChildPolicyWrapper != null) { + refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper); + } } } diff --git a/rls/src/main/java/io/grpc/rls/Throttler.java b/rls/src/main/java/io/grpc/rls/Throttler.java index 08f54c2e1b3..96d17e70adf 100644 --- a/rls/src/main/java/io/grpc/rls/Throttler.java +++ b/rls/src/main/java/io/grpc/rls/Throttler.java @@ -42,27 +42,4 @@ interface Throttler { * @param throttled specifies whether the request was throttled by the backend. */ void registerBackendResponse(boolean throttled); - - /** - * A ThrottledException indicates the call is throttled. This exception is meant to be used by - * caller of {@link Throttler}, the implementation of Throttler should not throw - * this exception when {@link #shouldThrottle()} is called. - */ - final class ThrottledException extends RuntimeException { - - static final long serialVersionUID = 1L; - - public ThrottledException() { - super(); - } - - public ThrottledException(String s) { - super(s); - } - - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } - } } diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 61cf4023779..8dd99bff320 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -301,7 +301,7 @@ public void get_throttledAndRecover() throws Exception { fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); // initially backed off entry is backed off again verify(evictionListener) - .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED)); + .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT)); resp = getInSyncContext(routeLookupRequest); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 78f4ee28a52..936539af6bb 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -186,13 +186,18 @@ public void lb_serverStatusCodeConversion() throws Exception { Metadata headers = new Metadata(); PickSubchannelArgsImpl fakeSearchMethodArgs = new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT); + // Warm-up pick; will be queued + PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getSubchannel()).isNull(); + // Cache is warm, but still unconnected picker.pickSubchannel(fakeSearchMethodArgs); // Will create the subchannel FakeSubchannel subchannel = subchannels.peek(); assertThat(subchannel).isNotNull(); // Ensure happy path is unaffected subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); + res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); // Check on conversion @@ -213,7 +218,12 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); + // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(searchSubchannelArgs); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getSubchannel()).isNull(); + // Cache is warm, but still unconnected + res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); @@ -323,7 +333,12 @@ public void lb_working_withoutDefaultTarget() throws Exception { .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); Metadata headers = new Metadata(); + // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(searchSubchannelArgs); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getSubchannel()).isNull(); + // Cache is warm, but still unconnected + res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));