Skip to content

Commit

Permalink
Use acceptResolvedAddresses() in easy cases
Browse files Browse the repository at this point in the history
We want to move away from handleResolvedAddresses(). These are "easy" in
that they need no logic. LBs extending ForwardingLoadBalancer had the
method duplicated from handleResolvedAddresses() and swapped away from
`super` because ForwardingLoadBalancer only forwards
handleResolvedAddresses() reliably today. Duplicating small methods was
less bug-prone than dealing with ForwardingLoadBalancer.
  • Loading branch information
ejona86 committed Feb 20, 2025
1 parent 892144d commit 83eb495
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
((RpcBehaviorConfig) resolvedAddresses.getLoadBalancingPolicyConfig()).rpcBehavior);
delegateLb.handleResolvedAddresses(resolvedAddresses);
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.setRpcBehavior(
((RpcBehaviorConfig) resolvedAddresses.getLoadBalancingPolicyConfig()).rpcBehavior);
return delegateLb.acceptResolvedAddresses(resolvedAddresses);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public void handleResolvedAddressesDelegated() {
verify(mockDelegateLb).handleResolvedAddresses(resolvedAddresses);
}

@Test
public void acceptResolvedAddressesDelegated() {
RpcBehaviorLoadBalancer lb = new RpcBehaviorLoadBalancer(new RpcBehaviorHelper(mockHelper),
mockDelegateLb);
ResolvedAddresses resolvedAddresses = buildResolvedAddresses(buildConfig());
lb.acceptResolvedAddresses(resolvedAddresses);
verify(mockDelegateLb).acceptResolvedAddresses(resolvedAddresses);
}

@Test
public void helperWrapsPicker() {
RpcBehaviorHelper helper = new RpcBehaviorHelper(mockHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,18 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
.get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);
helper.setHealthCheckedService(serviceName);
super.handleResolvedAddresses(resolvedAddresses);
delegate.handleResolvedAddresses(resolvedAddresses);
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Map<String, ?> healthCheckingConfig =
resolvedAddresses
.getAttributes()
.get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);
helper.setHealthCheckedService(serviceName);
return delegate.acceptResolvedAddresses(resolvedAddresses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,16 @@ public void setup() throws Exception {
boolean shutdown;

@Override
public void handleResolvedAddresses(final ResolvedAddresses resolvedAddresses) {
public Status acceptResolvedAddresses(final ResolvedAddresses resolvedAddresses) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!shutdown) {
hcLb.handleResolvedAddresses(resolvedAddresses);
hcLb.acceptResolvedAddresses(resolvedAddresses);
}
}
});
return Status.OK;
}

@Override
Expand Down Expand Up @@ -264,9 +265,9 @@ public void typicalWorkflow() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);

verify(origLb).handleResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
verify(origHelper, atLeast(0)).getSynchronizationContext();
verify(origHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(origHelper);
Expand Down Expand Up @@ -404,9 +405,9 @@ public void healthCheckDisabledWhenServiceNotImplemented() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);

verify(origLb).handleResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
verifyNoMoreInteractions(origLb);

// We create 2 Subchannels. One of them connects to a server that doesn't implement health check
Expand Down Expand Up @@ -489,9 +490,9 @@ public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);

verify(origLb).handleResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
verifyNoMoreInteractions(origLb);

SubchannelStateListener mockHealthListener = mockHealthListeners[0];
Expand Down Expand Up @@ -567,9 +568,9 @@ public void serverRespondResetsBackoff() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);

verify(origLb).handleResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
verifyNoMoreInteractions(origLb);

SubchannelStateListener mockStateListener = mockStateListeners[0];
Expand Down Expand Up @@ -667,9 +668,9 @@ public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() {
.setAddresses(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);

verify(origLb).handleResolvedAddresses(result1);
verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb);

// First, create Subchannels 0
Expand All @@ -688,8 +689,8 @@ public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
verify(origLb).handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);
verify(origLb).acceptResolvedAddresses(result2);

