Skip to content
Open
5 changes: 0 additions & 5 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,13 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-alts</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,8 @@ private Builder() {
.setReverseScans(true)
.setLastScannedRowResponses(true)
.setDirectAccessRequested(DIRECT_PATH_ENABLED)
.setTrafficDirectorEnabled(DIRECT_PATH_ENABLED);
.setTrafficDirectorEnabled(DIRECT_PATH_ENABLED)
.setPeerInfo(true);
}

private Builder(EnhancedBigtableStubSettings settings) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.bigtable.v2.PeerInfo;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.alts.AltsContextUtil;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

@InternalApi
public class MetadataExtractorInterceptor implements ClientInterceptor {
private final SidebandData sidebandData = new SidebandData();

public GrpcCallContext injectInto(GrpcCallContext ctx) {
// TODO: migrate to using .withTransportChannel
// This will require a change on gax's side to expose the underlying ManagedChannel in
// GrpcTransportChannel (its currently package private).
return ctx.withChannel(ClientInterceptors.intercept(ctx.getChannel(), this))
.withCallOptions(ctx.getCallOptions().withOption(SidebandData.KEY, sidebandData));
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
sidebandData.reset();

super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onHeaders(Metadata headers) {
sidebandData.onResponseHeaders(headers, getAttributes());
super.onHeaders(headers);
}

@Override
public void onClose(Status status, Metadata trailers) {
sidebandData.onClose(status, trailers);
super.onClose(status, trailers);
}
},
headers);
}
};
}

public SidebandData getSidebandData() {
return sidebandData;
}

public static class SidebandData {
private static final CallOptions.Key<SidebandData> KEY =
CallOptions.Key.create("bigtable-sideband");

private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN =
Pattern.compile(".*dur=(?<dur>\\d+)");
private static final Metadata.Key<byte[]> LOCATION_METADATA_KEY =
Metadata.Key.of("x-goog-ext-425905942-bin", Metadata.BINARY_BYTE_MARSHALLER);
private static final Metadata.Key<String> PEER_INFO_KEY =
Metadata.Key.of("bigtable-peer-info", Metadata.ASCII_STRING_MARSHALLER);

@Nullable private volatile ResponseParams responseParams;
@Nullable private volatile PeerInfo peerInfo;
@Nullable private volatile Long gfeTiming;

@Nullable
public ResponseParams getResponseParams() {
return responseParams;
}

@Nullable
public PeerInfo getPeerInfo() {
return peerInfo;
}

@Nullable
public Long getGfeTiming() {
return gfeTiming;
}

private void reset() {
responseParams = null;
peerInfo = null;
gfeTiming = null;
}

void onResponseHeaders(Metadata md, Attributes attributes) {
responseParams = extractResponseParams(md);
gfeTiming = extractGfeLatency(md);
peerInfo = extractPeerInfo(md, gfeTiming, attributes);
}

void onClose(Status status, Metadata trailers) {
if (responseParams == null) {
responseParams = extractResponseParams(trailers);
}
}

@Nullable
private static Long extractGfeLatency(Metadata metadata) {
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
if (serverTiming == null) {
return null;
}
Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming);
// this should always be true
if (matcher.find()) {
return Long.parseLong(matcher.group("dur"));
}
return null;
}

@Nullable
private static PeerInfo extractPeerInfo(
Metadata metadata, Long gfeTiming, Attributes attributes) {
String encodedStr = metadata.get(PEER_INFO_KEY);
if (Strings.isNullOrEmpty(encodedStr)) {
return null;
}

try {
byte[] decoded = Base64.getUrlDecoder().decode(encodedStr);
PeerInfo peerInfo = PeerInfo.parseFrom(decoded);
PeerInfo.TransportType effectiveTransport = peerInfo.getTransportType();

// TODO: remove this once transport_type is being sent by the server
// This is a temporary workaround to detect directpath until its available from the server
if (effectiveTransport == PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN) {
boolean isAlts = AltsContextUtil.check(attributes);
if (isAlts) {
effectiveTransport = PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS;
} else if (gfeTiming != null) {
effectiveTransport = PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH;
}
}
if (effectiveTransport != PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN) {
peerInfo = peerInfo.toBuilder().setTransportType(effectiveTransport).build();
}
return peerInfo;
} catch (Exception e) {
throw new IllegalArgumentException(
"Failed to parse "
+ PEER_INFO_KEY.name()
+ " from the response header value: "
+ encodedStr);
}
}

@Nullable
private static ResponseParams extractResponseParams(Metadata metadata) {
byte[] responseParams = metadata.get(LOCATION_METADATA_KEY);
if (responseParams != null) {
try {
return ResponseParams.parseFrom(responseParams);
} catch (InvalidProtocolBufferException e) {
}
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,18 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracer.TransportAttrs;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.Status;

/**
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
* tracing and Bigtable tracing. Its primary purpose is to measure the transition time between
* asking gRPC to start an RPC and gRPC actually serializing that RPC.
*/
class BigtableGrpcStreamTracer extends ClientStreamTracer {
private static final String GRPC_LB_LOCALITY_KEY = "grpc.lb.locality";
private static final String GRPC_LB_BACKEND_SERVICE_KEY = "grpc.lb.backend_service";

private final StreamInfo info;
private final BigtableTracer tracer;
private volatile String backendService = null;
private volatile String locality = null;

public BigtableGrpcStreamTracer(StreamInfo info, BigtableTracer tracer) {
this.info = info;
public BigtableGrpcStreamTracer(BigtableTracer tracer) {
this.tracer = tracer;
}

Expand All @@ -44,26 +35,6 @@ public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalU
tracer.grpcMessageSent();
}

@Override
public void addOptionalLabel(String key, String value) {
switch (key) {
case GRPC_LB_LOCALITY_KEY:
this.locality = value;
break;
case GRPC_LB_BACKEND_SERVICE_KEY:
this.backendService = value;
break;
}

super.addOptionalLabel(key, value);
}

@Override
public void streamClosed(Status status) {
tracer.setTransportAttrs(TransportAttrs.create(locality, backendService));
super.streamClosed(status);
}

static class Factory extends ClientStreamTracer.Factory {

private final BigtableTracer tracer;
Expand All @@ -75,7 +46,7 @@ static class Factory extends ClientStreamTracer.Factory {
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
return new BigtableGrpcStreamTracer(info, tracer);
return new BigtableGrpcStreamTracer(tracer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor;
import java.time.Duration;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -70,36 +71,12 @@ public int getAttempt() {
return attempt;
}

/**
* Record the latency between Google's network receives the RPC and reads back the first byte of
* the response from server-timing header. If server-timing header is missing, increment the
* missing header count.
*/
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
// noop
}

/** Adds an annotation of the total throttled time of a batch. */
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}

/**
* Set the Bigtable zone and cluster so metrics can be tagged with location information. This will
* be called in BuiltinMetricsTracer.
*/
public void setLocations(String zone, String cluster) {
// noop
}

/** Set the underlying transport used to process the attempt */
public void setTransportAttrs(BuiltinMetricsTracer.TransportAttrs attrs) {}

@Deprecated
/**
* @deprecated {@link #grpcMessageSent()} is called instead.
*/
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
public void setSidebandData(MetadataExtractorInterceptor.SidebandData sidebandData) {
// noop
}

Expand Down
Loading
Loading