Skip to content

Commit

Permalink
util: Use acceptResolvedAddresses() for MultiChildLb children
Browse files Browse the repository at this point in the history
A failing Status from acceptResolvedAddresses means something is wrong
with the config, but parts of the config may still have been applied.
Thus there are now two possible flows: errors that should prevent
updateOverallBalancingState() and errors that should have no effect
other than the return code. To manage that, MultChildLb must always be
responsible for calling updateOverallBalancingState().
acceptResolvedAddressesInternal() was inlined to make that error
processing easier. No existing usages actually needed to have logic
between updating the children and regenerating the picker.

RingHashLb already was verifying that the address list was not empty, so
the short-circuiting when acceptResolvedAddressesInternal() returned an
error was impossible to trigger. WrrLb's updateWeightTask() calls the
last picker, so it can run before acceptResolvedAddressesInternal(); the
only part that matters is re-creating the weightUpdateTimer.
  • Loading branch information
ejona86 committed Feb 18, 2025
1 parent a132123 commit 7136070
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 126 deletions.
74 changes: 22 additions & 52 deletions util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,22 @@ protected ChildLbState createChildLbState(Object key) {
*/
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
try {
resolvingAddresses = true;

// process resolvedAddresses to update children
AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
Map<Object, ResolvedAddresses> newChildAddresses = createChildAddressesMap(resolvedAddresses);

// Handle error case
if (newChildAddresses.isEmpty()) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. " + resolvedAddresses);
handleNameResolutionError(unavailableStatus);
return unavailableStatus;
}

// Update the picker and our connectivity state
updateOverallBalancingState();

// shutdown removed children
shutdownRemoved(acceptRetVal.removedChildren);
return acceptRetVal.status;
return updateChildrenWithResolvedAddresses(newChildAddresses);
} finally {
resolvingAddresses = false;
}
Expand Down Expand Up @@ -149,31 +150,7 @@ public void shutdown() {
childLbStates.clear();
}

/**
* This does the work to update the child map and calculate which children have been removed.
* You must call {@link #updateOverallBalancingState} to update the picker
* and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
*/
protected final AcceptResolvedAddrRetVal acceptResolvedAddressesInternal(
ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);

Map<Object, ResolvedAddresses> newChildAddresses = createChildAddressesMap(resolvedAddresses);

// Handle error case
if (newChildAddresses.isEmpty()) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. " + resolvedAddresses);
handleNameResolutionError(unavailableStatus);
return new AcceptResolvedAddrRetVal(unavailableStatus, null);
}

List<ChildLbState> removed = updateChildrenWithResolvedAddresses(newChildAddresses);
return new AcceptResolvedAddrRetVal(Status.OK, removed);
}

/** Returns removed children. */
private List<ChildLbState> updateChildrenWithResolvedAddresses(
private Status updateChildrenWithResolvedAddresses(
Map<Object, ResolvedAddresses> newChildAddresses) {
// Create a map with the old values
Map<Object, ChildLbState> oldStatesMap =
Expand All @@ -183,6 +160,7 @@ private List<ChildLbState> updateChildrenWithResolvedAddresses(
}

// Move ChildLbStates from the map to a new list (preserving the new map's order)
Status status = Status.OK;
List<ChildLbState> newChildLbStates = new ArrayList<>(newChildAddresses.size());
for (Map.Entry<Object, ResolvedAddresses> entry : newChildAddresses.entrySet()) {
ChildLbState childLbState = oldStatesMap.remove(entry.getKey());
Expand All @@ -191,21 +169,23 @@ private List<ChildLbState> updateChildrenWithResolvedAddresses(
}
newChildLbStates.add(childLbState);
if (entry.getValue() != null) {
childLbState.lb.handleResolvedAddresses(entry.getValue()); // update child LB
// update child LB
Status newStatus = childLbState.lb.acceptResolvedAddresses(entry.getValue());
if (!newStatus.isOk()) {
status = newStatus;
}
}
}

childLbStates = newChildLbStates;
// Remaining entries in map are orphaned
return new ArrayList<>(oldStatesMap.values());
}
// Update the picker and our connectivity state
updateOverallBalancingState();

protected final void shutdownRemoved(List<ChildLbState> removedChildren) {
// Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
// subchannel that has been shutdown.
for (ChildLbState childLbState : removedChildren) {
// Remaining entries in map are orphaned
for (ChildLbState childLbState : oldStatesMap.values()) {
childLbState.shutdown();
}
return status;
}

@Nullable
Expand Down Expand Up @@ -406,14 +386,4 @@ public String toString() {
return addrs.toString();
}
}

protected static class AcceptResolvedAddrRetVal {
public final Status status;
public final List<ChildLbState> removedChildren;

public AcceptResolvedAddrRetVal(Status status, List<ChildLbState> removedChildren) {
this.status = status;
this.removedChildren = removedChildren;
}
}
}
90 changes: 37 additions & 53 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,62 +87,46 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return addressValidityStatus;
}