// Health check started on existing Subchannel
assertThat(healthImpls[0].calls).hasSize(1);
Expand All @@ -711,9 +712,9 @@ public void serviceConfigDisablesHealthCheckWhenRpcActive() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);

verify(origLb).handleResolvedAddresses(result1);
verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb);

Subchannel subchannel = createSubchannel(0, Attributes.EMPTY, maybeGetMockListener());
Expand All @@ -738,15 +739,15 @@ public void serviceConfigDisablesHealthCheckWhenRpcActive() {
.setAddresses(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);

// Health check RPC cancelled.
assertThat(serverCall.cancelled).isTrue();
// Subchannel uses original state
inOrder.verify(getMockListener()).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(READY)));

inOrder.verify(origLb).handleResolvedAddresses(result2);
inOrder.verify(origLb).acceptResolvedAddresses(result2);

verifyNoMoreInteractions(origLb, mockStateListeners[0]);
assertThat(healthImpl.calls).isEmpty();
Expand All @@ -759,9 +760,9 @@ public void serviceConfigDisablesHealthCheckWhenRetryPending() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);

verify(origLb).handleResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
verifyNoMoreInteractions(origLb);

SubchannelStateListener mockHealthListener = mockHealthListeners[0];
Expand Down Expand Up @@ -793,7 +794,7 @@ public void serviceConfigDisablesHealthCheckWhenRetryPending() {
.setAddresses(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);

// Retry timer is cancelled
assertThat(clock.getPendingTasks()).isEmpty();
Expand All @@ -805,7 +806,7 @@ public void serviceConfigDisablesHealthCheckWhenRetryPending() {
inOrder.verify(getMockListener()).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(READY)));

inOrder.verify(origLb).handleResolvedAddresses(result2);
inOrder.verify(origLb).acceptResolvedAddresses(result2);

verifyNoMoreInteractions(origLb, mockStateListeners[0]);
}
Expand All @@ -817,9 +818,9 @@ public void serviceConfigDisablesHealthCheckWhenRpcInactive() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);

verify(origLb).handleResolvedAddresses(result1);
verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb);

Subchannel subchannel = createSubchannel(0, Attributes.EMPTY, maybeGetMockListener());
Expand All @@ -842,9 +843,9 @@ public void serviceConfigDisablesHealthCheckWhenRpcInactive() {
.setAddresses(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);

inOrder.verify(origLb).handleResolvedAddresses(result2);
inOrder.verify(origLb).acceptResolvedAddresses(result2);

// Underlying subchannel is now ready
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
Expand All @@ -870,9 +871,9 @@ public void serviceConfigChangesServiceNameWhenRpcActive() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);

verify(origLb).handleResolvedAddresses(result1);
verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb);

SubchannelStateListener mockHealthListener = mockHealthListeners[0];
Expand Down Expand Up @@ -900,9 +901,9 @@ public void serviceConfigChangesServiceNameWhenRpcActive() {
eq(ConnectivityStateInfo.forNonError(READY)));

// Service config returns with the same health check name.
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);
// It's delivered to origLb, but nothing else happens
inOrder.verify(origLb).handleResolvedAddresses(result1);
inOrder.verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb, mockListener);

// Service config returns a different health check name.
Expand All @@ -911,8 +912,8 @@ public void serviceConfigChangesServiceNameWhenRpcActive() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
inOrder.verify(origLb).handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);
inOrder.verify(origLb).acceptResolvedAddresses(result2);

// Current health check RPC cancelled.
assertThat(serverCall.cancelled).isTrue();
Expand All @@ -934,9 +935,9 @@ public void serviceConfigChangesServiceNameWhenRetryPending() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);

verify(origLb).handleResolvedAddresses(result1);
verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb);

