Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/EquivalentAddressGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public final class EquivalentAddressGroup {
*/
public static final Attributes.Key<String> ATTR_LOCALITY_NAME =
Attributes.Key.create("io.grpc.EquivalentAddressGroup.LOCALITY");
/**
* The backend service associated with this EquivalentAddressGroup.
*/
@Attr
static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
Attributes.Key.create("io.grpc.EquivalentAddressGroup.BACKEND_SERVICE");
/**
* Endpoint weight for load balancing purposes. While the type is Long, it must be a valid uint32.
* Must not be zero. The weight is proportional to the other endpoints; if an endpoint's weight is
Expand Down
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/InternalEquivalentAddressGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ private InternalEquivalentAddressGroup() {}
* twice that of another endpoint, it is intended to receive twice the load.
*/
public static final Attributes.Key<Long> ATTR_WEIGHT = EquivalentAddressGroup.ATTR_WEIGHT;

/**
* The backend service associated with this EquivalentAddressGroup.
*/
public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
EquivalentAddressGroup.ATTR_BACKEND_SERVICE;
}
21 changes: 15 additions & 6 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalChannelz;
import io.grpc.InternalChannelz.ChannelStats;
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
Expand Down Expand Up @@ -603,8 +604,8 @@ public void run() {
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
gotoNonErrorState(READY);
subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target,
/* backendService= */ getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
/* backendService= */ getBackendServiceOrDefault(
addressIndex.getCurrentEagAttributes()),
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
Expand Down Expand Up @@ -635,17 +636,17 @@ public void run() {
addressIndex.reset();
gotoNonErrorState(IDLE);
subchannelMetrics.recordDisconnection(/* target= */ target,
/* backendService= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
NameResolver.ATTR_BACKEND_SERVICE),
/* backendService= */ getBackendServiceOrDefault(
addressIndex.getCurrentEagAttributes()),
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
/* disconnectError= */ disconnectError.toErrorString(),
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
.get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
} else if (pendingTransport == transport) {
subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target,
/* backendService= */getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
NameResolver.ATTR_BACKEND_SERVICE),
/* backendService= */ getBackendServiceOrDefault(
addressIndex.getCurrentEagAttributes()),
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
EquivalentAddressGroup.ATTR_LOCALITY_NAME));
Preconditions.checkState(state.getState() == CONNECTING,
Expand Down Expand Up @@ -708,6 +709,14 @@ private String getAttributeOrDefault(Attributes attributes, Attributes.Key<Strin
String value = attributes.get(key);
return value == null ? "" : value;
}

private String getBackendServiceOrDefault(Attributes attributes) {
String value = attributes.get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE);
if (value == null) {
value = attributes.get(NameResolver.ATTR_BACKEND_SERVICE);
}
return value == null ? "" : value;
}
}

// All methods are called in syncContext
Expand Down
44 changes: 42 additions & 2 deletions core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalChannelz;
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -1510,7 +1511,7 @@ public void subchannelStateChanges_triggersAttemptFailedMetric() {
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
SocketAddress addr = mock(SocketAddress.class);
Attributes eagAttributes = Attributes.newBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
.build();
Expand Down Expand Up @@ -1564,7 +1565,7 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() {
// 2. Setup Subchannel with attributes
SocketAddress addr = mock(SocketAddress.class);
Attributes eagAttributes = Attributes.newBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
.build();
Expand Down Expand Up @@ -1631,6 +1632,45 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() {
inOrder.verifyNoMoreInteractions();
}

@Test
public void subchannelStateChanges_backendServiceFallsBackToResolutionResultAttr() {
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
SocketAddress addr = mock(SocketAddress.class);
Attributes eagAttributes = Attributes.newBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
.build();
List<EquivalentAddressGroup> addressGroups =
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes));
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
LoadBalancer.CreateSubchannelArgs createSubchannelArgs =
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build();
internalSubchannel = new InternalSubchannel(
createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider,
mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz,
CallTracer.getDefaultFactory().create(), subchannelTracer, logId,
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
Collections.emptyList(), AUTHORITY, mockMetricRecorder
);

internalSubchannel.obtainActiveTransport();
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
transportInfo.listener.transportReady();
fakeClock.runDueTasks();

verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.connection_attempts_succeeded"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
);
}

private void assertNoCallbackInvoke() {
while (fakeExecutor.runDueTasks() > 0) {}
assertEquals(0, callbackInvokes.size());
Expand Down
2 changes: 2 additions & 0 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
Expand Down Expand Up @@ -369,6 +370,7 @@ StatusOr<ClusterResolutionResult> edsUpdateToResult(
String localityName = localityName(locality);
Attributes attr =
endpoint.eag().getAttributes().toBuilder()
.set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, clusterName)
.set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY, locality)
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName)
.set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY_WEIGHT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
Expand Down Expand Up @@ -317,14 +318,20 @@ public void edsClustersWithRingHashEndpointLbPolicy() throws Exception {
// LOCALITY1 are equally weighted.
assertThat(addr1.getAddresses())
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.1", 8080)));
assertThat(addr1.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE))
.isEqualTo(CLUSTER);
assertThat(addr1.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10);
assertThat(addr2.getAddresses())
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.2", 8080)));
assertThat(addr2.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE))
.isEqualTo(CLUSTER);
assertThat(addr2.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10);
assertThat(addr3.getAddresses())
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.1.1", 8080)));
assertThat(addr3.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE))
.isEqualTo(CLUSTER);
assertThat(addr3.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x6AAAAAAA /* 5/6 */ : 50 * 60);
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
Expand Down Expand Up @@ -920,6 +927,8 @@ public void onlyLogicalDnsCluster_endpointsResolved() {
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(
newInetSocketAddress("127.0.2.1", 9000), newInetSocketAddress("127.0.2.2", 9000)))),
childBalancer.addresses);
assertThat(childBalancer.addresses.get(0).getAttributes()
.get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
assertThat(childBalancer.addresses.get(0).getAttributes()
.get(XdsInternalAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME + ":9000");
}
Expand Down
Loading