diff --git a/api/src/main/java/io/grpc/EquivalentAddressGroup.java b/api/src/main/java/io/grpc/EquivalentAddressGroup.java index 18151e88aba..2dd52fe7f21 100644 --- a/api/src/main/java/io/grpc/EquivalentAddressGroup.java +++ b/api/src/main/java/io/grpc/EquivalentAddressGroup.java @@ -55,6 +55,12 @@ public final class EquivalentAddressGroup { */ public static final Attributes.Key ATTR_LOCALITY_NAME = Attributes.Key.create("io.grpc.EquivalentAddressGroup.LOCALITY"); + /** + * The backend service associated with this EquivalentAddressGroup. + */ + @Attr + static final Attributes.Key ATTR_BACKEND_SERVICE = + Attributes.Key.create("io.grpc.EquivalentAddressGroup.BACKEND_SERVICE"); /** * Endpoint weight for load balancing purposes. While the type is Long, it must be a valid uint32. * Must not be zero. The weight is proportional to the other endpoints; if an endpoint's weight is diff --git a/api/src/main/java/io/grpc/InternalEquivalentAddressGroup.java b/api/src/main/java/io/grpc/InternalEquivalentAddressGroup.java index d4bed4d81bc..cd171208af7 100644 --- a/api/src/main/java/io/grpc/InternalEquivalentAddressGroup.java +++ b/api/src/main/java/io/grpc/InternalEquivalentAddressGroup.java @@ -26,4 +26,10 @@ private InternalEquivalentAddressGroup() {} * twice that of another endpoint, it is intended to receive twice the load. */ public static final Attributes.Key ATTR_WEIGHT = EquivalentAddressGroup.ATTR_WEIGHT; + + /** + * The backend service associated with this EquivalentAddressGroup. + */ + public static final Attributes.Key ATTR_BACKEND_SERVICE = + EquivalentAddressGroup.ATTR_BACKEND_SERVICE; } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 7a48bf642fe..74867c6b4e6 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -42,6 +42,7 @@ import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.InternalChannelz; import io.grpc.InternalChannelz.ChannelStats; +import io.grpc.InternalEquivalentAddressGroup; import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; @@ -603,8 +604,8 @@ public void run() { connectedAddressAttributes = addressIndex.getCurrentEagAttributes(); gotoNonErrorState(READY); subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target, - /* backendService= */ getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + /* backendService= */ getBackendServiceOrDefault( + addressIndex.getCurrentEagAttributes()), /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), EquivalentAddressGroup.ATTR_LOCALITY_NAME), /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes() @@ -635,8 +636,8 @@ public void run() { addressIndex.reset(); gotoNonErrorState(IDLE); subchannelMetrics.recordDisconnection(/* target= */ target, - /* backendService= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), - NameResolver.ATTR_BACKEND_SERVICE), + /* backendService= */ getBackendServiceOrDefault( + addressIndex.getCurrentEagAttributes()), /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), EquivalentAddressGroup.ATTR_LOCALITY_NAME), /* disconnectError= */ disconnectError.toErrorString(), @@ -644,8 +645,8 @@ public void run() { .get(GrpcAttributes.ATTR_SECURITY_LEVEL))); } else if (pendingTransport == transport) { subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target, - /* backendService= */getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), - NameResolver.ATTR_BACKEND_SERVICE), + /* backendService= */ getBackendServiceOrDefault( + addressIndex.getCurrentEagAttributes()), /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), EquivalentAddressGroup.ATTR_LOCALITY_NAME)); Preconditions.checkState(state.getState() == CONNECTING, @@ -708,6 +709,14 @@ private String getAttributeOrDefault(Attributes attributes, Attributes.Key addressGroups = + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes)); + InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); + ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, + fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); + LoadBalancer.CreateSubchannelArgs createSubchannelArgs = + LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build(); + internalSubchannel = new InternalSubchannel( + createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, + mockTransportFactory, fakeClock.getScheduledExecutorService(), + fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz, + CallTracer.getDefaultFactory().create(), subchannelTracer, logId, + new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), + Collections.emptyList(), AUTHORITY, mockMetricRecorder + ); + + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull(transportInfo); + transportInfo.listener.transportReady(); + fakeClock.runDueTasks(); + + verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.connection_attempts_succeeded"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) + ); + } + private void assertNoCallbackInvoke() { while (fakeExecutor.runDueTasks() > 0) {} assertEquals(0, callbackInvokes.size()); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 5a59b47c529..f6ee60ab1ef 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -27,6 +27,7 @@ import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.InternalEquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; @@ -369,6 +370,7 @@ StatusOr edsUpdateToResult( String localityName = localityName(locality); Attributes attr = endpoint.eag().getAttributes().toBuilder() + .set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, clusterName) .set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY, locality) .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) .set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY_WEIGHT, diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index c6e5db08526..7912575215b 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -60,6 +60,7 @@ import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.InternalEquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; @@ -317,14 +318,20 @@ public void edsClustersWithRingHashEndpointLbPolicy() throws Exception { // LOCALITY1 are equally weighted. assertThat(addr1.getAddresses()) .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.1", 8080))); + assertThat(addr1.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE)) + .isEqualTo(CLUSTER); assertThat(addr1.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT)) .isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10); assertThat(addr2.getAddresses()) .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.2", 8080))); + assertThat(addr2.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE)) + .isEqualTo(CLUSTER); assertThat(addr2.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT)) .isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10); assertThat(addr3.getAddresses()) .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.1.1", 8080))); + assertThat(addr3.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE)) + .isEqualTo(CLUSTER); assertThat(addr3.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT)) .isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x6AAAAAAA /* 5/6 */ : 50 * 60); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); @@ -920,6 +927,8 @@ public void onlyLogicalDnsCluster_endpointsResolved() { Arrays.asList(new EquivalentAddressGroup(Arrays.asList( newInetSocketAddress("127.0.2.1", 9000), newInetSocketAddress("127.0.2.2", 9000)))), childBalancer.addresses); + assertThat(childBalancer.addresses.get(0).getAttributes() + .get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER); assertThat(childBalancer.addresses.get(0).getAttributes() .get(XdsInternalAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME + ":9000"); }