try {
resolvingAddresses = true;
AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
}

// Now do the ringhash specific logic with weights and building the ring
RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config == null) {
throw new IllegalArgumentException("Missing RingHash configuration");
// Now do the ringhash specific logic with weights and building the ring
RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config == null) {
throw new IllegalArgumentException("Missing RingHash configuration");
}
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
long totalWeight = 0L;
for (EquivalentAddressGroup eag : addrList) {
Long weight = eag.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT);
// Support two ways of server weighing: either multiple instances of the same address
// or each address contains a per-address weight attribute. If a weight is not provided,
// each occurrence of the address will be counted a weight value of one.
if (weight == null) {
weight = 1L;
}
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
long totalWeight = 0L;
for (EquivalentAddressGroup eag : addrList) {
Long weight = eag.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT);
// Support two ways of server weighing: either multiple instances of the same address
// or each address contains a per-address weight attribute. If a weight is not provided,
// each occurrence of the address will be counted a weight value of one.
if (weight == null) {
weight = 1L;
}
totalWeight += weight;
EquivalentAddressGroup addrKey = stripAttrs(eag);
if (serverWeights.containsKey(addrKey)) {
serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
} else {
serverWeights.put(addrKey, weight);
}
totalWeight += weight;
EquivalentAddressGroup addrKey = stripAttrs(eag);
if (serverWeights.containsKey(addrKey)) {
serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
} else {
serverWeights.put(addrKey, weight);
}
// Calculate scale
long minWeight = Collections.min(serverWeights.values());
double normalizedMinWeight = (double) minWeight / totalWeight;
// Scale up the number of hashes per host such that the least-weighted host gets a whole
// number of hashes on the the ring. Other hosts might not end up with whole numbers, and
// that's fine (the ring-building algorithm can handle this). This preserves the original
// implementation's behavior: when weights aren't provided, all hosts should get an equal
// number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
// back down to fit.
double scale = Math.min(
Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
(double) config.maxRingSize);

// Build the ring
ring = buildRing(serverWeights, totalWeight, scale);

// Must update channel picker before return so that new RPCs will not be routed to deleted
// clusters and resolver can remove them in service config.
updateOverallBalancingState();

shutdownRemoved(acceptRetVal.removedChildren);
} finally {
this.resolvingAddresses = false;
}

return Status.OK;
// Calculate scale
long minWeight = Collections.min(serverWeights.values());
double normalizedMinWeight = (double) minWeight / totalWeight;
// Scale up the number of hashes per host such that the least-weighted host gets a whole
// number of hashes on the the ring. Other hosts might not end up with whole numbers, and
// that's fine (the ring-building algorithm can handle this). This preserves the original
// implementation's behavior: when weights aren't provided, all hosts should get an equal
// number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
// back down to fit.
double scale = Math.min(
Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
(double) config.maxRingSize);

// Build the ring
ring = buildRing(serverWeights, totalWeight, scale);

return super.acceptResolvedAddresses(resolvedAddresses);
}


Expand Down
28 changes: 7 additions & 21 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,31 +170,17 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
}
config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
AcceptResolvedAddrRetVal acceptRetVal;
try {
resolvingAddresses = true;
acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
}

if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
weightUpdateTimer.cancel();
}
updateWeightTask.run();

createAndApplyOrcaListeners();
if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
weightUpdateTimer.cancel();
}
updateWeightTask.run();

// Must update channel picker before return so that new RPCs will not be routed to deleted
// clusters and resolver can remove them in service config.
updateOverallBalancingState();
Status status = super.acceptResolvedAddresses(resolvedAddresses);

shutdownRemoved(acceptRetVal.removedChildren);
} finally {
resolvingAddresses = false;
}
createAndApplyOrcaListeners();

return acceptRetVal.status;
return status;
}

/**
Expand Down

0 comments on commit 7136070

Please sign in to comment.