diff --git a/java-datastore/google-cloud-datastore/pom.xml b/java-datastore/google-cloud-datastore/pom.xml index 819663f4bfda..5657c5d2bf67 100644 --- a/java-datastore/google-cloud-datastore/pom.xml +++ b/java-datastore/google-cloud-datastore/pom.xml @@ -196,12 +196,20 @@ io.opentelemetry opentelemetry-sdk - test io.opentelemetry opentelemetry-sdk-common - test + + + com.google.cloud + google-cloud-monitoring + 3.91.0 + + + com.google.api.grpc + proto-google-cloud-monitoring-v3 + 3.91.0 io.opentelemetry @@ -216,7 +224,6 @@ io.opentelemetry opentelemetry-sdk-metrics - test io.opentelemetry diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index c7b938e5edcb..c5f4b9a7a027 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -162,6 +162,16 @@ public T call() throws DatastoreException { } } + /** + * Closes the Datastore client and releases all resources. + * + *

This method closes the underlying RPC channel and then closes the {@link + * com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder}. For clients using the built-in + * Cloud Monitoring exporter, closing the recorder flushes any buffered metrics and shuts down the + * private {@link io.opentelemetry.sdk.OpenTelemetrySdk} instance. For clients using a + * user-provided {@link io.opentelemetry.api.OpenTelemetry} instance, the recorder close is a + * no-op since the user owns that instance's lifecycle. + */ @Override public void close() throws Exception { try { @@ -169,6 +179,8 @@ public void close() throws Exception { } catch (Exception e) { logger.log(Level.WARNING, "Failed to close channels", e); } + // Close the default Metrics Recorder if exists + getOptions().getMetricsRecorder().close(); } @Override diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java index 50444353751b..9c6199c162f7 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java @@ -16,25 +16,39 @@ package com.google.cloud.datastore; +import com.google.api.core.BetaApi; import io.opentelemetry.api.OpenTelemetry; import javax.annotation.Nonnull; import javax.annotation.Nullable; +/** + * Represents the options that are used to configure the use of OpenTelemetry for telemetry + * collection in the Datastore SDK. 
+ */ public class DatastoreOpenTelemetryOptions { private final boolean tracingEnabled; private final boolean metricsEnabled; + private final boolean exportBuiltinMetricsToGoogleCloudMonitoring; private final @Nullable OpenTelemetry openTelemetry; DatastoreOpenTelemetryOptions(Builder builder) { this.tracingEnabled = builder.tracingEnabled; this.metricsEnabled = builder.metricsEnabled; + this.exportBuiltinMetricsToGoogleCloudMonitoring = + builder.exportBuiltinMetricsToGoogleCloudMonitoring; this.openTelemetry = builder.openTelemetry; } /** - * Returns whether either tracing or metrics are enabled. Telemetry is disabled by default. + * Returns whether either tracing or custom metrics (via a user-provided {@link OpenTelemetry} + * instance) are enabled. + * + *

Note: This method does not reflect the state of built-in metrics export to + * Google Cloud Monitoring, which is controlled separately by {@link + * #isExportBuiltinMetricsToGoogleCloudMonitoring()} and is {@code false} by default. To check + * whether any telemetry is active, also consult that flag. * - * @return {@code true} if either tracing or metrics are enabled, {@code false} otherwise. + * @return {@code true} if tracing or custom OTel metrics are enabled, {@code false} otherwise. */ public boolean isEnabled() { return tracingEnabled || metricsEnabled; @@ -50,7 +64,7 @@ public boolean isTracingEnabled() { } /** - * Returns whether metrics are enabled. + * Returns whether metrics are enabled for the custom (user-provided) OpenTelemetry backend. * * @return {@code true} if metrics are enabled, {@code false} otherwise. */ @@ -58,16 +72,46 @@ public boolean isMetricsEnabled() { return metricsEnabled; } + /** + * Returns whether built-in metrics should be exported to Google Cloud Monitoring. + * + *

When enabled, client-side metrics are automatically exported to Google Cloud Monitoring + * using the Cloud Monitoring API. This is independent of the custom OpenTelemetry backend + * configured via {@link #getOpenTelemetry()}. + * + * @return {@code true} if built-in metrics export to Cloud Monitoring is enabled, {@code false} + * otherwise. + */ + @BetaApi + public boolean isExportBuiltinMetricsToGoogleCloudMonitoring() { + return exportBuiltinMetricsToGoogleCloudMonitoring; + } + + /** + * Returns the custom {@link OpenTelemetry} instance, if one was provided. + * + * @return the custom {@link OpenTelemetry} instance, or {@code null} if none was provided. + */ @Nullable public OpenTelemetry getOpenTelemetry() { return openTelemetry; } + /** + * Returns a new {@link Builder} initialized with the values from this options instance. + * + * @return a new {@link Builder}. + */ @Nonnull public DatastoreOpenTelemetryOptions.Builder toBuilder() { return new DatastoreOpenTelemetryOptions.Builder(this); } + /** + * Returns a new default {@link Builder}. + * + * @return a new {@link Builder}. 
+ */ @Nonnull public static DatastoreOpenTelemetryOptions.Builder newBuilder() { return new DatastoreOpenTelemetryOptions.Builder(); @@ -77,18 +121,23 @@ public static class Builder { private boolean tracingEnabled; private boolean metricsEnabled; + private boolean exportBuiltinMetricsToGoogleCloudMonitoring; @Nullable private OpenTelemetry openTelemetry; private Builder() { tracingEnabled = false; metricsEnabled = false; + // TODO: This is disabled by default until the Firestore namespace is deployed + exportBuiltinMetricsToGoogleCloudMonitoring = false; openTelemetry = null; } private Builder(DatastoreOpenTelemetryOptions options) { this.tracingEnabled = options.tracingEnabled; this.metricsEnabled = options.metricsEnabled; + this.exportBuiltinMetricsToGoogleCloudMonitoring = + options.exportBuiltinMetricsToGoogleCloudMonitoring; this.openTelemetry = options.openTelemetry; } @@ -96,6 +145,7 @@ private Builder(DatastoreOpenTelemetryOptions options) { * Sets whether tracing should be enabled. * * @param enabled Whether tracing should be enabled. + * @return this builder instance. */ @Nonnull public DatastoreOpenTelemetryOptions.Builder setTracingEnabled(boolean enabled) { @@ -104,10 +154,10 @@ public DatastoreOpenTelemetryOptions.Builder setTracingEnabled(boolean enabled) } /** - * Sets whether metrics should be enabled. + * Sets whether metrics should be enabled for the custom (user-provided) OpenTelemetry backend. * * @param enabled Whether metrics should be enabled. - * @return the builder instance. + * @return this builder instance. */ @Nonnull DatastoreOpenTelemetryOptions.Builder setMetricsEnabled(boolean enabled) { @@ -115,12 +165,31 @@ DatastoreOpenTelemetryOptions.Builder setMetricsEnabled(boolean enabled) { return this; } + /** + * Sets whether built-in metrics should be exported to Google Cloud Monitoring. + * + *

When enabled, client-side metrics are automatically exported to Google Cloud Monitoring + * using the Cloud Monitoring API. This can be disabled to prevent metrics from being sent to + * Cloud Monitoring while still allowing metrics to flow to a custom OpenTelemetry backend. + * + * @param exportBuiltinMetrics Whether built-in metrics should be exported to Cloud Monitoring. + * @return this builder instance. + */ + @BetaApi + @Nonnull + public DatastoreOpenTelemetryOptions.Builder setExportBuiltinMetricsToGoogleCloudMonitoring( + boolean exportBuiltinMetrics) { + this.exportBuiltinMetricsToGoogleCloudMonitoring = exportBuiltinMetrics; + return this; + } + /** * Sets the {@link OpenTelemetry} to use with this Datastore instance. If telemetry collection - * is enabled, but an `OpenTelemetry` is not provided, the Datastore SDK will attempt to use the - * `GlobalOpenTelemetry`. + * is enabled, but an {@code OpenTelemetry} is not provided, the Datastore SDK will attempt to + * use the {@code GlobalOpenTelemetry}. * * @param openTelemetry The OpenTelemetry that should be used by this Datastore instance. + * @return this builder instance. */ @Nonnull public DatastoreOpenTelemetryOptions.Builder setOpenTelemetry( @@ -129,6 +198,11 @@ public DatastoreOpenTelemetryOptions.Builder setOpenTelemetry( return this; } + /** + * Builds a new {@link DatastoreOpenTelemetryOptions} instance from this builder. + * + * @return a new {@link DatastoreOpenTelemetryOptions}. 
+ */ @Nonnull public DatastoreOpenTelemetryOptions build() { return new DatastoreOpenTelemetryOptions(this); diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/BuiltInDatastoreMetricsProvider.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/BuiltInDatastoreMetricsProvider.java new file mode 100644 index 000000000000..3c5853fc5bdd --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/BuiltInDatastoreMetricsProvider.java @@ -0,0 +1,222 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.datastore.telemetry; + +import com.google.auth.Credentials; +import com.google.cloud.NoCredentials; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.resources.Resource; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Provides a built-in {@link OpenTelemetry} instance for Datastore client-side metrics. + * + *

This class is responsible for configuring a private {@link OpenTelemetrySdk} that exports + * metrics to Google Cloud Monitoring using a {@link DatastoreCloudMonitoringExporter}. + * + *

The implementation follows the pattern used in other Google Cloud client libraries, providing + * automated environment detection and resource attribute configuration for the {@link + * TelemetryConstants#DATASTORE_RESOURCE_TYPE} monitored resource. + */ +public final class BuiltInDatastoreMetricsProvider { + + /** Singleton instance; the constructor is private, so this is the only way to obtain a provider. */ + public static final BuiltInDatastoreMetricsProvider INSTANCE = + new BuiltInDatastoreMetricsProvider(); + + private static final Logger logger = + Logger.getLogger(BuiltInDatastoreMetricsProvider.class.getName()); + + /** Lazily populated by {@link #detectClientLocation()}; {@code null} until the first call. */ + private static volatile String location; + private static final String DEFAULT_LOCATION = "global"; + + /** Unmodifiable attribute map computed once in the constructor. */ + private final Map cachedClientAttributes; + + private BuiltInDatastoreMetricsProvider() { + cachedClientAttributes = Collections.unmodifiableMap(buildClientAttributes()); + } + + /** + * Builds a new mutable attribute map containing a freshly generated client UID (see {@link + * #getDefaultTaskValue()}) and the service value. + */ + private Map buildClientAttributes() { + Map attrs = new HashMap<>(); + attrs.put(TelemetryConstants.CLIENT_UID_KEY.getKey(), getDefaultTaskValue()); + attrs.put(TelemetryConstants.SERVICE_KEY.getKey(), TelemetryConstants.SERVICE_VALUE); + return attrs; + } + + /** + * Creates a new {@link OpenTelemetry} instance for a single Datastore client's built-in metrics. + * + *

Each call returns a dedicated {@link OpenTelemetrySdk} wrapping an {@link SdkMeterProvider} + * configured with the provided project's monitored resource attributes and a {@link + * DatastoreCloudMonitoringExporter}. No global or shared state is modified. + * + *

Lifecycle: The returned instance is owned by the caller. It should be closed by + * calling {@link io.opentelemetry.sdk.OpenTelemetrySdk#close()} (or via {@link + * OpenTelemetryDatastoreMetricsRecorder#close()}) when the associated Datastore client is closed. + * + *

No caching is performed here; callers are responsible for holding the returned instance for + * the lifetime of their Datastore client. + * + * @param projectId the GCP project ID. + * @param databaseId the Datastore database ID. + * @param credentials the credentials to use for exporting metrics. + * @return a new {@link OpenTelemetry} instance, or {@code null} if it could not be created. + */ + @Nullable + public OpenTelemetry createOpenTelemetry( + @Nonnull String projectId, @Nonnull String databaseId, @Nonnull Credentials credentials) { + SdkMeterProviderBuilder sdkMeterProviderBuilder = SdkMeterProvider.builder(); + + // Generate unique client attributes (including unique taskId) for this specific client + // instance. + Map clientAttributes = buildClientAttributes(); + + if (credentials instanceof NoCredentials) { + logger.log( + Level.WARNING, + "Built-in metrics exporting is disabled when using NoCredentials (emulator)."); + return null; + } + + DatastoreCloudMonitoringExporter exporter = + DatastoreCloudMonitoringExporter.create( + projectId, databaseId, credentials, clientAttributes); + if (exporter == null) { + return null; + } + + // Register Datastore-specific views and the PeriodicMetricReader. + DatastoreBuiltInMetricsView.registerBuiltinMetrics(exporter, sdkMeterProviderBuilder); + // Configure the monitored resource attributes for this specific client. + sdkMeterProviderBuilder.setResource( + Resource.create(createResourceAttributes(projectId, databaseId))); + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + return OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build(); + } + + /** + * Detects the client's GCP location (region). + * + *

To avoid dependencies on external resource detection libraries, this implementation + * currently defaults to "global". + * + * @return the detected location, or "global" if detection fails. + */ + public static String detectClientLocation() { + if (location == null) { + location = DEFAULT_LOCATION; + } + return location; + } + + /** + * Creates resource attributes for the {@link TelemetryConstants#DATASTORE_RESOURCE_TYPE} + * monitored resource. + * + *

These attributes are attached to the OTel {@link Resource} and used by the exporter to
+   * populate the resource labels in Cloud Monitoring.
+   *
+   * @param projectId the GCP project ID.
+   * @param databaseId the Datastore database ID.
+   * @return the resource attributes.
+   */
+  Attributes createResourceAttributes(String projectId, String databaseId) {
+    String locationId = detectClientLocation();
+    return Attributes.builder()
+        .put(TelemetryConstants.PROJECT_ID_KEY, projectId)
+        .put(TelemetryConstants.DATABASE_ID_KEY, databaseId)
+        .put(TelemetryConstants.LOCATION_ID_KEY, locationId)
+        .build();
+  }
+
+  /**
+   * Returns common client attributes added to every exported metric data point.
+   *
+   *

The returned map is computed once, when this provider is constructed, and holds the {@code + * client_uid} and {@code service} entries. It is distinct from the per-client attribute map that + * {@link #createOpenTelemetry} builds afresh on every call. + * + * @return an unmodifiable map of client attributes. + */ + Map getClientAttributes() { + return cachedClientAttributes; + } + + /** + * Generates a unique identifier for the {@code client_uid} metric field. + * + *

Combines a random UUID with the process ID and the local hostname, producing a value of the + * form {@code uuid@pid@hostname} (with {@code localhost} substituted when the hostname cannot be + * resolved). The UUID prefix ensures uniqueness across process restarts that reuse the same + * PID, preventing Cloud Monitoring from conflating time series from different process lifecycles. + * + *

For Java 9 and later, the PID is obtained using the ProcessHandle API. For Java 8, the PID + * is extracted from ManagementFactory.getRuntimeMXBean().getName(). + * + *

Note: This method generates a new value every time it is called to ensure that each + * client instance gets a unique ID. It should be called sparingly (e.g., once per client + * creation) to avoid performance overhead from UUID generation and hostname lookup. + * + * @return a unique identifier string of the form {@code uuid@pid@hostname}; {@code localhost} is + * used as the hostname when the local host cannot be resolved. + */ + private static String getDefaultTaskValue() { + String identifier = UUID.randomUUID().toString(); + String pid = getProcessId(); + + try { + String hostname = InetAddress.getLocalHost().getHostName(); + return identifier + "@" + pid + "@" + hostname; + } catch (UnknownHostException e) { + logger.log(Level.CONFIG, "Unable to get the hostname.", e); + return identifier + "@" + pid + "@localhost"; + } + } + + /** + * Returns the current process ID as a string. + * + * The {@code java.lang.ProcessHandle} API is invoked reflectively so the class still loads on + * Java 8. If that fails for any reason, the PID is taken from the part of the runtime MXBean + * name before the first {@code @}. + * + * @return the process ID, or {@code "unknown"} if neither approach yields one. + */ + private static String getProcessId() { + try { + // Check if Java 9+ and ProcessHandle class is available + Class processHandleClass = Class.forName("java.lang.ProcessHandle"); + Method currentMethod = processHandleClass.getMethod("current"); + Object processHandleInstance = currentMethod.invoke(null); + Method pidMethod = processHandleClass.getMethod("pid"); + long pid = (long) pidMethod.invoke(processHandleInstance); + return Long.toString(pid); + } catch (Exception e) { + // Fallback to Java 8 method + final String jvmName = ManagementFactory.getRuntimeMXBean().getName(); + if (jvmName != null && jvmName.contains("@")) { + return jvmName.split("@")[0]; + } else { + return "unknown"; + } + } + } +} diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/CompositeDatastoreMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/CompositeDatastoreMetricsRecorder.java new file mode 100644 index 000000000000..705a14ff9796 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/CompositeDatastoreMetricsRecorder.java @@ -0,0 +1,102 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A {@link DatastoreMetricsRecorder} implementation that fans out recording calls to multiple + * underlying recorders. + * + *

This allows simultaneous recording to both built-in (Cloud Monitoring) and custom + * (user-provided) OpenTelemetry backends. + */ +class CompositeDatastoreMetricsRecorder implements DatastoreMetricsRecorder { + + private static final Logger logger = + Logger.getLogger(CompositeDatastoreMetricsRecorder.class.getName()); + + /** Recorders that every record call is forwarded to, in list order. */ + private final List recorders; + + /** + * Creates a composite over the given recorders. + * + * The list is stored by reference (no defensive copy is made). + * + * @param recorders the recorders to forward every call to. + */ + CompositeDatastoreMetricsRecorder(List recorders) { + this.recorders = recorders; + } + + /** Forwards the transaction latency to every underlying recorder. */ + @Override + public void recordTransactionLatency(double latencyMs, Map attributes) { + for (DatastoreMetricsRecorder recorder : recorders) { + recorder.recordTransactionLatency(latencyMs, attributes); + } + } + + /** Forwards the transaction attempt count to every underlying recorder. */ + @Override + public void recordTransactionAttemptCount(long count, Map attributes) { + for (DatastoreMetricsRecorder recorder : recorders) { + recorder.recordTransactionAttemptCount(count, attributes); + } + } + + /** Forwards the attempt latency to every underlying recorder. */ + @Override + public void recordAttemptLatency(double latencyMs, Map attributes) { + for (DatastoreMetricsRecorder recorder : recorders) { + recorder.recordAttemptLatency(latencyMs, attributes); + } + } + + /** Forwards the attempt count to every underlying recorder. */ + @Override + public void recordAttemptCount(long count, Map attributes) { + for (DatastoreMetricsRecorder recorder : recorders) { + recorder.recordAttemptCount(count, attributes); + } + } + + /** Forwards the operation latency to every underlying recorder. */ + @Override + public void recordOperationLatency(double latencyMs, Map attributes) { + for (DatastoreMetricsRecorder recorder : recorders) { + recorder.recordOperationLatency(latencyMs, attributes); + } + } + + /** Forwards the operation count to every underlying recorder. */ + @Override + public void recordOperationCount(long count, Map attributes) { + for (DatastoreMetricsRecorder recorder : recorders) { + recorder.recordOperationCount(count, attributes); + } + } + + /** + * Closes all underlying recorders. + * + *

Each recorder's own {@link DatastoreMetricsRecorder#close()} semantics apply: recorders that + * own their {@link io.opentelemetry.api.OpenTelemetry} instance (built-in path) will flush and + * shut down; recorders backed by a user-provided instance will no-op. All recorders are closed + * even if one throws an exception. + */ + @Override + public void close() { + for (int i = recorders.size() - 1; i >= 0; i--) { + try { + recorders.get(i).close(); + } catch (Exception e) { + logger.log(Level.WARNING, "Failed to close metrics recorder", e); + } + } + } +} diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreBuiltInMetricsView.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreBuiltInMetricsView.java new file mode 100644 index 000000000000..61828ed071fc --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreBuiltInMetricsView.java @@ -0,0 +1,160 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder; +import com.google.common.collect.ImmutableList; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Configures OpenTelemetry Views for Datastore built-in metrics. + * + *

Views are a critical "post-processing" layer in this implementation. They are used here for + * three primary reasons: + * + *

    + *
  1. Metric Mapping (GAX Alignment): GAX records generic metrics like {@code + * operation_latency}. We use Views to catch these metrics and prepend the official {@link + * TelemetryConstants#METRIC_PREFIX}, ensuring all RPC metrics follow the Firestore domain + * naming convention. + *
  2. Latency Precision: Datastore operations vary from sub-millisecond lookups to + * multi-second transactional commits. Default OTel histogram buckets do not handle this well, + * so we define explicit {@link DatastoreBuiltInMetricsView#BUCKET_BOUNDARIES} to make + * latency heatmaps in Cloud Monitoring more readable.
  3. Cloud Monitoring Schema Alignment: The Cloud Monitoring API is strict about labels. + * Exporting unexpected attributes can cause the entire export to fail. Views allow us to + * strictly filter attributes down to the {@link TelemetryConstants#COMMON_ATTRIBUTES}. + *
+ */ +class DatastoreBuiltInMetricsView { + + // Standard bucket boundaries for latency metrics in milliseconds. + private static final List BUCKET_BOUNDARIES = + ImmutableList.of( + 0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, + 16.0, 17.0, 18.0, 19.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, + 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, + 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0); + + /** Explicit-bucket histogram aggregation built from {@link #BUCKET_BOUNDARIES}. */ + private static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + Aggregation.explicitBucketHistogram(BUCKET_BOUNDARIES); + + /** Private constructor: this class only exposes static helpers. */ + private DatastoreBuiltInMetricsView() {} + + /** + * Registers Datastore built-in metrics and views on the provided {@link SdkMeterProviderBuilder}. + * + *

This method acts as the central configuration point for the metrics pipeline, stitching + * together the recording (SDK), the processing (Views), and the delivery (Exporter). + * + * @param metricExporter the exporter to use for metrics. + * @param builder the builder to register views and the {@link PeriodicMetricReader} on. + */ + static void registerBuiltinMetrics( + MetricExporter metricExporter, SdkMeterProviderBuilder builder) { + registerGaxViews(builder); + registerDatastoreViews(builder); + // Metrics are collected in-memory and flushed periodically to avoid impacting the + // performance of critical-path Datastore operations. + builder.registerMetricReader(PeriodicMetricReader.create(metricExporter)); + } + + /** + * Configures views for metrics generated by the GAX library. + * + *

GAX provides standardized RPC metrics but uses a generic namespace. We apply these Views to + * "claim" those metrics for Datastore, ensuring they appear in the official Cloud Monitoring + * dashboard with the correct names and the necessary millisecond-level precision. + */ + private static void registerGaxViews(SdkMeterProviderBuilder builder) { + for (String metricName : TelemetryConstants.GAX_METRICS) { + Aggregation aggregation = Aggregation.defaultAggregation(); + + // Differentiate the instrumentation type between `count` vs `latency` + InstrumentType type = InstrumentType.COUNTER; + String unit = "1"; + + // Latency metrics use histograms with specific millisecond buckets. + if (metricName.endsWith("latency")) { + aggregation = AGGREGATION_WITH_MILLIS_HISTOGRAM; + type = InstrumentType.HISTOGRAM; + unit = "ms"; + } + + // Select metrics from the GAX meter scope. + InstrumentSelector selector = + InstrumentSelector.builder() + .setMeterName(OpenTelemetryMetricsRecorder.GAX_METER_NAME) + .setName(metricName) + .setType(type) + .setUnit(unit) + .build(); + + // Only allow common attributes to be attached to the metrics. + Set attributesFilter = + TelemetryConstants.COMMON_ATTRIBUTES.stream() + .map(AttributeKey::getKey) + .collect(Collectors.toSet()); + + String renamedMetricName = + TelemetryConstants.GAX_METRIC_NAME_MAP.getOrDefault(metricName, metricName); + + // Rename the metric to use the Datastore prefix for Cloud Monitoring. + View view = + View.builder() + .setName(TelemetryConstants.METRIC_PREFIX + "/" + renamedMetricName) + .setAggregation(aggregation) + .setAttributeFilter(attributesFilter) + .build(); + + builder.registerView(selector, view); + } + } + + /** + * Configures views for metrics generated specifically by the Datastore SDK. + * + *

For Datastore-native metrics (like transaction latency), we use Views primarily to enforce a + * strict attribute allowlist. This ensures that only approved labels (e.g., service, method, + * status) are exported, keeping the Cloud Monitoring time series clean and within quota limits. + */ + private static void registerDatastoreViews(SdkMeterProviderBuilder builder) { + // Select all metrics from the Datastore meter scope. + InstrumentSelector selector = + InstrumentSelector.builder().setMeterName(TelemetryConstants.DATASTORE_METER_NAME).build(); + + // Filter to allow only common attributes. + Set attributesFilter = + TelemetryConstants.COMMON_ATTRIBUTES.stream() + .map(AttributeKey::getKey) + .collect(Collectors.toSet()); + + View view = View.builder().setAttributeFilter(attributesFilter).build(); + + builder.registerView(selector, view); + } +} diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporter.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporter.java new file mode 100644 index 000000000000..58351d3755cf --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporter.java @@ -0,0 +1,340 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.PermissionDeniedException; +import com.google.auth.Credentials; +import com.google.cloud.monitoring.v3.MetricServiceClient; +import com.google.cloud.monitoring.v3.MetricServiceSettings; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.monitoring.v3.CreateTimeSeriesRequest; +import com.google.monitoring.v3.ProjectName; +import com.google.monitoring.v3.TimeSeries; +import com.google.protobuf.Empty; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Datastore Cloud Monitoring OpenTelemetry Exporter. + * + *

The exporter will look for all Datastore-owned metrics under the {@link + * TelemetryConstants#METRIC_PREFIX} instrumentation scope and upload them via the Google Cloud + * Monitoring API. + * + *

This implementation is a standalone exporter that does not depend on the {@code + * com.google.cloud.opentelemetry:exporter-metrics} library, to avoid external version management + * and ensure tight integration with Datastore requirements. + * + *

The implementation in this file is inspired by the original work done in the Spanner client + * library (SpannerCloudMonitoringExporter) to export metrics. The logic has been adapted for + * Datastore's use case. + */ +class DatastoreCloudMonitoringExporter implements MetricExporter { + + private static final Logger logger = + Logger.getLogger(DatastoreCloudMonitoringExporter.class.getName()); + + /** + * Wrapper class to hold a {@link MetricServiceClient} and its reference count. This is used to + * share the client across multiple exporter instances. + */ + static class CachedMetricsClient { + final MetricServiceClient client; + final AtomicInteger refCount = new AtomicInteger(0); + + CachedMetricsClient(MetricServiceClient client) { + this.client = client; + } + } + + /** + * Shared cache for {@link MetricServiceClient} instances, keyed by "projectId:databaseId". This + * prevents creating a new gRPC client for every exporter instance, reducing resource usage. + * Reference counting is used to safely shut down the client when no longer needed. + */ + static final ConcurrentHashMap METRICS_CLIENT_CACHE = + new ConcurrentHashMap<>(); + + private final MetricServiceClient client; + private final Map clientAttributes; + + // This is the quota limit from Cloud Monitoring. More details in + // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas. + private static final int EXPORT_BATCH_SIZE_LIMIT = 200; + + // Increase max metadata size to 32MB to avoid "Header size exceeded" errors + // when receiving large error payloads from Cloud Monitoring. + private static final int MAX_METADATA_SIZE = 32 * 1024 * 1024; + + // Flag to prevent log spam of any export failures + private final AtomicBoolean datastoreExportFailureLogged = new AtomicBoolean(false); + + private final String projectId; + private final String databaseId; + + /** + * Creates a new instance of the exporter. + * + *

The gRPC channel is configured with a 32MB inbound metadata limit ({@link + * #MAX_METADATA_SIZE}) to prevent "Header size exceeded" errors when Cloud Monitoring returns + * large error payloads in gRPC trailers. The default gRPC limit is too small for some error + * responses and can mask the real failure reason. + * + *

{@code createTimeSeries} is used instead of {@code createServiceTimeSeries} because the + * Firestore namespace in Cloud Monitoring has not yet been deployed as a service resource. Once + * the namespace is available, this should be migrated to {@code createServiceTimeSeries} for + * correct quota and resource attribution. + * + * @param projectId the GCP project ID where metrics will be exported. + * @param credentials the credentials used to authenticate with Cloud Monitoring. + * @return a new {@link DatastoreCloudMonitoringExporter} instance. + */ + @Nullable + static DatastoreCloudMonitoringExporter create( + String projectId, + String databaseId, + Credentials credentials, + Map clientAttributes) { + String key = projectId + ":" + databaseId; + + // Use compute to acquire or create the client atomically with reference counting. + // If creation fails, we log the error and return null so it's not added to the map. + CachedMetricsClient cachedMetricsClient = + METRICS_CLIENT_CACHE.compute( + key, + (k, v) -> { + if (v == null) { + try { + v = new CachedMetricsClient(createMetricServiceClient(credentials)); + } catch (IOException e) { + logger.log( + Level.WARNING, + "Failed to create MetricServiceClient for metrics export. Monitoring will be disabled.", + e); + return null; // Do not add to map + } + } + v.refCount.incrementAndGet(); + return v; + }); + + // If there is no client in the cache (creation failed), return null. 
+ if (cachedMetricsClient == null) { + return null; + } + + return new DatastoreCloudMonitoringExporter( + projectId, databaseId, cachedMetricsClient.client, clientAttributes); + } + + private static MetricServiceClient createMetricServiceClient(Credentials credentials) + throws IOException { + MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder(); + + InstantiatingGrpcChannelProvider transportChannelProvider = + MetricServiceSettings.defaultGrpcTransportProviderBuilder() + .setMaxInboundMetadataSize(MAX_METADATA_SIZE) + .build(); + settingsBuilder.setTransportChannelProvider(transportChannelProvider); + + settingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + settingsBuilder + .createTimeSeriesSettings() + .setSimpleTimeoutNoRetriesDuration(Duration.ofMinutes(1)); + + return MetricServiceClient.create(settingsBuilder.build()); + } + + @VisibleForTesting + DatastoreCloudMonitoringExporter( + String projectId, + String databaseId, + MetricServiceClient client, + Map clientAttributes) { + this.client = client; + this.projectId = projectId; + this.databaseId = databaseId; + this.clientAttributes = clientAttributes; + } + + /** + * Exports the provided collection of {@link MetricData} to Cloud Monitoring. + * + * @param collection the collection of metric data to export. + * @return a {@link CompletableResultCode} indicating the result of the export operation. + */ + @Override + public CompletableResultCode export(@Nonnull Collection collection) { + if (client.isShutdown()) { + logger.log(Level.WARNING, "Exporter is shut down"); + return CompletableResultCode.ofFailure(); + } + + // Skips exporting if there's none + if (collection.isEmpty()) { + return CompletableResultCode.ofSuccess(); + } + + List datastoreTimeSeries; + try { + // Convert OTel MetricData to Cloud Monitoring TimeSeries. 
+ datastoreTimeSeries = + DatastoreCloudMonitoringExporterUtils.convertToDatastoreTimeSeries( + new ArrayList<>(collection), clientAttributes); + } catch (Throwable e) { + logger.log( + Level.WARNING, + "Failed to convert Datastore metric data to Cloud Monitoring timeseries.", + e); + return CompletableResultCode.ofFailure(); + } + + ProjectName projectName = ProjectName.of(projectId); + + // Perform the actual network call to Cloud Monitoring. + ApiFuture> futureList = exportTimeSeriesInBatch(projectName, datastoreTimeSeries); + + CompletableResultCode datastoreExportCode = new CompletableResultCode(); + ApiFutures.addCallback( + futureList, + new ApiFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + // Only log failure once to avoid log spam, then reset on success. + if (datastoreExportFailureLogged.compareAndSet(false, true)) { + String msg = "createTimeSeries request failed for Datastore metrics."; + + if (throwable instanceof PermissionDeniedException) { + msg += + String.format( + " Need monitoring metric writer permission on project=%s.", + projectName.getProject()); + } + logger.log(Level.WARNING, msg, throwable); + } + datastoreExportCode.fail(); + } + + @Override + public void onSuccess(List empty) { + // When an export succeeded reset the export failure flag to false so if there's a + // transient failure it'll be logged. + datastoreExportFailureLogged.set(false); + datastoreExportCode.succeed(); + } + }, + MoreExecutors.directExecutor()); + + return datastoreExportCode; + } + + /** Batches and sends the {@link TimeSeries} to Cloud Monitoring. 
*/ + private ApiFuture> exportTimeSeriesInBatch( + ProjectName projectName, List timeSeries) { + List> batchResults = new ArrayList<>(); + + for (List batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) { + CreateTimeSeriesRequest req = + CreateTimeSeriesRequest.newBuilder() + .setName(projectName.toString()) + .addAllTimeSeries(batch) + .build(); + batchResults.add(this.client.createTimeSeriesCallable().futureCall(req)); + } + + return ApiFutures.allAsList(batchResults); + } + + /** + * Best-effort flush of any pending exports. + * + *

This implementation is a no-op and always returns {@link CompletableResultCode#ofSuccess()}. + * Because exports are performed asynchronously via {@link ApiFuture} callbacks, this flush cannot + * guarantee that all concurrent in-flight network requests have completed by the time this method + * returns. For a stronger guarantee, callers should invoke {@code SdkMeterProvider.forceFlush()}, + * which coordinates across the {@link io.opentelemetry.sdk.metrics.export.MetricReader} and + * ensures a full collection cycle completes before returning. + */ + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + /** Shuts down the exporter and the underlying {@link MetricServiceClient}. */ + @Override + public CompletableResultCode shutdown() { + if (client.isShutdown()) { + logger.log(Level.WARNING, "shutdown is called multiple times"); + return CompletableResultCode.ofSuccess(); + } + CompletableResultCode shutdownResult = new CompletableResultCode(); + try { + String key = projectId + ":" + databaseId; + // Atomically decrement reference count and cleanup if zero. + METRICS_CLIENT_CACHE.compute( + key, + (k, v) -> { + if (v != null && v.refCount.decrementAndGet() == 0) { + v.client.shutdown(); + return null; // Remove from map to prevent leaks + } + + return v; + }); + shutdownResult.succeed(); + } catch (Throwable e) { + logger.log(Level.WARNING, "failed to shutdown the monitoring client", e); + shutdownResult.fail(); + } + return shutdownResult; + } + + /** + * Returns the {@link AggregationTemporality} for this exporter. + * + *

For Google Cloud Monitoring, we always use {@link AggregationTemporality#CUMULATIVE} to + * maintain a continuous count or sum over time. + */ + @Override + public AggregationTemporality getAggregationTemporality(@Nonnull InstrumentType instrumentType) { + return AggregationTemporality.CUMULATIVE; + } +} diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporterUtils.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporterUtils.java new file mode 100644 index 000000000000..5c8f41b245ae --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporterUtils.java @@ -0,0 +1,288 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.datastore.telemetry; + +import static com.google.api.MetricDescriptor.MetricKind.CUMULATIVE; +import static com.google.api.MetricDescriptor.MetricKind.GAUGE; +import static com.google.api.MetricDescriptor.MetricKind.UNRECOGNIZED; +import static com.google.api.MetricDescriptor.ValueType.DISTRIBUTION; +import static com.google.api.MetricDescriptor.ValueType.DOUBLE; +import static com.google.api.MetricDescriptor.ValueType.INT64; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.DATASTORE_RESOURCE_TYPE; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.PROJECT_ID_KEY; + +import com.google.api.Distribution; +import com.google.api.Distribution.BucketOptions; +import com.google.api.Distribution.BucketOptions.Explicit; +import com.google.api.Metric; +import com.google.api.MetricDescriptor.MetricKind; +import com.google.api.MetricDescriptor.ValueType; +import com.google.api.MonitoredResource; +import com.google.monitoring.v3.Point; +import com.google.monitoring.v3.TimeInterval; +import com.google.monitoring.v3.TimeSeries; +import com.google.monitoring.v3.TypedValue; +import com.google.protobuf.util.Timestamps; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import 
java.util.logging.Logger; + +/** + * Utility class for converting OpenTelemetry metrics to Google Cloud Monitoring format. + * + *

This class contains the logic to map OpenTelemetry {@link MetricData} and {@link PointData} to + * Cloud Monitoring {@link TimeSeries}, including resource label mapping and attribute conversion. + * + *

The implementation in this file is inspired by the original work done in the Spanner client + * library (SpannerCloudMonitoringExporterUtils) to export metrics. The logic has been adapted for + * Datastore's use case. + */ +class DatastoreCloudMonitoringExporterUtils { + + private static final Logger logger = + Logger.getLogger(DatastoreCloudMonitoringExporterUtils.class.getName()); + + private DatastoreCloudMonitoringExporterUtils() {} + + /** + * Extracts the project ID from the OpenTelemetry {@link Resource}. + * + * @param resource the OTel resource. + * @return the project ID string, or null if not present. + */ + static String getProjectId(Resource resource) { + return resource.getAttributes().get(PROJECT_ID_KEY); + } + + /** + * Converts a list of {@link MetricData} to Cloud Monitoring {@link TimeSeries}. + * + * @param collection the collection of metrics to convert. + * @param clientAttributes common client labels (e.g. {@code client_name}, {@code client_uid}) to + * attach to every metric data point. + * @return a list of converted {@link TimeSeries}. + */ + static List convertToDatastoreTimeSeries( + List collection, Map clientAttributes) { + List allTimeSeries = new ArrayList<>(); + + // Metrics should already have been filtered for Gax and Datastore related ones + for (MetricData metricData : collection) { + // Map OTel resource attributes to the specific monitored resource labels.
+ MonitoredResource.Builder monitoredResourceBuilder = + MonitoredResource.newBuilder().setType(DATASTORE_RESOURCE_TYPE); + + Attributes resourceAttributes = metricData.getResource().getAttributes(); + String resourceProjectId = resourceAttributes.get(TelemetryConstants.PROJECT_ID_KEY); + String resourceDatabaseId = resourceAttributes.get(TelemetryConstants.DATABASE_ID_KEY); + String resourceLocation = resourceAttributes.get(TelemetryConstants.LOCATION_ID_KEY); + + if (resourceProjectId != null) { + monitoredResourceBuilder.putLabels( + TelemetryConstants.RESOURCE_LABEL_PROJECT_ID, resourceProjectId); + } + + // TODO: The monitored resource is currently written to `global` because the Datastore + // namespace in Cloud Monitoring has not been deployed yet. Once the namespace is available, + // database_id and location labels should be added here using RESOURCE_LABEL_DATABASE_ID + // and RESOURCE_LABEL_LOCATION respectively. + + // Convert each point in the metric data to a TimeSeries. + metricData.getData().getPoints().stream() + .map( + pointData -> + convertPointToDatastoreTimeSeries( + metricData, pointData, monitoredResourceBuilder, clientAttributes)) + .forEach(allTimeSeries::add); + } + return allTimeSeries; + } + + /** + * Converts an individual {@link PointData} to a {@link TimeSeries}. + * + *

{@code clientAttributes} (e.g. {@code client_name}, {@code client_uid}) are injected here + * rather than being looked up from a singleton so that this method is testable in isolation. The + * caller ({@link DatastoreCloudMonitoringExporter}) is responsible for supplying them from {@link + * BuiltInDatastoreMetricsProvider#getClientAttributes()}. + */ + private static TimeSeries convertPointToDatastoreTimeSeries( + MetricData metricData, + PointData pointData, + MonitoredResource.Builder monitoredResourceBuilder, + Map clientAttributes) { + MetricKind metricKind = convertMetricKind(metricData); + TimeSeries.Builder builder = + TimeSeries.newBuilder() + .setMetricKind(metricKind) + .setValueType(convertValueType(metricData.getType())); + Metric.Builder metricBuilder = Metric.newBuilder().setType(metricData.getName()); + + Attributes attributes = pointData.getAttributes(); + + // Convert attribute keys by replacing "." with "_" for Cloud Monitoring compatibility. + for (AttributeKey key : attributes.asMap().keySet()) { + metricBuilder.putLabels(key.getKey().replace(".", "_"), String.valueOf(attributes.get(key))); + } + + // Attach common client attributes (service, client_uid) to each metric. + clientAttributes.forEach(metricBuilder::putLabels); + + builder.setResource(monitoredResourceBuilder.build()); + builder.setMetric(metricBuilder.build()); + + // Define the time interval for the metric point. + TimeInterval timeInterval = + TimeInterval.newBuilder() + .setStartTime( + // For GAUGE metrics, start and end time are identical. + metricKind == MetricKind.GAUGE + ? Timestamps.fromNanos(pointData.getEpochNanos()) + : Timestamps.fromNanos(pointData.getStartEpochNanos())) + .setEndTime(Timestamps.fromNanos(pointData.getEpochNanos())) + .build(); + + builder.addPoints(createPoint(metricData.getType(), pointData, timeInterval)); + + return builder.build(); + } + + /** Maps OpenTelemetry metric type to Cloud Monitoring {@link MetricKind}. 
*/ + private static MetricKind convertMetricKind(MetricData metricData) { + switch (metricData.getType()) { + case HISTOGRAM: + case EXPONENTIAL_HISTOGRAM: + return convertHistogramType(metricData.getHistogramData()); + case LONG_GAUGE: + case DOUBLE_GAUGE: + return GAUGE; + case LONG_SUM: + return convertSumDataType(metricData.getLongSumData()); + case DOUBLE_SUM: + return convertSumDataType(metricData.getDoubleSumData()); + default: + return UNRECOGNIZED; + } + } + + /** + * Returns {@link com.google.api.MetricDescriptor.MetricKind#CUMULATIVE} for cumulative + * histograms, or {@link com.google.api.MetricDescriptor.MetricKind#UNRECOGNIZED} for delta + * histograms. Cloud Monitoring only accepts cumulative histograms; delta histograms from + * short-lived OTel SDK instances would produce incomplete data and are intentionally dropped. + */ + private static MetricKind convertHistogramType(HistogramData histogramData) { + if (histogramData.getAggregationTemporality() == AggregationTemporality.CUMULATIVE) { + return CUMULATIVE; + } + return UNRECOGNIZED; + } + + /** + * Maps an OTel {@link SumData} to a Cloud Monitoring {@link MetricKind}. + * + *

Non-monotonic sums (values that can decrease) are mapped to {@code GAUGE} because Cloud + * Monitoring {@code CUMULATIVE} metrics must never decrease. Monotonic cumulative sums map to + * {@code CUMULATIVE}; monotonic delta sums are not supported and return {@code + * UNRECOGNIZED}. + */ + private static MetricKind convertSumDataType(SumData sum) { + // Non-monotonic sums are treated as GAUGE. + if (!sum.isMonotonic()) { + return GAUGE; + } + if (sum.getAggregationTemporality() == AggregationTemporality.CUMULATIVE) { + return CUMULATIVE; + } + return UNRECOGNIZED; + } + + /** Maps OpenTelemetry metric data type to Cloud Monitoring {@link ValueType}. */ + private static ValueType convertValueType(MetricDataType metricDataType) { + switch (metricDataType) { + case LONG_GAUGE: + case LONG_SUM: + return INT64; + case DOUBLE_GAUGE: + case DOUBLE_SUM: + return DOUBLE; + case HISTOGRAM: + case EXPONENTIAL_HISTOGRAM: + return DISTRIBUTION; + default: + return ValueType.UNRECOGNIZED; + } + } + + /** Creates a Cloud Monitoring {@link Point} from OpenTelemetry {@link PointData}.
*/ + private static Point createPoint( + MetricDataType type, PointData pointData, TimeInterval timeInterval) { + Point.Builder builder = Point.newBuilder().setInterval(timeInterval); + switch (type) { + case HISTOGRAM: + case EXPONENTIAL_HISTOGRAM: + return builder + .setValue( + TypedValue.newBuilder() + .setDistributionValue(convertHistogramData((HistogramPointData) pointData)) + .build()) + .build(); + case DOUBLE_GAUGE: + case DOUBLE_SUM: + return builder + .setValue( + TypedValue.newBuilder() + .setDoubleValue(((DoublePointData) pointData).getValue()) + .build()) + .build(); + case LONG_GAUGE: + case LONG_SUM: + return builder + .setValue(TypedValue.newBuilder().setInt64Value(((LongPointData) pointData).getValue())) + .build(); + default: + logger.log(Level.WARNING, "unsupported metric type"); + return builder.build(); + } + } + + /** Converts OpenTelemetry histogram data to Cloud Monitoring {@link Distribution}. */ + private static Distribution convertHistogramData(HistogramPointData pointData) { + return Distribution.newBuilder() + .setCount(pointData.getCount()) + .setMean(pointData.getCount() == 0L ? 
0.0D : pointData.getSum() / pointData.getCount()) + .setBucketOptions( + BucketOptions.newBuilder() + .setExplicitBuckets(Explicit.newBuilder().addAllBounds(pointData.getBoundaries()))) + .addAllBucketCounts(pointData.getCounts()) + .build(); + } +} diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorder.java index e1e18b3104f6..ec3140cc8066 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorder.java @@ -22,7 +22,11 @@ import com.google.cloud.datastore.DatastoreOptions; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nonnull; /** @@ -39,6 +43,21 @@ @InternalExtensionOnly public interface DatastoreMetricsRecorder extends MetricsRecorder { + Logger logger = Logger.getLogger(DatastoreMetricsRecorder.class.getName()); + + /** + * Releases any resources held by this recorder. + * + *

For built-in recorders that own a private {@link io.opentelemetry.sdk.OpenTelemetrySdk} + * instance, this will flush and shut down the underlying {@link + * io.opentelemetry.sdk.metrics.SdkMeterProvider}. For recorders backed by a user-provided {@link + * io.opentelemetry.api.OpenTelemetry} instance, this is a no-op since the caller owns that + * instance's lifecycle. + * + *

This method should be called from {@link com.google.cloud.datastore.DatastoreImpl#close()}. + */ + default void close() {} + /** Records the total latency of a transaction in milliseconds. */ void recordTransactionLatency(double latencyMs, Map attributes); @@ -49,25 +68,72 @@ public interface DatastoreMetricsRecorder extends MetricsRecorder { * Returns a {@link DatastoreMetricsRecorder} instance based on the provided {@link * DatastoreOptions}. * - *

If the user has enabled metrics and provided an {@link OpenTelemetry} instance (or {@link - * GlobalOpenTelemetry} is used as fallback), an {@link OpenTelemetryDatastoreMetricsRecorder} is - * returned. Otherwise a {@link NoOpDatastoreMetricsRecorder} is returned. + *

This factory method returns a recorder that fans out to every configured backend, using a + * {@link CompositeDatastoreMetricsRecorder} when more than one backend is configured: + * + *

    + *
  • Default provider: Exports metrics to Google Cloud Monitoring via a + * privately-constructed {@link io.opentelemetry.sdk.OpenTelemetrySdk} with a {@link + * DatastoreCloudMonitoringExporter}, unless a local emulator host is set or export is disabled via {@link + * DatastoreOpenTelemetryOptions#isExportBuiltinMetricsToGoogleCloudMonitoring()}. + *
  • Custom provider: If the user has enabled metrics and provided an {@link + * OpenTelemetry} instance (or {@link GlobalOpenTelemetry} is used as fallback), metrics are + * also recorded to that backend. + *
* * @param datastoreOptions the {@link DatastoreOptions} configuring the Datastore client - * @return a {@link DatastoreMetricsRecorder} for the configured backend + * @return a {@link DatastoreMetricsRecorder} that fans out to all configured backends */ static DatastoreMetricsRecorder getInstance(@Nonnull DatastoreOptions datastoreOptions) { DatastoreOpenTelemetryOptions otelOptions = datastoreOptions.getOpenTelemetryOptions(); + List recorders = new ArrayList<>(); + // Default provider: export built-in metrics to Cloud Monitoring + String emulatorHost = System.getenv(DatastoreOptions.LOCAL_HOST_ENV_VAR); + boolean emulatorEnabled = emulatorHost != null && !emulatorHost.isEmpty(); + + // When using a local emulator, there is no need to configure a built-in Otel instance + if (otelOptions.isExportBuiltinMetricsToGoogleCloudMonitoring() && !emulatorEnabled) { + try { + OpenTelemetry builtInOtel = + BuiltInDatastoreMetricsProvider.INSTANCE.createOpenTelemetry( + datastoreOptions.getProjectId(), + datastoreOptions.getDatabaseId(), + datastoreOptions.getCredentials()); + if (builtInOtel != null) { + recorders.add( + new OpenTelemetryDatastoreMetricsRecorder( + builtInOtel, TelemetryConstants.METRIC_PREFIX, /* ownsOpenTelemetry= */ true)); + } + } catch (Exception e) { + logger.log( + Level.WARNING, + "Failed to create built-in metrics provider for Cloud Monitoring export.", + e); + } + } + + // If the user has enabled metrics, we will attempt to export metrics to their + // configured backend. We will first check their supplied Otel object, then check + // the global Otel config. If there is nothing configured, then a no-op Otel object + // will be used. 
if (otelOptions.isMetricsEnabled()) { OpenTelemetry customOtel = otelOptions.getOpenTelemetry(); if (customOtel == null) { customOtel = GlobalOpenTelemetry.get(); } - return new OpenTelemetryDatastoreMetricsRecorder( - customOtel, TelemetryConstants.METRIC_PREFIX); + recorders.add( + new OpenTelemetryDatastoreMetricsRecorder(customOtel, TelemetryConstants.METRIC_PREFIX)); } - return new NoOpDatastoreMetricsRecorder(); + // Default metrics are disabled and user has not custom Otel instance + if (recorders.isEmpty()) { + return new NoOpDatastoreMetricsRecorder(); + } + // CompositeMetricsRecorder is not needed for one recorder + if (recorders.size() == 1) { + return recorders.get(0); + } + return new CompositeDatastoreMetricsRecorder(recorders); } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryDatastoreMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryDatastoreMetricsRecorder.java index 550dc1df9a7d..d858cc6d7ef1 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryDatastoreMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryDatastoreMetricsRecorder.java @@ -23,7 +23,10 @@ import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.OpenTelemetrySdk; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nonnull; /** @@ -43,20 +46,40 @@ class OpenTelemetryDatastoreMetricsRecorder extends OpenTelemetryMetricsRecorder implements DatastoreMetricsRecorder { + private static final Logger logger = + Logger.getLogger(OpenTelemetryDatastoreMetricsRecorder.class.getName()); + private final OpenTelemetry openTelemetry; + // True 
when this recorder created the OpenTelemetry instance (built-in path) and therefore + // owns its lifecycle. False when the instance was provided by the user. + private final boolean ownsOpenTelemetry; // Datastore-specific transaction metrics (registered under the Datastore meter). private final DoubleHistogram transactionLatency; private final LongCounter transactionAttemptCount; - // Note: Standard GAX RPC metrics (operation_latency, attempt_latency, etc.) are handled by the - // base OpenTelemetryMetricsRecorder class. Those metrics are inherited from the parent classes. - // However, the internal metrics expect plural suffixes (e.g. `latencies` instead of `latency`). - // The discrepancy between the singular GAX names and the plural internal Cloud Monitoring names - // is handled by configuring OpenTelemetry Views. + /** Creates a recorder backed by a user-provided {@link OpenTelemetry} instance. */ OpenTelemetryDatastoreMetricsRecorder(@Nonnull OpenTelemetry openTelemetry, String metricPrefix) { + this(openTelemetry, metricPrefix, /* ownsOpenTelemetry= */ false); + } + + /** + * Creates a recorder, specifying whether this instance owns the {@link OpenTelemetry} lifecycle. + * + * @param ownsOpenTelemetry {@code true} if this recorder created the instance and should shut it + * down on {@link #close()}; {@code false} if the user provided it. + */ + OpenTelemetryDatastoreMetricsRecorder( + @Nonnull OpenTelemetry openTelemetry, String metricPrefix, boolean ownsOpenTelemetry) { super(openTelemetry, metricPrefix); this.openTelemetry = openTelemetry; + this.ownsOpenTelemetry = ownsOpenTelemetry; + + // Note: Standard GAX RPC metrics (operation_latency, attempt_latency, etc.) are handled by the + // base OpenTelemetryMetricsRecorder class. Those metrics are inherited from the parent classes. + // However, the internal metrics expect plural suffixes (e.g. `latencies` instead of `latency`). 
+ // The discrepancy between the singular GAX names and the plural internal Cloud Monitoring names + // is handled by configuring OpenTelemetry Views. Meter meter = openTelemetry.getMeter(TelemetryConstants.DATASTORE_METER_NAME); @@ -78,6 +101,22 @@ OpenTelemetry getOpenTelemetry() { return openTelemetry; } + /** + * Closes this recorder. If this recorder owns the underlying {@link OpenTelemetry} instance + * (i.e., it was created by the built-in metrics provider), it will be shut down, flushing any + * pending metrics. If the instance was provided by the user, this is a no-op. + */ + @Override + public void close() { + if (ownsOpenTelemetry && openTelemetry instanceof OpenTelemetrySdk) { + try { + ((OpenTelemetrySdk) openTelemetry).close(); + } catch (Exception e) { + logger.log(Level.WARNING, "Failed to close built-in OpenTelemetry SDK instance.", e); + } + } + } + @Override public void recordTransactionLatency(double latencyMs, Map attributes) { transactionLatency.record(latencyMs, toOtelAttributes(attributes)); diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index 802b35ff456f..f323391b64be 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -17,8 +17,10 @@ package com.google.cloud.datastore.telemetry; import com.google.api.core.InternalApi; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.opentelemetry.api.common.AttributeKey; +import java.util.Map; import java.util.Set; /** @@ -36,6 +38,31 @@ public class TelemetryConstants { public static final String METRIC_PREFIX = "custom.googleapis.com/internal/client"; public static 
final String DATASTORE_METER_NAME = "java-datastore"; + // Short names used to build GAX_METRICS and metric full-path constants below. + public static final String METRIC_NAME_SHORT_OPERATION_LATENCY = "operation_latency"; + public static final String METRIC_NAME_SHORT_ATTEMPT_LATENCY = "attempt_latency"; + public static final String METRIC_NAME_SHORT_OPERATION_COUNT = "operation_count"; + public static final String METRIC_NAME_SHORT_ATTEMPT_COUNT = "attempt_count"; + + public static final String METRIC_NAME_SHORT_OPERATION_LATENCIES = "operation_latencies"; + public static final String METRIC_NAME_SHORT_ATTEMPT_LATENCIES = "attempt_latencies"; + + /** + * Mapping from the singular metric names recorded by the GAX library to the pluralized metric + * names required by the internal Cloud Monitoring metric descriptors. + */ + public static final Map GAX_METRIC_NAME_MAP = + ImmutableMap.of( + METRIC_NAME_SHORT_OPERATION_LATENCY, METRIC_NAME_SHORT_OPERATION_LATENCIES, + METRIC_NAME_SHORT_ATTEMPT_LATENCY, METRIC_NAME_SHORT_ATTEMPT_LATENCIES, + METRIC_NAME_SHORT_OPERATION_COUNT, METRIC_NAME_SHORT_OPERATION_COUNT, + METRIC_NAME_SHORT_ATTEMPT_COUNT, METRIC_NAME_SHORT_ATTEMPT_COUNT); + + // Short metric names (without prefix) for the four metrics recorded by the GAX layer. + // Used by DatastoreBuiltInMetricsView to register OTel views that capture and rename + // GAX-emitted metrics for the built-in Cloud Monitoring export pipeline. 
+ public static final Set GAX_METRICS = GAX_METRIC_NAME_MAP.keySet(); + // Monitored resource type for Cloud Monitoring public static final String DATASTORE_RESOURCE_TYPE = "global"; @@ -78,6 +105,7 @@ public class TelemetryConstants { // Metric attribute keys (used on metric data points) public static final AttributeKey CLIENT_UID_KEY = AttributeKey.stringKey("client_uid"); + public static final AttributeKey CLIENT_NAME_KEY = AttributeKey.stringKey("client_name"); public static final AttributeKey METHOD_KEY = AttributeKey.stringKey("method"); public static final AttributeKey STATUS_KEY = AttributeKey.stringKey("status"); public static final AttributeKey SERVICE_KEY = AttributeKey.stringKey("service"); @@ -109,26 +137,28 @@ public class TelemetryConstants { /** * Metric name for the total latency of an operation (one full RPC call including retries). * - *

The plural form ({@code operation_latencies}) is intentional: it matches the internal Cloud - * Monitoring metric descriptor name. {@link OpenTelemetryDatastoreMetricsRecorder} overrides the - * inherited GAX method to record to this name rather than the singular GAX default. + *

Renamed to the plural form ({@code operation_latencies}) by {@link + * DatastoreBuiltInMetricsView} during export. */ - public static final String METRIC_NAME_OPERATION_LATENCY = METRIC_PREFIX + "/operation_latencies"; + public static final String METRIC_NAME_OPERATION_LATENCY = + METRIC_PREFIX + "/" + METRIC_NAME_SHORT_OPERATION_LATENCY; /** * Metric name for the latency of a single RPC attempt. * - *

The plural form ({@code attempt_latencies}) is intentional: it matches the internal Cloud - * Monitoring metric descriptor name. {@link OpenTelemetryDatastoreMetricsRecorder} overrides the - * inherited GAX method to record to this name rather than the singular GAX default. + *

Renamed to the plural form ({@code attempt_latencies}) by {@link + * DatastoreBuiltInMetricsView} during export. */ - public static final String METRIC_NAME_ATTEMPT_LATENCY = METRIC_PREFIX + "/attempt_latencies"; + public static final String METRIC_NAME_ATTEMPT_LATENCY = + METRIC_PREFIX + "/" + METRIC_NAME_SHORT_ATTEMPT_LATENCY; /** Metric name for the count of operations. */ - public static final String METRIC_NAME_OPERATION_COUNT = METRIC_PREFIX + "/operation_count"; + public static final String METRIC_NAME_OPERATION_COUNT = + METRIC_PREFIX + "/" + METRIC_NAME_SHORT_OPERATION_COUNT; /** Metric name for the count of RPC attempts. */ - public static final String METRIC_NAME_ATTEMPT_COUNT = METRIC_PREFIX + "/attempt_count"; + public static final String METRIC_NAME_ATTEMPT_COUNT = + METRIC_PREFIX + "/" + METRIC_NAME_SHORT_ATTEMPT_COUNT; static final String METHOD_SERVICE_NAME = "Datastore"; diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreBuiltInAndCustomMetricsIT.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreBuiltInAndCustomMetricsIT.java new file mode 100644 index 000000000000..d32f279eb251 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreBuiltInAndCustomMetricsIT.java @@ -0,0 +1,393 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.datastore; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assume.assumeNotNull; + +import com.google.cloud.datastore.telemetry.TelemetryConstants; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration test that verifies both the default built-in Cloud Monitoring export path and a + * user-configured OpenTelemetry backend (backed by {@link InMemoryMetricReader}) are active + * simultaneously via the {@link + * com.google.cloud.datastore.telemetry.CompositeDatastoreMetricsRecorder}. + * + *

What this test covers

+ * + *
    + *
  1. Custom OTel backend (in-memory) — {@link InMemoryMetricReader} captures every metric + * emitted by the Datastore SDK. After each operation the test collects all metrics + * synchronously and asserts that expected names, types, and attributes are present. Because + * the in-memory reader is under full test control, these assertions are deterministic. + *
  2. Built-in Cloud Monitoring backend — a private {@link OpenTelemetrySdk} configured + * with {@link com.google.cloud.datastore.telemetry.DatastoreCloudMonitoringExporter} is + * created alongside the custom backend. The test verifies the composite recorder is in place + * (confirming both backends are wired up) and that no exception is thrown during the export + * path. The periodic export interval means data will not appear in Cloud Monitoring + * immediately; manual verification steps are provided at the bottom of this class Javadoc. + *
+ * + *

Setup

+ * + *

Requires a real GCP project with Application Default Credentials that have the {@code + * monitoring.timeSeries.create} IAM permission. Set the following environment variables: + * + *

    + *
  • {@code GOOGLE_CLOUD_PROJECT} — GCP project ID (required) + *
  • {@code DATASTORE_DATABASE_ID} — Datastore database ID (defaults to {@code ""} for the + * default database) + *
+ * + *

Cloud Monitoring verification

+ * + *

After running this test, navigate to Cloud Console → Monitoring → Metrics Explorer and + * query: + * + *

+ *   Metric  : custom.googleapis.com/internal/client/transaction_latency
+ *   Resource: global
+ *   Filter  : project_id = <YOUR_PROJECT_ID>
+ * 
+ * + *

Data may take up to 60 seconds (one periodic flush interval) to appear. + */ +@RunWith(JUnit4.class) +@SuppressWarnings("checkstyle:abbreviationaswordinname") +public class DatastoreBuiltInAndCustomMetricsIT { + + private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String DATABASE_ID = + System.getenv().getOrDefault("DATASTORE_DATABASE_ID", ""); + + /** + * Delta temporality is used so that each {@link InMemoryMetricReader#collectAllMetrics()} call + * returns only the new data points recorded since the last collection, and automatically resets + * the in-memory state. This makes per-test assertions independent and avoids cross-test + * contamination. + */ + private InMemoryMetricReader metricReader; + + private SdkMeterProvider customMeterProvider; + private Datastore datastore; + private String kind; + + @Before + public void setUp() { + // Skip the test gracefully if the required environment variable is not configured. + assumeNotNull("GOOGLE_CLOUD_PROJECT must be set to run this IT test", PROJECT_ID); + + kind = "Kind-" + java.util.UUID.randomUUID().toString().substring(0, 8); + + // Build a user-configured OTel backend that records to an in-memory reader for assertions. + metricReader = InMemoryMetricReader.createDelta(); + customMeterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + OpenTelemetrySdk customOtel = + OpenTelemetrySdk.builder().setMeterProvider(customMeterProvider).build(); + + // Configure the Datastore client with BOTH backends active: + // 1. Built-in Cloud Monitoring export (enabled by default via + // DatastoreOpenTelemetryOptions.exportBuiltinMetricsToGoogleCloudMonitoring = true) + // 2. User-configured OTel backend wired to the in-memory reader above. + // + // The resulting DatastoreMetricsRecorder will be a CompositeDatastoreMetricsRecorder that + // fans out all recording calls to both backends simultaneously. 
+ DatastoreOptions options = + DatastoreOptions.newBuilder() + .setProjectId(PROJECT_ID) + .setDatabaseId(DATABASE_ID) + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setMetricsEnabled(true) + .setOpenTelemetry(customOtel) + .build()) + .build(); + datastore = options.getService(); + + // Drain any metrics emitted during client initialisation so test assertions only capture + // data from the operations performed within the test method itself. + metricReader.collectAllMetrics(); + } + + @After + public void tearDown() throws Exception { + if (datastore != null) { + Key key = datastore.newKeyFactory().setKind(kind).newKey("metrics-it-entity"); + datastore.delete(key); + // Closing the client flushes the built-in SDK and shuts down the PeriodicMetricReader, + // ensuring any buffered metrics are exported before the test process exits. + datastore.close(); + } + if (customMeterProvider != null) { + customMeterProvider.close(); + } + } + + /** + * Verifies that the Datastore client is configured with a composite recorder so that both the + * built-in Cloud Monitoring backend and the user-provided custom OTel backend are active. + * + *

The composite recorder is the internal mechanism that fans out every {@code record*()} call + * to all registered backends. Its presence guarantees that: + * + *

    + *
  • the built-in SDK was successfully created and its exporter was wired up, AND + *
  • the user-provided custom OTel instance is also receiving metric data. + *
+ */ + @Test + public void bothBackendsActive_recorderIsComposite() { + // DatastoreOptions.getMetricsRecorder() is package-private; accessible because this test + // lives in the same package (com.google.cloud.datastore). + String recorderClassName = + datastore.getOptions().getMetricsRecorder().getClass().getSimpleName(); + assertThat(recorderClassName).isEqualTo("CompositeDatastoreMetricsRecorder"); + } + + /** + * Verifies that the built-in metrics export flag is on by default so that the Cloud Monitoring + * exporter path is active even when no additional configuration is provided. + */ + @Test + public void builtInMetricsExport_isEnabledByDefault() { + assertThat( + datastore + .getOptions() + .getOpenTelemetryOptions() + .isExportBuiltinMetricsToGoogleCloudMonitoring()) + .isTrue(); + } + + /** + * Verifies that a transaction operation records {@code transaction_latency} and {@code + * transaction_attempt_count} metrics in the custom (in-memory) OTel backend. + * + *

These are Datastore-specific metrics emitted by the SDK layer (not the GAX layer), so they + * validate that the Datastore-level recording path is working end-to-end. + */ + @Test + public void transactionOperation_recordsTransactionMetricsInCustomBackend() { + Key key = datastore.newKeyFactory().setKind(kind).newKey("metrics-it-entity"); + Entity initial = Entity.newBuilder(key).set("value", 0L).build(); + datastore.put(initial); + + datastore.runInTransaction( + tx -> { + Entity current = tx.get(key); + tx.put(Entity.newBuilder(current).set("value", current.getLong("value") + 1).build()); + return null; + }); + + Collection metrics = metricReader.collectAllMetrics(); + + // --- transaction_latency --- + Optional transactionLatencyMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY); + assertWithMessage("transaction_latency metric was recorded") + .that(transactionLatencyMetric.isPresent()) + .isTrue(); + + HistogramPointData latencyPoint = + transactionLatencyMetric.get().getHistogramData().getPoints().stream() + .findFirst() + .orElse(null); + assertThat(latencyPoint).isNotNull(); + assertThat(latencyPoint.getCount()).isEqualTo(1); + assertWithMessage("status=OK on transaction_latency") + .that( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, "OK")) + .isTrue(); + assertWithMessage("method=Transaction.Run on transaction_latency") + .that( + dataContainsStringAttribute( + latencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_RUN)) + .isTrue(); + assertWithMessage("database_id attribute on transaction_latency") + .that( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + // --- transaction_attempt_count --- + Optional attemptCountMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT); + assertWithMessage("transaction_attempt_count metric was 
recorded") + .that(attemptCountMetric.isPresent()) + .isTrue(); + + LongPointData attemptPoint = + attemptCountMetric.get().getLongSumData().getPoints().stream().findFirst().orElse(null); + assertThat(attemptPoint).isNotNull(); + assertThat(attemptPoint.getValue()).isEqualTo(1); + assertWithMessage("status=OK on transaction_attempt_count") + .that( + dataContainsStringAttribute( + attemptPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, "OK")) + .isTrue(); + assertWithMessage("method=Transaction.Commit on transaction_attempt_count") + .that( + dataContainsStringAttribute( + attemptPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_COMMIT)) + .isTrue(); + } + + /** + * Verifies that a lookup RPC records {@code operation_latency}, {@code attempt_latency}, {@code + * operation_count}, and {@code attempt_count} in the custom (in-memory) OTel backend. + * + *

These GAX-layer metrics are recorded by {@link com.google.cloud.datastore.telemetry + * .TelemetryUtils} and exercise the code path that was previously gated behind a {@code !GRPC} + * transport guard (now removed). This assertion confirms that all four RPC-level metrics are + * recorded regardless of transport. + */ + @Test + public void lookupRpc_recordsGaxMetricsInCustomBackend() { + // Issue a lookup for a key that does not exist; it still produces all four RPC-level metrics. + Key key = datastore.newKeyFactory().setKind(kind).newKey("does-not-exist"); + datastore.get(key); + + Collection metrics = metricReader.collectAllMetrics(); + + // --- operation_latency --- + Optional operationLatencyMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertWithMessage("operation_latency metric was recorded") + .that(operationLatencyMetric.isPresent()) + .isTrue(); + HistogramPointData opPoint = + operationLatencyMetric.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertWithMessage("operation_latency point for Lookup method").that(opPoint).isNotNull(); + assertThat(opPoint.getCount()).isAtLeast(1); + assertWithMessage("status=OK on operation_latency") + .that(dataContainsStringAttribute(opPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, "OK")) + .isTrue(); + + // --- attempt_latency --- + Optional attemptLatencyMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); + assertWithMessage("attempt_latency metric was recorded") + .that(attemptLatencyMetric.isPresent()) + .isTrue(); + + // --- operation_count --- + Optional operationCountMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_COUNT); + assertWithMessage("operation_count metric was recorded") + .that(operationCountMetric.isPresent()) + .isTrue(); + + // --- attempt_count 
--- + Optional attemptCountMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT); + assertWithMessage("attempt_count metric was recorded") + .that(attemptCountMetric.isPresent()) + .isTrue(); + } + + /** + * Verifies that all six expected metrics appear in the custom (in-memory) OTel backend after a + * combined transaction-plus-lookup workload. This is the primary "composite" scenario: both the + * SDK-layer metrics (transaction) and the GAX-layer metrics (operation/attempt) are captured in a + * single test run, confirming the full fan-out routing is correct. + */ + @Test + public void combinedWorkload_recordsAllSixMetricsInCustomBackend() { + Key key = datastore.newKeyFactory().setKind(kind).newKey("metrics-it-entity"); + Entity initial = Entity.newBuilder(key).set("value", 0L).build(); + + // Step 1: plain put (records operation_latency, attempt_latency, etc.) + datastore.put(initial); + + // Step 2: transaction (records transaction_latency, transaction_attempt_count on top) + datastore.runInTransaction( + tx -> { + Entity current = tx.get(key); + tx.put(Entity.newBuilder(current).set("value", current.getLong("value") + 1).build()); + return null; + }); + + // Step 3: standalone lookup + datastore.get(key); + + Collection metrics = metricReader.collectAllMetrics(); + + assertWithMessage("operation_latency present") + .that(findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY).isPresent()) + .isTrue(); + assertWithMessage("attempt_latency present") + .that(findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY).isPresent()) + .isTrue(); + assertWithMessage("operation_count present") + .that(findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_COUNT).isPresent()) + .isTrue(); + assertWithMessage("attempt_count present") + .that(findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT).isPresent()) + .isTrue(); + assertWithMessage("transaction_latency present") + .that(findMetric(metrics, 
TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY).isPresent()) + .isTrue(); + assertWithMessage("transaction_attempt_count present") + .that( + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT) + .isPresent()) + .isTrue(); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static Optional findMetric( + Collection metrics, String metricName) { + return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst(); + } + + private static boolean dataContainsStringAttribute( + PointData point, String attributeKey, String expectedValue) { + String actual = point.getAttributes().get(AttributeKey.stringKey(attributeKey)); + return expectedValue.equals(actual); + } +} diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/BuiltInDatastoreMetricsProviderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/BuiltInDatastoreMetricsProviderTest.java new file mode 100644 index 000000000000..66c6dfabe277 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/BuiltInDatastoreMetricsProviderTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import io.opentelemetry.api.OpenTelemetry; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BuiltInDatastoreMetricsProviderTest { + + private static final String PROJECT_ID = "project-id"; + + @Test + public void testGetClientAttributes() { + Map attributes = BuiltInDatastoreMetricsProvider.INSTANCE.getClientAttributes(); + assertThat(attributes).containsKey(TelemetryConstants.CLIENT_UID_KEY.getKey()); + assertThat(attributes.get(TelemetryConstants.CLIENT_UID_KEY.getKey())).contains("@"); + assertThat(attributes).containsKey(TelemetryConstants.SERVICE_KEY.getKey()); + assertThat(attributes.get(TelemetryConstants.SERVICE_KEY.getKey())) + .isEqualTo(TelemetryConstants.SERVICE_VALUE); + } + + @Test + public void testCreateOpenTelemetry_returnsNonNull() { + OpenTelemetry otel = + BuiltInDatastoreMetricsProvider.INSTANCE.createOpenTelemetry(PROJECT_ID, "test-db", null); + assertThat(otel).isNotNull(); + } + + @Test + public void testCreateOpenTelemetry_eachCallReturnsDistinctInstance() { + OpenTelemetry otel1 = + BuiltInDatastoreMetricsProvider.INSTANCE.createOpenTelemetry(PROJECT_ID, "test-db", null); + OpenTelemetry otel2 = + BuiltInDatastoreMetricsProvider.INSTANCE.createOpenTelemetry(PROJECT_ID, "test-db", null); + assertThat(otel1).isNotNull(); + assertThat(otel2).isNotNull(); + assertThat(otel1).isNotSameInstanceAs(otel2); + } + + @Test + public void testDetectClientLocation() { + String location = BuiltInDatastoreMetricsProvider.detectClientLocation(); + assertThat(location).isEqualTo("global"); + } +} diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/CompositeDatastoreMetricsRecorderTest.java 
b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/CompositeDatastoreMetricsRecorderTest.java new file mode 100644 index 000000000000..d26f0f05b2f6 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/CompositeDatastoreMetricsRecorderTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CompositeDatastoreMetricsRecorderTest { + + @Test + public void testCloseLIFOAndExceptionSafe() { + List closeOrder = new ArrayList<>(); + DatastoreMetricsRecorder recorder1 = new MockRecorder(1, closeOrder, false); + DatastoreMetricsRecorder recorder2 = new MockRecorder(2, closeOrder, true); // will throw + DatastoreMetricsRecorder recorder3 = new MockRecorder(3, closeOrder, false); + + CompositeDatastoreMetricsRecorder composite = + new CompositeDatastoreMetricsRecorder(Arrays.asList(recorder1, recorder2, recorder3)); + + composite.close(); + + // LIFO order means 3, then 2, then 1. + // Even though 2 throws, 1 should still be closed. 
+ assertThat(closeOrder).containsExactly(3, 2, 1).inOrder(); + } + + private static class MockRecorder implements DatastoreMetricsRecorder { + private final int id; + private final List closeOrder; + private final boolean throwOnClose; + + MockRecorder(int id, List closeOrder, boolean throwOnClose) { + this.id = id; + this.closeOrder = closeOrder; + this.throwOnClose = throwOnClose; + } + + @Override + public void close() { + closeOrder.add(id); + if (throwOnClose) { + throw new RuntimeException("Mock exception on close"); + } + } + + @Override + public void recordTransactionLatency( + double latencyMs, java.util.Map attributes) {} + + @Override + public void recordTransactionAttemptCount( + long count, java.util.Map attributes) {} + + @Override + public void recordAttemptLatency(double latencyMs, java.util.Map attributes) {} + + @Override + public void recordAttemptCount(long count, java.util.Map attributes) {} + + @Override + public void recordOperationLatency( + double latencyMs, java.util.Map attributes) {} + + @Override + public void recordOperationCount(long count, java.util.Map attributes) {} + } +} diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporterTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporterTest.java new file mode 100644 index 000000000000..712916eaf3db --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreCloudMonitoringExporterTest.java @@ -0,0 +1,198 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import static com.google.cloud.datastore.telemetry.TelemetryConstants.CLIENT_UID_KEY; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.DATABASE_ID_KEY; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.LOCATION_ID_KEY; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.METRIC_NAME_SHORT_OPERATION_COUNT; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.METRIC_PREFIX; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.PROJECT_ID_KEY; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.RESOURCE_LABEL_PROJECT_ID; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.SERVICE_KEY; +import static com.google.cloud.datastore.telemetry.TelemetryConstants.SERVICE_VALUE; +import static com.google.common.truth.Truth.assertThat; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder; +import com.google.cloud.monitoring.v3.MetricServiceClient; +import com.google.cloud.monitoring.v3.stub.MetricServiceStub; +import com.google.common.collect.ImmutableList; +import com.google.monitoring.v3.CreateTimeSeriesRequest; +import com.google.monitoring.v3.TimeSeries; 
+import com.google.protobuf.Empty;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.resources.Resource;
+import java.util.Collections;
+import java.util.Map;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link DatastoreCloudMonitoringExporter}.
+ *
+ * <p>Covers two behaviors: translating an OpenTelemetry long-sum metric into a {@code
+ * CreateTimeSeriesRequest}, and the reference counting applied to entries of the exporter's
+ * static metrics-client cache on {@code shutdown()}.
+ */
+public class DatastoreCloudMonitoringExporterTest {
+
+  private static final String PROJECT_ID = "test-project";
+  private static final String LOCATION_ID = "global";
+  private static final String DATABASE_ID = "test-db";
+
+  private MetricServiceStub mockMetricServiceStub;
+  private MetricServiceClient fakeMetricServiceClient;
+  private DatastoreCloudMonitoringExporter exporter;
+
+  private Attributes attributes;
+  private Attributes resourceAttributes;
+  private Resource resource;
+  private InstrumentationScopeInfo scope;
+  private String clientUid;
+
+  /**
+   * Builds an exporter backed by a mocked {@link MetricServiceStub} together with the point
+   * attributes, resource and instrumentation scope shared by the tests.
+   */
+  @Before
+  public void setUp() {
+    mockMetricServiceStub = createMock(MetricServiceStub.class);
+    fakeMetricServiceClient = new FakeMetricServiceClient(mockMetricServiceStub);
+
+    // Typed map: the client UID is read back as a String and asserted on later.
+    Map<String, String> clientAttributes =
+        BuiltInDatastoreMetricsProvider.INSTANCE.getClientAttributes();
+    this.clientUid = clientAttributes.get(CLIENT_UID_KEY.getKey());
+
+    exporter =
+        new DatastoreCloudMonitoringExporter(
+            PROJECT_ID, DATABASE_ID, fakeMetricServiceClient, clientAttributes);
+
+    attributes =
+        Attributes.builder()
+            .put(DATABASE_ID_KEY, DATABASE_ID)
+            .put(CLIENT_UID_KEY, this.clientUid)
+            .build();
+
+    resourceAttributes =
+        Attributes.builder()
+            .put(PROJECT_ID_KEY, PROJECT_ID)
+            .put(DATABASE_ID_KEY, DATABASE_ID)
+            .put(LOCATION_ID_KEY, LOCATION_ID)
+            .build();
+    resource = Resource.create(resourceAttributes);
+
+    scope = InstrumentationScopeInfo.create(OpenTelemetryMetricsRecorder.GAX_METER_NAME);
+  }
+
+  /**
+   * Exports a single cumulative long-sum point and checks the captured request: one time series,
+   * the expected resource and metric labels, and the original point value.
+   */
+  @Test
+  public void testExportingSumData() {
+    Capture<CreateTimeSeriesRequest> capture = EasyMock.newCapture();
+
+    UnaryCallable<CreateTimeSeriesRequest, Empty> mockCallable = createMock(UnaryCallable.class);
+    expect(mockMetricServiceStub.isShutdown()).andReturn(false).anyTimes();
+    expect(mockMetricServiceStub.createTimeSeriesCallable()).andReturn(mockCallable);
+    ApiFuture<Empty> future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
+    expect(mockCallable.futureCall(EasyMock.capture(capture))).andReturn(future);
+
+    replay(mockMetricServiceStub, mockCallable);
+
+    long fakeValue = 11L;
+    long startEpoch = 10;
+    long endEpoch = 15;
+    LongPointData longPointData =
+        ImmutableLongPointData.create(startEpoch, endEpoch, attributes, fakeValue);
+
+    MetricData longData =
+        ImmutableMetricData.createLongSum(
+            resource,
+            scope,
+            METRIC_PREFIX + "/" + METRIC_NAME_SHORT_OPERATION_COUNT,
+            "description",
+            "1",
+            ImmutableSumData.create(
+                true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
+
+    exporter.export(Collections.singletonList(longData));
+
+    CreateTimeSeriesRequest request = capture.getValue();
+    assertThat(request.getTimeSeriesList()).hasSize(1);
+
+    TimeSeries timeSeries = request.getTimeSeriesList().get(0);
+
+    assertThat(timeSeries.getResource().getLabelsMap())
+        .containsExactly(RESOURCE_LABEL_PROJECT_ID, PROJECT_ID);
+
+    assertThat(timeSeries.getMetric().getLabelsMap())
+        .containsExactly(
+            DATABASE_ID_KEY.getKey(),
+            DATABASE_ID,
+            CLIENT_UID_KEY.getKey(),
+            this.clientUid,
+            SERVICE_KEY.getKey(),
+            SERVICE_VALUE);
+
+    assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(fakeValue);
+
+    verify(mockMetricServiceStub, mockCallable);
+  }
+
+  /**
+   * Seeds the static client cache with an entry holding two references and checks that each
+   * exporter shutdown decrements the count, and that the client is shut down and removed from the
+   * cache only when the count reaches zero.
+   */
+  @Test
+  public void testClientCacheReferenceCounting() {
+    MetricServiceClient mockClient = createMock(MetricServiceClient.class);
+    expect(mockClient.isShutdown()).andReturn(false).anyTimes();
+    mockClient.shutdown();
+    EasyMock.expectLastCall(); // Expect shutdown when refCount reaches 0
+    replay(mockClient);
+
+    String key = PROJECT_ID + ":" + DATABASE_ID;
+    DatastoreCloudMonitoringExporter.CachedMetricsClient cachedMetricsClient =
+        new DatastoreCloudMonitoringExporter.CachedMetricsClient(mockClient);
+    cachedMetricsClient.refCount.set(2); // Simulate 2 references
+    DatastoreCloudMonitoringExporter.METRICS_CLIENT_CACHE.put(key, cachedMetricsClient);
+
+    try {
+      DatastoreCloudMonitoringExporter exporter1 =
+          new DatastoreCloudMonitoringExporter(
+              PROJECT_ID, DATABASE_ID, mockClient, Collections.emptyMap());
+
+      // First shutdown should decrement refCount to 1, but not close client
+      exporter1.shutdown();
+      assertThat(cachedMetricsClient.refCount.get()).isEqualTo(1);
+      assertThat(DatastoreCloudMonitoringExporter.METRICS_CLIENT_CACHE.containsKey(key)).isTrue();
+
+      DatastoreCloudMonitoringExporter exporter2 =
+          new DatastoreCloudMonitoringExporter(
+              PROJECT_ID, DATABASE_ID, mockClient, Collections.emptyMap());
+
+      // Second shutdown should decrement refCount to 0, close client, and remove from cache
+      exporter2.shutdown();
+      assertThat(cachedMetricsClient.refCount.get()).isEqualTo(0);
+      assertThat(DatastoreCloudMonitoringExporter.METRICS_CLIENT_CACHE.containsKey(key)).isFalse();
+
+      verify(mockClient);
+    } finally {
+      // The cache is static: drop the seeded entry even when an assertion above fails so the
+      // mocked client cannot leak into other tests. This is a no-op on the success path.
+      DatastoreCloudMonitoringExporter.METRICS_CLIENT_CACHE.remove(key);
+    }
+  }
+
+  /** Thin subclass used to build a {@link MetricServiceClient} around a supplied stub. */
+  private static class FakeMetricServiceClient extends MetricServiceClient {
+    protected FakeMetricServiceClient(MetricServiceStub stub) {
+      super(stub);
+    }
+  }
+}
diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorderTest.java
index 1c1f76ddc156..8f4ba3982a89 100644
--- 
a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorderTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DatastoreMetricsRecorderTest.java @@ -19,7 +19,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; -import com.google.cloud.datastore.DatastoreOpenTelemetryOptionsTestHelper; import com.google.cloud.datastore.DatastoreOptions; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -42,42 +41,49 @@ private DatastoreOptions.Builder baseOptions() { } @Test - public void defaultOptions_returnsNoOp() { - // metricsEnabled defaults to false, so getInstance() should return NoOp - DatastoreOptions options = baseOptions().build(); + public void defaultOptionsWithBuiltInMetricsDisabled_returnsNoOp() { + // When both custom metrics and built-in metrics export are disabled, should return NoOp + DatastoreOptions options = + baseOptions() + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setExportBuiltinMetricsToGoogleCloudMonitoring(false) + .build()) + .build(); DatastoreMetricsRecorder recorder = DatastoreMetricsRecorder.getInstance(options); assertThat(recorder).isInstanceOf(NoOpDatastoreMetricsRecorder.class); } @Test - public void tracingEnabledButMetricsDisabled_returnsNoOp() { + public void tracingEnabledButMetricsDisabledAndBuiltInDisabled_returnsNoOp() { // Enabling tracing alone should not enable metrics DatastoreOptions options = baseOptions() .setOpenTelemetryOptions( - DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()) + DatastoreOpenTelemetryOptions.newBuilder() + .setTracingEnabled(true) + .setExportBuiltinMetricsToGoogleCloudMonitoring(false) + .build()) .build(); DatastoreMetricsRecorder recorder = DatastoreMetricsRecorder.getInstance(options); 
assertThat(recorder).isInstanceOf(NoOpDatastoreMetricsRecorder.class); } @Test - public void metricsEnabled_withCustomOtel_returnsOpenTelemetryRecorder() { - InMemoryMetricReader metricReader = InMemoryMetricReader.create(); - SdkMeterProvider meterProvider = - SdkMeterProvider.builder().registerMetricReader(metricReader).build(); - OpenTelemetry openTelemetry = - OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); - - // Use DatastoreOpenTelemetryOptionsTestHelper since setMetricsEnabled() is package-private - // and this test lives in the telemetry sub-package. + public void defaultOptionsWithBuiltInMetricsEnabled_butNoCredentials_returnsNoOpRecorder() { + // Explicitly enable built-in metrics export DatastoreOptions options = - baseOptions() + baseOptions() // Uses NoCredentials by default .setOpenTelemetryOptions( - DatastoreOpenTelemetryOptionsTestHelper.withMetricsEnabled(openTelemetry)) + DatastoreOpenTelemetryOptions.newBuilder() + .setExportBuiltinMetricsToGoogleCloudMonitoring(true) + .build()) .build(); DatastoreMetricsRecorder recorder = DatastoreMetricsRecorder.getInstance(options); - assertThat(recorder).isInstanceOf(OpenTelemetryDatastoreMetricsRecorder.class); + + // Since baseOptions() uses NoCredentials, it should return NoOpDatastoreMetricsRecorder + // as we don't want to send metrics for local emulator logic. + assertThat(recorder).isInstanceOf(NoOpDatastoreMetricsRecorder.class); } @Test