SubchannelStateListener mockHealthListener = mockHealthListeners[0];
Expand Down Expand Up @@ -969,9 +970,9 @@ public void serviceConfigChangesServiceNameWhenRetryPending() {

// Service config returns with the same health check name.

hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);
// It's delivered to origLb, but nothing else happens
inOrder.verify(origLb).handleResolvedAddresses(result1);
inOrder.verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb, mockListener);
assertThat(clock.getPendingTasks()).hasSize(1);
assertThat(healthImpl.calls).isEmpty();
Expand All @@ -982,12 +983,12 @@ public void serviceConfigChangesServiceNameWhenRetryPending() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);
// Concluded CONNECTING state
inOrder.verify(getMockListener()).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(CONNECTING)));

inOrder.verify(origLb).handleResolvedAddresses(result2);
inOrder.verify(origLb).acceptResolvedAddresses(result2);

// Current retry timer cancelled
assertThat(clock.getPendingTasks()).isEmpty();
Expand All @@ -1008,9 +1009,9 @@ public void serviceConfigChangesServiceNameWhenRpcInactive() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);

verify(origLb).handleResolvedAddresses(result1);
verify(origLb).acceptResolvedAddresses(result1);
verifyNoMoreInteractions(origLb);

Subchannel subchannel = createSubchannel(0, Attributes.EMPTY, maybeGetMockListener());
Expand All @@ -1031,9 +1032,9 @@ public void serviceConfigChangesServiceNameWhenRpcInactive() {
inOrder.verifyNoMoreInteractions();

// Service config returns with the same health check name.
hcLbEventDelivery.handleResolvedAddresses(result1);
hcLbEventDelivery.acceptResolvedAddresses(result1);
// It's delivered to origLb, but nothing else happens
inOrder.verify(origLb).handleResolvedAddresses(result1);
inOrder.verify(origLb).acceptResolvedAddresses(result1);
assertThat(healthImpl.calls).isEmpty();
verifyNoMoreInteractions(origLb);

Expand All @@ -1043,9 +1044,9 @@ public void serviceConfigChangesServiceNameWhenRpcInactive() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
hcLbEventDelivery.acceptResolvedAddresses(result2);

inOrder.verify(origLb).handleResolvedAddresses(result2);
inOrder.verify(origLb).acceptResolvedAddresses(result2);

// Underlying subchannel is now ready
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
Expand Down Expand Up @@ -1092,9 +1093,9 @@ public void balancerShutdown() {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);

verify(origLb).handleResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
verifyNoMoreInteractions(origLb);
ServerSideCall[] serverCalls = new ServerSideCall[NUM_SUBCHANNELS];

Expand Down Expand Up @@ -1172,8 +1173,8 @@ public LoadBalancer newLoadBalancer(Helper helper) {
.setAddresses(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddresses(result);
hcLbEventDelivery.acceptResolvedAddresses(result);
verify(origLb).acceptResolvedAddresses(result);
createSubchannel(0, Attributes.EMPTY);
assertThat(healthImpls[0].calls).isEmpty();
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
endpointTrackerMap.cancelTracking();
}

switchLb.handleResolvedAddresses(
return switchLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config.childConfig).build());
return Status.OK;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void acceptResolvedAddresses() {
loadBalancer.acceptResolvedAddresses(resolvedAddresses);

// Handling of resolved addresses is delegated
verify(mockChildLb).handleResolvedAddresses(
verify(mockChildLb).acceptResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build());

// There is a single pending task to run the outlier detection algorithm
Expand Down
4 changes: 1 addition & 3 deletions xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Object switchConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME),
new WeightedTargetConfig(weightedPolicySelections));
switchLb.handleResolvedAddresses(
return switchLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(switchConfig)
.build());

return Status.OK;
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private OrcaOobUtil() {}
* class WrrLoadbalancer extends LoadBalancer {
* private final Helper originHelper; // the original Helper
*
* public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
* public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
* // listener implements the logic for WRR's usage of backend metrics.
* OrcaReportingHelper orcaHelper =
* OrcaOobUtil.newOrcaReportingHelper(originHelper);
Expand Down
Loading

0 comments on commit 83eb495

Please sign in to comment.