Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class LoadBalancerConfigFactory {
static final String SHUFFLE_ADDRESS_LIST_FIELD_NAME = "shuffleAddressList";

static final String ERROR_UTILIZATION_PENALTY = "errorUtilizationPenalty";
static final String METRIC_NAMES_FOR_COMPUTING_UTILIZATION = "metricNamesForComputingUtilization";

/**
* Factory method for creating a new {@link LoadBalancerConfigConverter} for a given xDS {@link
Expand Down Expand Up @@ -134,11 +135,9 @@ class LoadBalancerConfigFactory {
* the given config values.
*/
private static ImmutableMap<String, ?> buildWrrConfig(String blackoutPeriod,
String weightExpirationPeriod,
String oobReportingPeriod,
Boolean enableOobLoadReport,
String weightUpdatePeriod,
Float errorUtilizationPenalty) {
String weightExpirationPeriod, String oobReportingPeriod, Boolean enableOobLoadReport,
String weightUpdatePeriod, Float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
if (blackoutPeriod != null) {
configBuilder.put(BLACK_OUT_PERIOD, blackoutPeriod);
Expand All @@ -158,6 +157,10 @@ class LoadBalancerConfigFactory {
if (errorUtilizationPenalty != null) {
configBuilder.put(ERROR_UTILIZATION_PENALTY, errorUtilizationPenalty);
}
if (metricNamesForComputingUtilization != null
&& !metricNamesForComputingUtilization.isEmpty()) {
configBuilder.put(METRIC_NAMES_FOR_COMPUTING_UTILIZATION, metricNamesForComputingUtilization);
}
return ImmutableMap.of(WeightedRoundRobinLoadBalancerProvider.SCHEME,
configBuilder.buildOrThrow());
}
Expand Down Expand Up @@ -284,7 +287,7 @@ static class LoadBalancingPolicyConverter {
}

private static ImmutableMap<String, ?> convertWeightedRoundRobinConfig(
ClientSideWeightedRoundRobin wrr) throws ResourceInvalidException {
ClientSideWeightedRoundRobin wrr) throws ResourceInvalidException {
try {
return buildWrrConfig(
wrr.hasBlackoutPeriod() ? Durations.toString(wrr.getBlackoutPeriod()) : null,
Expand All @@ -293,7 +296,8 @@ static class LoadBalancingPolicyConverter {
wrr.hasOobReportingPeriod() ? Durations.toString(wrr.getOobReportingPeriod()) : null,
wrr.hasEnableOobLoadReport() ? wrr.getEnableOobLoadReport().getValue() : null,
wrr.hasWeightUpdatePeriod() ? Durations.toString(wrr.getWeightUpdatePeriod()) : null,
wrr.hasErrorUtilizationPenalty() ? wrr.getErrorUtilizationPenalty().getValue() : null);
wrr.hasErrorUtilizationPenalty() ? wrr.getErrorUtilizationPenalty().getValue() : null,
ImmutableList.copyOf(wrr.getMetricNamesForComputingUtilizationList()));
} catch (IllegalArgumentException ex) {
throw new ResourceInvalidException("Invalid duration in weighted round robin config: "
+ ex.getMessage());
Expand Down
120 changes: 85 additions & 35 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.grpc.services.MetricReport;
import io.grpc.util.ForwardingSubchannel;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.internal.MetricReportUtils;
import io.grpc.xds.orca.OrcaOobUtil;
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.orca.OrcaPerRequestUtil;
Expand All @@ -49,6 +50,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -87,6 +89,9 @@
* See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
*/
final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
/**
 * Experimental gate for custom utilization metric names. The value is initialized from the
 * {@code GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS} environment variable using
 * {@link Boolean#parseBoolean}, so it is {@code false} when the variable is unset or is anything
 * other than a case-insensitive "true". The policy config parser only applies the
 * {@code metricNamesForComputingUtilization} setting when this flag is {@code true}.
 *
 * <p>NOTE(review): presumably left non-final so tests can flip it — confirm against test usage.
 */
@VisibleForTesting
static boolean enableCustomConfig =
Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS"));

private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
Expand Down Expand Up @@ -189,7 +194,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
this.backendService = "";
}
config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();

if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
weightUpdateTimer.cancel();
Expand Down Expand Up @@ -236,7 +241,8 @@ protected void updateOverallBalancingState() {

private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence,
config.metricNamesForComputingUtilization);
updateWeight(picker);
return picker;
}
Expand Down Expand Up @@ -325,12 +331,16 @@ public void addSubchannel(WrrSubchannel wrrSubchannel) {
subchannels.add(wrrSubchannel);
}

public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
if (orcaReportListener != null
&& orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
&& orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty
&& Objects.equals(orcaReportListener.metricNamesForComputingUtilization,
metricNamesForComputingUtilization)) {
return orcaReportListener;
}
orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
orcaReportListener =
new OrcaReportListener(errorUtilizationPenalty, metricNamesForComputingUtilization);
return orcaReportListener;
}

Expand All @@ -355,18 +365,19 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne

final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
private final float errorUtilizationPenalty;
private final ImmutableList<String> metricNamesForComputingUtilization;

OrcaReportListener(float errorUtilizationPenalty) {
OrcaReportListener(float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.metricNamesForComputingUtilization = metricNamesForComputingUtilization;
}

@Override
public void onLoadReport(MetricReport report) {
double utilization = getUtilization(report, metricNamesForComputingUtilization);

double newWeight = 0;
// Prefer application utilization and fallback to CPU utilization if unset.
double utilization =
report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
: report.getCpuUtilization();
if (utilization > 0 && report.getQps() > 0) {
double penalty = 0;
if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
Expand All @@ -383,6 +394,37 @@ public void onLoadReport(MetricReport report) {
lastUpdated = ticker.nanoTime();
weight = newWeight;
}

/**
 * Picks the utilization figure that feeds the weight computation.
 *
 * <p>Order of preference:
 * <ol>
 *   <li>the report's application utilization, when it is strictly positive;</li>
 *   <li>otherwise the value produced by {@code getCustomMetricUtilization} for
 *       {@code metricNames}, when that yields one;</li>
 *   <li>otherwise the report's CPU utilization.</li>
 * </ol>
 *
 * @param report the load report to read utilization values from
 * @param metricNames names of the custom metrics to consult when application utilization is
 *     not strictly positive
 * @return the selected utilization value
 */
private double getUtilization(MetricReport report, ImmutableList<String> metricNames) {
  double applicationUtilization = report.getApplicationUtilization();
  boolean hasApplicationUtilization = applicationUtilization > 0;
  if (!hasApplicationUtilization) {
    OptionalDouble fromCustomMetrics = getCustomMetricUtilization(report, metricNames);
    // CPU utilization is read unconditionally here, as the fallback operand.
    double cpuUtilization = report.getCpuUtilization();
    return fromCustomMetrics.isPresent() ? fromCustomMetrics.getAsDouble() : cpuUtilization;
  }
  return applicationUtilization;
}

/**
 * Returns the largest usable utilization value among the metrics named in {@code metricNames}.
 *
 * <p>A named metric contributes only when {@code MetricReportUtils.getMetric} yields a value for
 * it and that value is strictly positive. NaN and values less than or equal to zero are
 * discarded, exactly like metrics that are absent from the report.
 *
 * @param report the load report to look the metrics up in
 * @param metricNames names of the metrics to consider; may be empty
 * @return the maximum usable value, or {@link OptionalDouble#empty()} when {@code metricNames}
 *     is empty, none of the named metrics are present, or every present value is NaN or
 *     non-positive
 */
private OptionalDouble getCustomMetricUtilization(MetricReport report,
    ImmutableList<String> metricNames) {
  return metricNames.stream()
      .map(name -> MetricReportUtils.getMetric(report, name))
      .filter(OptionalDouble::isPresent)
      .mapToDouble(OptionalDouble::getAsDouble)
      // Drop unusable samples so they can never win the max or mask the CPU fallback.
      .filter(d -> !Double.isNaN(d) && d > 0)
      .max();
}
}
}

Expand All @@ -403,10 +445,10 @@ private void createAndApplyOrcaListeners() {
for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
if (config.enableOobLoadReport) {
OrcaOobUtil.setListener(weightedSubchannel,
wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty,
config.metricNamesForComputingUtilization),
OrcaOobUtil.OrcaReportingConfig.newBuilder()
.setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
.build());
.setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS).build());
} else {
OrcaOobUtil.setListener(weightedSubchannel, null, null);
}
Expand Down Expand Up @@ -473,7 +515,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
private volatile StaticStrideScheduler scheduler;

WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
float errorUtilizationPenalty, AtomicInteger sequence) {
float errorUtilizationPenalty, AtomicInteger sequence,
ImmutableList<String> metricNamesForComputingUtilization) {
checkNotNull(children, "children");
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
this.children = children;
Expand All @@ -482,7 +525,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
for (ChildLbState child : children) {
WeightedChildLbState wChild = (WeightedChildLbState) child;
pickers.add(wChild.getCurrentPicker());
reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty,
metricNamesForComputingUtilization));
}
this.pickers = pickers;
this.reportListeners = reportListeners;
Expand Down Expand Up @@ -565,11 +609,11 @@ public boolean equals(Object o) {
* The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
* in which each object's deadline is the multiplicative inverse of the object's weight.
* <p>
* The way in which this is implemented is through a static stride scheduler.
* The way in which this is implemented is through a static stride scheduler.
* The Static Stride Scheduler works by iterating through the list of subchannel weights
* and using modular arithmetic to proportionally distribute picks, favoring entries
* with higher weights. It is based on the observation that the intended sequence generated
* from an EDF scheduler is a periodic one that can be achieved through modular arithmetic.
* and using modular arithmetic to proportionally distribute picks, favoring entries
* with higher weights. It is based on the observation that the intended sequence generated
* from an EDF scheduler is a periodic one that can be achieved through modular arithmetic.
* The Static Stride Scheduler is more performant than other implementations of the EDF
* Scheduler, as it removes the need for a priority queue (and thus mutex locks).
* <p>
Expand Down Expand Up @@ -720,23 +764,23 @@ static final class WeightedRoundRobinLoadBalancerConfig {
final long oobReportingPeriodNanos;
final long weightUpdatePeriodNanos;
final float errorUtilizationPenalty;
final ImmutableList<String> metricNamesForComputingUtilization;

public static Builder newBuilder() {
return new Builder();
}

private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
long weightExpirationPeriodNanos,
boolean enableOobLoadReport,
long oobReportingPeriodNanos,
long weightUpdatePeriodNanos,
float errorUtilizationPenalty) {
long weightExpirationPeriodNanos, boolean enableOobLoadReport, long oobReportingPeriodNanos,
long weightUpdatePeriodNanos, float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
this.blackoutPeriodNanos = blackoutPeriodNanos;
this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
this.enableOobLoadReport = enableOobLoadReport;
this.oobReportingPeriodNanos = oobReportingPeriodNanos;
this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.metricNamesForComputingUtilization = metricNamesForComputingUtilization;
}

@Override
Expand All @@ -751,27 +795,26 @@ public boolean equals(Object o) {
&& this.oobReportingPeriodNanos == that.oobReportingPeriodNanos
&& this.weightUpdatePeriodNanos == that.weightUpdatePeriodNanos
// Float.compare considers NaNs equal
&& Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0;
&& Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0
&& Objects.equals(this.metricNamesForComputingUtilization,
that.metricNamesForComputingUtilization);
}

@Override
public int hashCode() {
return Objects.hash(
blackoutPeriodNanos,
weightExpirationPeriodNanos,
enableOobLoadReport,
oobReportingPeriodNanos,
weightUpdatePeriodNanos,
errorUtilizationPenalty);
return Objects.hash(blackoutPeriodNanos, weightExpirationPeriodNanos, enableOobLoadReport,
oobReportingPeriodNanos, weightUpdatePeriodNanos, errorUtilizationPenalty,
metricNamesForComputingUtilization);
}

static final class Builder {
long blackoutPeriodNanos = 10_000_000_000L; // 10s
long weightExpirationPeriodNanos = 180_000_000_000L; //3min
long weightExpirationPeriodNanos = 180_000_000_000L; // 3min
boolean enableOobLoadReport = false;
long oobReportingPeriodNanos = 10_000_000_000L; // 10s
long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
float errorUtilizationPenalty = 1.0F;
ImmutableList<String> metricNamesForComputingUtilization = ImmutableList.of();

private Builder() {

Expand Down Expand Up @@ -809,10 +852,17 @@ Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
return this;
}

/**
 * Sets the metric names to use for computing utilization. The builder stores an immutable copy
 * of the given list rather than the list itself.
 *
 * @param metricNamesForComputingUtilization the metric names to copy into this builder
 * @return this builder
 */
Builder setMetricNamesForComputingUtilization(
    List<String> metricNamesForComputingUtilization) {
  ImmutableList<String> snapshot = ImmutableList.copyOf(metricNamesForComputingUtilization);
  this.metricNamesForComputingUtilization = snapshot;
  return this;
}

WeightedRoundRobinLoadBalancerConfig build() {
return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
weightUpdatePeriodNanos, errorUtilizationPenalty);
weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
weightUpdatePeriodNanos, errorUtilizationPenalty, metricNamesForComputingUtilization);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.grpc.xds.WeightedRoundRobinLoadBalancer.WeightedRoundRobinLoadBalancerConfig;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -73,14 +74,16 @@ public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map<String, ?> rawConfig) {
Long blackoutPeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "blackoutPeriod");
Long weightExpirationPeriodNanos =
JsonUtil.getStringAsDuration(rawConfig, "weightExpirationPeriod");
JsonUtil.getStringAsDuration(rawConfig, "weightExpirationPeriod");
Long oobReportingPeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "oobReportingPeriod");
Boolean enableOobLoadReport = JsonUtil.getBoolean(rawConfig, "enableOobLoadReport");
Long weightUpdatePeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "weightUpdatePeriod");
Float errorUtilizationPenalty = JsonUtil.getNumberAsFloat(rawConfig, "errorUtilizationPenalty");
List<String> metricNamesForComputingUtilization = JsonUtil.getListOfStrings(rawConfig,
LoadBalancerConfigFactory.METRIC_NAMES_FOR_COMPUTING_UTILIZATION);

WeightedRoundRobinLoadBalancerConfig.Builder configBuilder =
WeightedRoundRobinLoadBalancerConfig.newBuilder();
WeightedRoundRobinLoadBalancerConfig.newBuilder();
if (blackoutPeriodNanos != null) {
configBuilder.setBlackoutPeriodNanos(blackoutPeriodNanos);
}
Expand All @@ -102,6 +105,11 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map<String, ?> rawC
if (errorUtilizationPenalty != null) {
configBuilder.setErrorUtilizationPenalty(errorUtilizationPenalty);
}
if (metricNamesForComputingUtilization != null) {
if (WeightedRoundRobinLoadBalancer.enableCustomConfig) {
configBuilder.setMetricNamesForComputingUtilization(metricNamesForComputingUtilization);
}
}
return ConfigOrError.fromConfig(configBuilder.build());
}
}
Loading
Loading