diff --git a/exporters/common/build.gradle.kts b/exporters/common/build.gradle.kts index 8d72eba3715..8efac3a8435 100644 --- a/exporters/common/build.gradle.kts +++ b/exporters/common/build.gradle.kts @@ -84,10 +84,11 @@ val testJavaVersion: String? by project testing { suites { - register("testHttpSenderProvider") { + register("testSenderProvider") { dependencies { implementation(project(":exporters:sender:jdk")) implementation(project(":exporters:sender:okhttp")) + implementation(project(":exporters:sender:grpc-managed-channel")) } targets { all { @@ -98,18 +99,6 @@ testing { } } } - suites { - register("testGrpcSenderProvider") { - dependencies { - implementation(project(":exporters:sender:okhttp")) - implementation(project(":exporters:sender:grpc-managed-channel")) - - implementation("io.grpc:grpc-stub") - implementation("io.grpc:grpc-netty") - implementation("com.fasterxml.jackson.core:jackson-core") - } - } - } suites { register("testWithoutUnsafe") {} } @@ -131,7 +120,7 @@ tasks { } afterEvaluate { - tasks.named("compileTestHttpSenderProviderJava") { + tasks.named("compileTestSenderProviderJava") { options.release.set(11) } } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/SenderUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/SenderUtil.java new file mode 100644 index 00000000000..567f1830a97 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/SenderUtil.java @@ -0,0 +1,181 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal; + +import io.opentelemetry.api.internal.ConfigUtil; +import io.opentelemetry.common.ComponentLoader; +import io.opentelemetry.sdk.common.export.GrpcSenderProvider; +import io.opentelemetry.sdk.common.export.HttpSenderProvider; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Utilities for loading senders. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class SenderUtil { + + private static final Logger LOGGER = Logger.getLogger(SenderUtil.class.getName()); + + private static final String OLD_GRPC_SPI_PROPERTY = + "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider"; + private static final String GRPC_SPI_PROPERTY = + "io.opentelemetry.sdk.common.export.GrpcSenderProvider"; + private static final String OLD_HTTP_SPI_PROPERTY = + "io.opentelemetry.exporter.internal.http.HttpSenderProvider"; + private static final String HTTP_SPI_PROPERTY = + "io.opentelemetry.sdk.common.export.HttpSenderProvider"; + + private SenderUtil() {} + + /** + * Resolve the {@link GrpcSenderProvider}. + * + *

If no {@link GrpcSenderProvider} is available, throw {@link IllegalStateException}. + * + *

If only one {@link GrpcSenderProvider} is available, use it. + * + *

If multiple are available and.. + * + *

    + *
  • {@code io.opentelemetry.sdk.common.export.GrpcSenderProvider} is empty, use the first + * found. + *
  • {@code io.opentelemetry.sdk.common.export.GrpcSenderProvider} is set, use the matching + * provider. If none match, throw {@link IllegalStateException}. + *
+ */ + public static GrpcSenderProvider resolveGrpcSenderProvider(ComponentLoader componentLoader) { + Map grpcSenderProviders = new HashMap<>(); + for (GrpcSenderProvider spi : componentLoader.load(GrpcSenderProvider.class)) { + grpcSenderProviders.put(spi.getClass().getName(), spi); + } + + // No provider on classpath, throw + if (grpcSenderProviders.isEmpty()) { + throw new IllegalStateException( + "No GrpcSenderProvider found on classpath. Please add dependency on " + + "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-grpc-managed-channel"); + } + + // Exactly one provider on classpath, use it + if (grpcSenderProviders.size() == 1) { + return grpcSenderProviders.values().stream().findFirst().get(); + } + + // If we've reached here, there are multiple GrpcSenderProviders + String configuredSender = ConfigUtil.getString(GRPC_SPI_PROPERTY, ""); + // TODO: remove support for reading OLD_SPI_PROPERTY after 1.61.0 + if (configuredSender.isEmpty()) { + configuredSender = ConfigUtil.getString(OLD_GRPC_SPI_PROPERTY, ""); + if (configuredSender.isEmpty()) { + LOGGER.log( + Level.WARNING, + OLD_GRPC_SPI_PROPERTY + + " was used to set GrpcSenderProvider. Please use " + + GRPC_SPI_PROPERTY + + " instead. " + + OLD_GRPC_SPI_PROPERTY + + " will be removed after 1.61.0"); + } + } + + // Multiple providers but none configured, use first we find and log a warning + if (configuredSender.isEmpty()) { + LOGGER.log( + Level.WARNING, + "Multiple GrpcSenderProvider found. Please include only one, " + + "or specify preference setting " + + GRPC_SPI_PROPERTY + + " to the FQCN of the preferred provider."); + return grpcSenderProviders.values().stream().findFirst().get(); + } + + // Multiple providers with configuration match, use configuration match + if (grpcSenderProviders.containsKey(configuredSender)) { + return grpcSenderProviders.get(configuredSender); + } + + // Multiple providers, configured does not match, throw + throw new IllegalStateException( + "No GrpcSenderProvider matched configured " + GRPC_SPI_PROPERTY + ": " + configuredSender); + } + + /** + * Resolve the {@link HttpSenderProvider}. + * + *

If no {@link HttpSenderProvider} is available, throw {@link IllegalStateException}. + * + *

If only one {@link HttpSenderProvider} is available, use it. + * + *

If multiple are available and.. + * + *

    + *
  • {@code io.opentelemetry.sdk.common.export.HttpSenderProvider} is empty, use the first + * found. + *
  • {@code io.opentelemetry.sdk.common.export.HttpSenderProvider} is set, use the matching + * provider. If none match, throw {@link IllegalStateException}. + *
+ */ + public static HttpSenderProvider resolveHttpSenderProvider(ComponentLoader componentLoader) { + Map httpSenderProviders = new HashMap<>(); + for (HttpSenderProvider spi : componentLoader.load(HttpSenderProvider.class)) { + httpSenderProviders.put(spi.getClass().getName(), spi); + } + + // No provider on classpath, throw + if (httpSenderProviders.isEmpty()) { + throw new IllegalStateException( + "No HttpSenderProvider found on classpath. Please add dependency on " + + "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-jdk"); + } + + // Exactly one provider on classpath, use it + if (httpSenderProviders.size() == 1) { + return httpSenderProviders.values().stream().findFirst().get(); + } + + // If we've reached here, there are multiple HttpSenderProviders + String configuredSender = ConfigUtil.getString(HTTP_SPI_PROPERTY, ""); + // TODO: remove support for reading OLD_SPI_PROPERTY after 1.61.0 + if (configuredSender.isEmpty()) { + configuredSender = ConfigUtil.getString(OLD_HTTP_SPI_PROPERTY, ""); + if (configuredSender.isEmpty()) { + LOGGER.log( + Level.WARNING, + OLD_HTTP_SPI_PROPERTY + + " was used to set HttpSenderProvider. Please use " + + HTTP_SPI_PROPERTY + + " instead. " + + OLD_HTTP_SPI_PROPERTY + + " will be removed after 1.61.0"); + } + } + + // Multiple providers but none configured, use first we find and log a warning + if (configuredSender.isEmpty()) { + LOGGER.log( + Level.WARNING, + "Multiple HttpSenderProvider found. Please include only one, " + + "or specify preference setting " + + HTTP_SPI_PROPERTY + + " to the FQCN of the preferred provider."); + return httpSenderProviders.values().stream().findFirst().get(); + } + + // Multiple providers with configuration match, use configuration match + if (httpSenderProviders.containsKey(configuredSender)) { + return httpSenderProviders.get(configuredSender); + } + + // Multiple providers, configured does not match, throw + throw new IllegalStateException( + "No HttpSenderProvider matched configured " + HTTP_SPI_PROPERTY + ": " + configuredSender); + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index 8d83755ea54..53a3d5c7c5e 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -5,9 +5,10 @@ package io.opentelemetry.exporter.internal.grpc; +import static io.opentelemetry.exporter.internal.SenderUtil.resolveGrpcSenderProvider; + import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.internal.ConfigUtil; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.common.ComponentLoader; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; @@ -49,10 +50,6 @@ public class GrpcExporterBuilder { public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 10; private static final Logger LOGGER = Logger.getLogger(GrpcExporterBuilder.class.getName()); - private static final String OLD_SPI_PROPERTY = - "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider"; - private static final String SPI_PROPERTY = - "io.opentelemetry.sdk.common.export.GrpcSenderProvider"; private final StandardComponentId.ExporterType exporterType; private final String fullMethodName; @@ -219,7 +216,7 @@ public GrpcExporter build() { }; boolean isPlainHttp = "http".equals(endpoint.getScheme()); - GrpcSenderProvider grpcSenderProvider = resolveGrpcSenderProvider(); + GrpcSenderProvider grpcSenderProvider = resolveGrpcSenderProvider(componentLoader); GrpcSender grpcSender = grpcSenderProvider.createSender( ImmutableGrpcSenderConfig.create( @@ -284,76 +281,4 @@ public String toString(boolean includePrefixAndSuffix) { public String toString() { return toString(true); } - - /** - * Resolve the {@link GrpcSenderProvider}. - * - *

If no {@link GrpcSenderProvider} is available, throw {@link IllegalStateException}. - * - *

If only one {@link GrpcSenderProvider} is available, use it. - * - *

If multiple are available and.. - * - *

    - *
  • {@code io.opentelemetry.sdk.common.export.GrpcSenderProvider} is empty, use the first - * found. - *
  • {@code io.opentelemetry.sdk.common.export.GrpcSenderProvider} is set, use the matching - * provider. If none match, throw {@link IllegalStateException}. - *
- */ - private GrpcSenderProvider resolveGrpcSenderProvider() { - Map grpcSenderProviders = new HashMap<>(); - for (GrpcSenderProvider spi : componentLoader.load(GrpcSenderProvider.class)) { - grpcSenderProviders.put(spi.getClass().getName(), spi); - } - - // No provider on classpath, throw - if (grpcSenderProviders.isEmpty()) { - throw new IllegalStateException( - "No GrpcSenderProvider found on classpath. Please add dependency on " - + "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-grpc-managed-channel"); - } - - // Exactly one provider on classpath, use it - if (grpcSenderProviders.size() == 1) { - return grpcSenderProviders.values().stream().findFirst().get(); - } - - // If we've reached here, there are multiple GrpcSenderProviders - String configuredSender = ConfigUtil.getString(SPI_PROPERTY, ""); - // TODO: remove support for reading OLD_SPI_PROPERTY after 1.61.0 - if (configuredSender.isEmpty()) { - configuredSender = ConfigUtil.getString(OLD_SPI_PROPERTY, ""); - if (configuredSender.isEmpty()) { - LOGGER.log( - Level.WARNING, - OLD_SPI_PROPERTY - + " was used to set GrpcSenderProvider. Please use " - + SPI_PROPERTY - + " instead. " - + OLD_SPI_PROPERTY - + " will be removed after 1.61.0"); - } - } - - // Multiple providers but none configured, use first we find and log a warning - if (configuredSender.isEmpty()) { - LOGGER.log( - Level.WARNING, - "Multiple GrpcSenderProvider found. Please include only one, " - + "or specify preference setting " - + SPI_PROPERTY - + " to the FQCN of the preferred provider."); - return grpcSenderProviders.values().stream().findFirst().get(); - } - - // Multiple providers with configuration match, use configuration match - if (grpcSenderProviders.containsKey(configuredSender)) { - return grpcSenderProviders.get(configuredSender); - } - - // Multiple providers, configured does not match, throw - throw new IllegalStateException( - "No GrpcSenderProvider matched configured " + SPI_PROPERTY + ": " + configuredSender); - } } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java index 6903488564b..66a3a2ad5f0 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java @@ -18,11 +18,15 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ @AutoValue -abstract class ImmutableGrpcSenderConfig implements ExtendedGrpcSenderConfig { +public abstract class ImmutableGrpcSenderConfig implements ExtendedGrpcSenderConfig { @SuppressWarnings("TooManyParameters") - static ImmutableGrpcSenderConfig create( + public static ImmutableGrpcSenderConfig create( URI endpoint, String fullMethodName, @Nullable Compressor compressor, diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java index de4adb4512e..efb08ee5544 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java @@ -7,18 +7,13 @@ import static java.util.stream.Collectors.toList; -import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.opentelemetry.exporter.internal.RetryUtil; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; /** * Utilities for working with gRPC channels. @@ -28,8 +23,6 @@ */ public final class ManagedChannelUtil { - private static final Logger logger = Logger.getLogger(ManagedChannelUtil.class.getName()); - /** * Convert the {@link RetryPolicy} into a gRPC service config for the {@code serviceName}. The * resulting map can be passed to {@link ManagedChannelBuilder#defaultServiceConfig(Map)}. @@ -53,29 +46,5 @@ public final class ManagedChannelUtil { return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig)); } - /** Shutdown the gRPC channel. */ - public static CompletableResultCode shutdownChannel(ManagedChannel managedChannel) { - CompletableResultCode result = new CompletableResultCode(); - managedChannel.shutdown(); - // Remove thread creation if gRPC adds an asynchronous shutdown API. - // https://github.com/grpc/grpc-java/issues/8432 - Thread thread = - new Thread( - () -> { - try { - managedChannel.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.log(Level.WARNING, "Failed to shutdown the gRPC channel", e); - result.fail(); - } - result.succeed(); - }); - thread.setDaemon(true); - thread.setName("grpc-cleanup"); - thread.start(); - return result; - } - private ManagedChannelUtil() {} } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index 46c6687b26c..2e06f8caa26 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -6,10 +6,10 @@ package io.opentelemetry.exporter.internal.http; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.internal.ConfigUtil; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.common.ComponentLoader; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; +import io.opentelemetry.exporter.internal.SenderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.sdk.common.InternalTelemetryVersion; @@ -49,10 +49,6 @@ public final class HttpExporterBuilder { public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 10; private static final Logger LOGGER = Logger.getLogger(HttpExporterBuilder.class.getName()); - private static final String OLD_SPI_PROPERTY = - "io.opentelemetry.exporter.internal.http.HttpSenderProvider"; - private static final String SPI_PROPERTY = - "io.opentelemetry.sdk.common.export.HttpSenderProvider"; private StandardComponentId.ExporterType exporterType; @@ -235,7 +231,7 @@ public HttpExporter build() { }; boolean isPlainHttp = endpoint.getScheme().equals("http"); - HttpSenderProvider httpSenderProvider = resolveHttpSenderProvider(); + HttpSenderProvider httpSenderProvider = SenderUtil.resolveHttpSenderProvider(componentLoader); HttpSender httpSender = httpSenderProvider.createSender( ImmutableHttpSenderConfig.create( @@ -299,76 +295,4 @@ public String toString(boolean includePrefixAndSuffix) { public String toString() { return toString(true); } - - /** - * Resolve the {@link HttpSenderProvider}. - * - *

If no {@link HttpSenderProvider} is available, throw {@link IllegalStateException}. - * - *

If only one {@link HttpSenderProvider} is available, use it. - * - *

If multiple are available and.. - * - *

    - *
  • {@code io.opentelemetry.sdk.common.export.HttpSenderProvider} is empty, use the first - * found. - *
  • {@code io.opentelemetry.sdk.common.export.HttpSenderProvider} is set, use the matching - * provider. If none match, throw {@link IllegalStateException}. - *
- */ - private HttpSenderProvider resolveHttpSenderProvider() { - Map httpSenderProviders = new HashMap<>(); - for (HttpSenderProvider spi : componentLoader.load(HttpSenderProvider.class)) { - httpSenderProviders.put(spi.getClass().getName(), spi); - } - - // No provider on classpath, throw - if (httpSenderProviders.isEmpty()) { - throw new IllegalStateException( - "No HttpSenderProvider found on classpath. Please add dependency on " - + "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-jdk"); - } - - // Exactly one provider on classpath, use it - if (httpSenderProviders.size() == 1) { - return httpSenderProviders.values().stream().findFirst().get(); - } - - // If we've reached here, there are multiple HttpSenderProviders - String configuredSender = ConfigUtil.getString(SPI_PROPERTY, ""); - // TODO: remove support for reading OLD_SPI_PROPERTY after 1.61.0 - if (configuredSender.isEmpty()) { - configuredSender = ConfigUtil.getString(OLD_SPI_PROPERTY, ""); - if (configuredSender.isEmpty()) { - LOGGER.log( - Level.WARNING, - OLD_SPI_PROPERTY - + " was used to set HttpSenderProvider. Please use " - + SPI_PROPERTY - + " instead. " - + OLD_SPI_PROPERTY - + " will be removed after 1.61.0"); - } - } - - // Multiple providers but none configured, use first we find and log a warning - if (configuredSender.isEmpty()) { - LOGGER.log( - Level.WARNING, - "Multiple HttpSenderProvider found. Please include only one, " - + "or specify preference setting " - + SPI_PROPERTY - + " to the FQCN of the preferred provider."); - return httpSenderProviders.values().stream().findFirst().get(); - } - - // Multiple providers with configuration match, use configuration match - if (httpSenderProviders.containsKey(configuredSender)) { - return httpSenderProviders.get(configuredSender); - } - - // Multiple providers, configured does not match, throw - throw new IllegalStateException( - "No HttpSenderProvider matched configured " + SPI_PROPERTY + ": " + configuredSender); - } } diff --git a/exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java deleted file mode 100644 index cabb8ecbeb4..00000000000 --- a/exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.grpc; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import io.github.netmikey.logunit.api.LogCapturer; -import io.grpc.ManagedChannelBuilder; -import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender; -import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; -import io.opentelemetry.internal.testing.slf4j.SuppressLogger; -import io.opentelemetry.sdk.common.internal.StandardComponentId; -import java.net.URI; -import java.net.URISyntaxException; -import java.time.Duration; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.junitpioneer.jupiter.SetSystemProperty; - -class GrpcExporterTest { - - @RegisterExtension - LogCapturer logCapturer = - LogCapturer.create().captureForLogger(GrpcExporterBuilder.class.getName()); - - @Test - @SuppressLogger(GrpcExporterBuilder.class) - void build_multipleSendersNoConfiguration() { - assertThatCode( - () -> - new GrpcExporterBuilder( - StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER, - Duration.ofSeconds(10), - new URI("http://localhost"), - "io.opentelemetry.Dummy/Method") - .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) - .build()) - .doesNotThrowAnyException(); - - logCapturer.assertContains( - "Multiple GrpcSenderProvider found. Please include only one, " - + "or specify preference setting io.opentelemetry.sdk.common.export.GrpcSenderProvider " - + "to the FQCN of the preferred provider."); - } - - // TODO: delete test after support for old spi is removed - @Test - @SetSystemProperty( - key = "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", - value = - "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider") - void build_configureUsingOldSpi() throws URISyntaxException { - assertThat( - new GrpcExporterBuilder( - StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER, - Duration.ofSeconds(10), - new URI("http://localhost"), - "io.opentelemetry.Dummy/Method") - .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) - .build()) - .extracting("grpcSender") - .isInstanceOf(UpstreamGrpcSender.class); - - assertThat(logCapturer.getEvents()).isEmpty(); - } - - @Test - @SetSystemProperty( - key = "io.opentelemetry.sdk.common.export.GrpcSenderProvider", - value = - "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider") - void build_multipleSendersWithUpstream() throws URISyntaxException { - assertThat( - new GrpcExporterBuilder( - StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER, - Duration.ofSeconds(10), - new URI("http://localhost"), - "io.opentelemetry.Dummy/Method") - .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) - .build()) - .extracting("grpcSender") - .isInstanceOf(UpstreamGrpcSender.class); - - assertThat(logCapturer.getEvents()).isEmpty(); - } - - @Test - @SetSystemProperty( - key = "io.opentelemetry.sdk.common.export.GrpcSenderProvider", - value = "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider") - void build_multipleSendersWithOkHttp() throws URISyntaxException { - assertThat( - new GrpcExporterBuilder( - StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER, - Duration.ofSeconds(10), - new URI("http://localhost"), - "io.opentelemetry.Dummy/Method") - .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) - .build()) - .extracting("grpcSender") - .isInstanceOf(OkHttpGrpcSender.class); - - assertThat(logCapturer.getEvents()).isEmpty(); - } - - @Test - @SetSystemProperty(key = "io.opentelemetry.sdk.common.export.GrpcSenderProvider", value = "foo") - void build_multipleSendersNoMatch() { - assertThatThrownBy( - () -> - new GrpcExporterBuilder( - StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER, - Duration.ofSeconds(10), - new URI("http://localhost"), - "io.opentelemetry.Dummy/Method") - .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) - .build()) - .isInstanceOf(IllegalStateException.class) - .hasMessage( - "No GrpcSenderProvider matched configured io.opentelemetry.sdk.common.export.GrpcSenderProvider: foo"); - - assertThat(logCapturer.getEvents()).isEmpty(); - } -} diff --git a/exporters/common/src/testSenderProvider/java/io/opentelemetry/exporter/internal/grpc/SenderUtilTest.java b/exporters/common/src/testSenderProvider/java/io/opentelemetry/exporter/internal/grpc/SenderUtilTest.java new file mode 100644 index 00000000000..4e6799c9eb4 --- /dev/null +++ b/exporters/common/src/testSenderProvider/java/io/opentelemetry/exporter/internal/grpc/SenderUtilTest.java @@ -0,0 +1,148 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.grpc; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.github.netmikey.logunit.api.LogCapturer; +import io.opentelemetry.common.ComponentLoader; +import io.opentelemetry.exporter.internal.SenderUtil; +import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider; +import io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpHttpSenderProvider; +import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junitpioneer.jupiter.SetSystemProperty; + +class SenderUtilTest { + + @RegisterExtension + LogCapturer logCapturer = LogCapturer.create().captureForLogger(SenderUtil.class.getName()); + + private final ComponentLoader componentLoader = + ComponentLoader.forClassLoader(SenderUtilTest.class.getClassLoader()); + + @Test + @SuppressLogger(SenderUtil.class) + void resolveGrpcSenderProvider_multipleSendersNoConfiguration() { + assertThatCode(() -> SenderUtil.resolveGrpcSenderProvider(componentLoader)) + .doesNotThrowAnyException(); + + logCapturer.assertContains( + "Multiple GrpcSenderProvider found. Please include only one, " + + "or specify preference setting io.opentelemetry.sdk.common.export.GrpcSenderProvider " + + "to the FQCN of the preferred provider."); + } + + // TODO: delete test after support for old spi is removed + @Test + @SetSystemProperty( + key = "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + value = + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider") + void resolveGrpcSenderProvider_configureUsingOldSpi() { + assertThat(SenderUtil.resolveGrpcSenderProvider(componentLoader)) + .isInstanceOf(UpstreamGrpcSenderProvider.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.sdk.common.export.GrpcSenderProvider", + value = + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider") + void resolveGrpcSenderProvider_multipleSendersWithUpstream() { + assertThat(SenderUtil.resolveGrpcSenderProvider(componentLoader)) + .isInstanceOf(UpstreamGrpcSenderProvider.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.sdk.common.export.GrpcSenderProvider", + value = "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider") + void resolveGrpcSenderProvider_multipleSendersWithOkHttp() { + assertThat(SenderUtil.resolveGrpcSenderProvider(componentLoader)) + .isInstanceOf(OkHttpGrpcSenderProvider.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty(key = "io.opentelemetry.sdk.common.export.GrpcSenderProvider", value = "foo") + void resolveGrpcSenderProvider_multipleSendersNoMatch() { + assertThatThrownBy(() -> SenderUtil.resolveGrpcSenderProvider(componentLoader)) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "No GrpcSenderProvider matched configured io.opentelemetry.sdk.common.export.GrpcSenderProvider: foo"); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SuppressLogger(SenderUtil.class) + void build_multipleSendersNoConfiguration() { + Assertions.assertThatCode(() -> SenderUtil.resolveHttpSenderProvider(componentLoader)) + .doesNotThrowAnyException(); + + logCapturer.assertContains( + "Multiple HttpSenderProvider found. Please include only one, " + + "or specify preference setting io.opentelemetry.sdk.common.export.HttpSenderProvider " + + "to the FQCN of the preferred provider."); + } + + // TODO: delete test after support for old spi is removed + @Test + @SetSystemProperty( + key = "io.opentelemetry.exporter.internal.http.HttpSenderProvider", + value = "io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider") + void build_configureUsingOldSpi() { + assertThat(SenderUtil.resolveHttpSenderProvider(componentLoader)) + .isInstanceOf(JdkHttpSenderProvider.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.sdk.common.export.HttpSenderProvider", + value = "io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider") + void build_multipleSendersWithJdk() { + assertThat(SenderUtil.resolveHttpSenderProvider(componentLoader)) + .isInstanceOf(JdkHttpSenderProvider.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.sdk.common.export.HttpSenderProvider", + value = "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpHttpSenderProvider") + void build_multipleSendersWithOkHttp() { + assertThat(SenderUtil.resolveHttpSenderProvider(componentLoader)) + .isInstanceOf(OkHttpHttpSenderProvider.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty(key = "io.opentelemetry.sdk.common.export.HttpSenderProvider", value = "foo") + void build_multipleSendersNoMatch() { + assertThatThrownBy(() -> SenderUtil.resolveHttpSenderProvider(componentLoader)) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "No HttpSenderProvider matched configured io.opentelemetry.sdk.common.export.HttpSenderProvider: foo"); + + assertThat(logCapturer.getEvents()).isEmpty(); + } +} diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java index a6f2270a541..0ccf7f4d00e 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java @@ -28,6 +28,7 @@ import io.opentelemetry.sdk.common.export.GrpcSender; import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -69,8 +70,37 @@ public InputStream stream(byte[] value) { } @Override - public byte[] parse(InputStream stream) { - return new byte[] {}; // TODO: drain input to byte array + public byte[] parse(InputStream inputStream) { + try { + int bufLen = 4 * 0x400; // 4KB + byte[] buf = new byte[bufLen]; + int readLen; + IOException exception = null; + + try { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + while ((readLen = inputStream.read(buf, 0, bufLen)) != -1) { + outputStream.write(buf, 0, readLen); + } + return outputStream.toByteArray(); + } + } catch (IOException e) { + exception = e; + throw e; + } finally { + if (exception == null) { + inputStream.close(); + } else { + try { + inputStream.close(); + } catch (IOException e) { + exception.addSuppressed(e); + } + } + } + } catch (IOException e) { + throw new IllegalStateException("Failed to read response", e); + } } }; diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index cbd76aaacae..8b6895bdb5a 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -32,6 +32,7 @@ import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -60,6 +61,9 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import okio.Buffer; +import okio.GzipSource; +import okio.Okio; /** * A {@link GrpcSender} which uses OkHttp instead of grpc-java. @@ -165,10 +169,10 @@ public void onResponse(Call call, Response response) { // Must consume body before accessing trailers byte[] bodyBytes = null; try { - bodyBytes = body.bytes(); + bodyBytes = getResponseMessageBytes(body.bytes()); } catch (IOException e) { bodyBytes = new byte[0]; - logger.log(Level.WARNING, "Failed to read response body", e); + logger.log(Level.FINE, "Failed to read response body", e); } byte[] resolvedBodyBytes = bodyBytes; GrpcStatusCode status = grpcStatus(response); @@ -195,6 +199,24 @@ public byte[] getResponseMessage() { })); } + private static byte[] getResponseMessageBytes(byte[] bodyBytes) throws IOException { + if (bodyBytes.length > 5) { + ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes); + bodyStream.skip(5); + if (bodyBytes[0] == '1') { + Buffer buffer = new Buffer(); + buffer.readFrom(bodyStream); + GzipSource gzipSource = new GzipSource(buffer); + bodyBytes = Okio.buffer(gzipSource).getBuffer().readByteArray(); + } else { + bodyBytes = Arrays.copyOfRange(bodyBytes, 5, bodyBytes.length); + } + return bodyBytes; + } else { + throw new IOException("Invalid response"); + } + } + private static GrpcStatusCode grpcStatus(Response response) { // Status can either be in the headers or trailers depending on error String grpcStatus = response.header(GRPC_STATUS); diff --git a/sdk-extensions/jaeger-remote-sampler/build.gradle.kts b/sdk-extensions/jaeger-remote-sampler/build.gradle.kts index 871fd46b065..528c801e9c4 100644 --- a/sdk-extensions/jaeger-remote-sampler/build.gradle.kts +++ b/sdk-extensions/jaeger-remote-sampler/build.gradle.kts @@ -20,8 +20,6 @@ dependencies { implementation(project(":exporters:common")) implementation(project(":exporters:sender:okhttp")) - implementation("com.squareup.okhttp3:okhttp") - compileOnly("io.grpc:grpc-api") compileOnly("io.grpc:grpc-protobuf") compileOnly("io.grpc:grpc-stub") @@ -41,6 +39,7 @@ testing { dependencies { implementation(project(":sdk:testing")) implementation(project(":exporters:common")) + implementation(project(":exporters:sender:grpc-managed-channel")) implementation("com.google.protobuf:protobuf-java") implementation("com.linecorp.armeria:armeria-junit5") implementation("com.linecorp.armeria:armeria-grpc-protocol") @@ -48,6 +47,16 @@ testing { implementation("io.grpc:grpc-netty") implementation("io.grpc:grpc-stub") } + targets { + all { + testTask { + systemProperty( + "io.opentelemetry.sdk.common.export.GrpcSenderProvider", + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider" + ) + } + } + } } } } diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/GrpcService.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/GrpcService.java deleted file mode 100644 index b40df754ea2..00000000000 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/GrpcService.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.extension.trace.jaeger.sampler; - -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.sdk.common.CompletableResultCode; - -interface GrpcService { - - /** - * Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems} - * items. - */ - SamplingStrategyResponseUnMarshaler execute( - SamplingStrategyParametersMarshaler request, SamplingStrategyResponseUnMarshaler response); - - /** Shuts the exporter down. */ - CompletableResultCode shutdown(); -} diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSampler.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSampler.java index 12fc32c32bc..0b2a9d8dd21 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSampler.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSampler.java @@ -8,11 +8,16 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.export.GrpcResponse; +import io.opentelemetry.sdk.common.export.GrpcSender; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; +import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingResult; import java.io.Closeable; +import java.io.IOException; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -28,23 +33,23 @@ public final class JaegerRemoteSampler implements Sampler, Closeable { private static final String WORKER_THREAD_NAME = JaegerRemoteSampler.class.getSimpleName() + "_WorkerThread"; + private static final String type = "remoteSampling"; private final String serviceName; - private final ScheduledExecutorService pollExecutor; private final ScheduledFuture pollFuture; private volatile Sampler sampler; - private final GrpcService delegate; + private final GrpcSender grpcSender; JaegerRemoteSampler( - GrpcService delegate, + GrpcSender grpcSender, @Nullable String serviceName, int pollingIntervalMs, Sampler initialSampler) { this.serviceName = serviceName != null ? serviceName : ""; - this.delegate = delegate; + this.grpcSender = grpcSender; this.sampler = initialSampler; pollExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(WORKER_THREAD_NAME)); pollFuture = @@ -64,17 +69,69 @@ public SamplingResult shouldSample( } private void getAndUpdateSampler() { - try { - SamplingStrategyResponseUnMarshaler samplingStrategyResponseUnMarshaler = - delegate.execute( - SamplingStrategyParametersMarshaler.create(this.serviceName), - new SamplingStrategyResponseUnMarshaler()); - SamplingStrategyResponse response = samplingStrategyResponseUnMarshaler.get(); - if (response != null) { - this.sampler = updateSampler(response); + SamplingStrategyParametersMarshaler marsher = + SamplingStrategyParametersMarshaler.create(this.serviceName); + MessageWriter messageWriter = marsher.toBinaryMessageWriter(); + grpcSender.send(messageWriter, this::onResponse, JaegerRemoteSampler::onError); + } + + private void onResponse(GrpcResponse grpcResponse) { + GrpcStatusCode statusCode = grpcResponse.getStatusCode(); + + if (statusCode == GrpcStatusCode.OK) { + try { + SamplingStrategyResponse strategyResponse = + SamplingStrategyResponseUnMarshaler.read(grpcResponse.getResponseMessage()); + sampler = updateSampler(strategyResponse); + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to unmarshal strategy response", e); } - } catch (Throwable e) { // keep the timer thread alive - logger.log(Level.WARNING, "Failed to update sampler", e); + return; + } + + switch (statusCode) { + case UNIMPLEMENTED: + logger.log( + Level.SEVERE, + "Failed to execute " + + type + + "s. Server responded with UNIMPLEMENTED. " + + "Full error message: " + + grpcResponse.getStatusDescription()); + break; + case UNAVAILABLE: + logger.log( + Level.SEVERE, + "Failed to execute " + + type + + "s. Server is UNAVAILABLE. " + + "Make sure your service is running and reachable from this network. " + + "Full error message:" + + grpcResponse.getStatusDescription()); + break; + default: + logger.log( + Level.WARNING, + "Failed to export " + + type + + "s. Server responded with gRPC status code " + + statusCode.name() + + ". Error message: " + + grpcResponse.getStatusDescription()); + break; + } + } + + private static void onError(Throwable e) { + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Error message: " + + e.getMessage(), + e); + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + e); } } @@ -124,6 +181,6 @@ public static JaegerRemoteSamplerBuilder builder() { public void close() { pollFuture.cancel(true); pollExecutor.shutdownNow(); - delegate.shutdown(); + grpcSender.shutdown(); } } diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java index adc286ce824..10a63cb0ade 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java @@ -9,36 +9,35 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.internal.Utils; +import io.opentelemetry.common.ComponentLoader; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; +import io.opentelemetry.exporter.internal.SenderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; -import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpUtil; +import io.opentelemetry.exporter.internal.grpc.ImmutableGrpcSenderConfig; +import io.opentelemetry.sdk.common.export.GrpcSender; +import io.opentelemetry.sdk.common.export.GrpcSenderProvider; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.net.URI; import java.time.Duration; -import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; -import okhttp3.ConnectionSpec; -import okhttp3.Headers; -import okhttp3.OkHttpClient; -import okhttp3.Protocol; /** A builder for {@link JaegerRemoteSampler}. */ public final class JaegerRemoteSamplerBuilder { private static final String GRPC_SERVICE_NAME = "jaeger.api_v2.SamplingManager"; - // Visible for testing - static final String GRPC_ENDPOINT_PATH = "/" + GRPC_SERVICE_NAME + "/GetSamplingStrategy"; + private static final String GRPC_FULL_METHOD_NAME = + GRPC_SERVICE_NAME + "/" + "GetSamplingStrategy"; private static final String DEFAULT_ENDPOINT_URL = "http://localhost:14250"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final int DEFAULT_POLLING_INTERVAL_MILLIS = 60000; private static final Sampler INITIAL_SAMPLER = Sampler.parentBased(Sampler.traceIdRatioBased(0.001)); - private static final long DEFAULT_TIMEOUT_SECS = 10; + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); private URI endpoint = DEFAULT_ENDPOINT; private Sampler initialSampler = INITIAL_SAMPLER; @@ -153,54 +152,27 @@ public JaegerRemoteSamplerBuilder setChannel(ManagedChannel channel) { * @return the remote sampler instance. */ public JaegerRemoteSampler build() { - if (grpcChannel != null) { - return new JaegerRemoteSampler( - UpstreamGrpcExporterFactory.buildWithChannel((ManagedChannel) grpcChannel), - serviceName, - pollingIntervalMillis, - initialSampler); - } - - OkHttpClient.Builder clientBuilder = - new OkHttpClient.Builder().dispatcher(OkHttpUtil.newDispatcher()); - - clientBuilder.callTimeout(Duration.ofNanos(TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS))); - - String endpoint = this.endpoint.resolve(GRPC_ENDPOINT_PATH).toString(); - boolean isPlainHttp = endpoint.startsWith("http://"); - - SSLContext sslContext = isPlainHttp ? null : tlsConfigHelper.getSslContext(); - X509TrustManager trustManager = isPlainHttp ? null : tlsConfigHelper.getTrustManager(); - if (sslContext != null && trustManager != null) { - clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); - } - - if (isPlainHttp) { - clientBuilder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT)); - clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); - } else { - clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); - } - - Headers.Builder headers = new Headers.Builder(); - headers.add("te", "trailers"); - - return new JaegerRemoteSampler( - new OkHttpGrpcService("remoteSampling", clientBuilder.build(), endpoint, headers.build()), - serviceName, - pollingIntervalMillis, - initialSampler); + GrpcSender grpcSender = resolveGrpcSender(); + return new JaegerRemoteSampler(grpcSender, serviceName, pollingIntervalMillis, initialSampler); } - // Use an inner class to ensure GrpcExporterBuilder does not have classloading dependencies on - // upstream gRPC. - private static class UpstreamGrpcExporterFactory { - private static GrpcService buildWithChannel(ManagedChannel channel) { - return new UpstreamGrpcService( - "remoteSampling", - channel, - MarshallerRemoteSamplerServiceGrpc.getPostSpansMethod, - TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS)); - } + private GrpcSender resolveGrpcSender() { + ComponentLoader componentLoader = + ComponentLoader.forClassLoader(JaegerRemoteSamplerBuilder.class.getClassLoader()); + GrpcSenderProvider grpcSenderProvider = SenderUtil.resolveGrpcSenderProvider(componentLoader); + ImmutableGrpcSenderConfig grpcSenderConfig = + ImmutableGrpcSenderConfig.create( + endpoint, + GRPC_FULL_METHOD_NAME, + null, + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + Collections::emptyMap, + null, + tlsConfigHelper.getSslContext(), + tlsConfigHelper.getTrustManager(), + null, + grpcChannel); + return grpcSenderProvider.createSender(grpcSenderConfig); } } diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/MarshallerRemoteSamplerServiceGrpc.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/MarshallerRemoteSamplerServiceGrpc.java deleted file mode 100644 index 800a3e64b38..00000000000 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/MarshallerRemoteSamplerServiceGrpc.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.extension.trace.jaeger.sampler; - -import static io.grpc.MethodDescriptor.generateFullMethodName; - -import io.grpc.MethodDescriptor; -import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; - -class MarshallerRemoteSamplerServiceGrpc { - - private static final String SERVICE_NAME = "jaeger.api_v2.SamplingManager"; - - private static final MethodDescriptor.Marshaller - REQUEST_MARSHALLER = - new MethodDescriptor.Marshaller() { - @Override - public InputStream stream(SamplingStrategyParametersMarshaler value) { - return new MarshalerInputStream(value.toBinaryMessageWriter()); - } - - @Override - public SamplingStrategyParametersMarshaler parse(InputStream stream) { - throw new UnsupportedOperationException("Only for serializing"); - } - }; - - private static final MethodDescriptor.Marshaller - RESPONSE_MARSHALLER = - new MethodDescriptor.Marshaller() { - @Override - public InputStream stream(SamplingStrategyResponseUnMarshaler value) { - throw new UnsupportedOperationException("Only for parsing"); - } - - @Override - public SamplingStrategyResponseUnMarshaler parse(InputStream stream) { - SamplingStrategyResponseUnMarshaler unmarshaller = - new SamplingStrategyResponseUnMarshaler(); - try { - unmarshaller.read(readAllBytes(stream)); - } catch (IOException e) { - // could not parse response - throw new IllegalStateException( - "could not parse jaeger remote sampling response", e); - } - return unmarshaller; - } - }; - - static final MethodDescriptor< - SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler> - getPostSpansMethod = - MethodDescriptor - . - newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetSamplingStrategy")) - .setRequestMarshaller(REQUEST_MARSHALLER) - .setResponseMarshaller(RESPONSE_MARSHALLER) - .build(); - - private static byte[] readAllBytes(InputStream inputStream) throws IOException { - int bufLen = 4 * 0x400; // 4KB - byte[] buf = new byte[bufLen]; - int readLen; - IOException exception = null; - - try { - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - while ((readLen = inputStream.read(buf, 0, bufLen)) != -1) { - outputStream.write(buf, 0, readLen); - } - return outputStream.toByteArray(); - } - } catch (IOException e) { - exception = e; - throw e; - } finally { - if (exception == null) { - inputStream.close(); - } else { - try { - inputStream.close(); - } catch (IOException e) { - exception.addSuppressed(e); - } - } - } - } - - private MarshallerRemoteSamplerServiceGrpc() {} -} diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java deleted file mode 100644 index 263c33375f8..00000000000 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.extension.trace.jaeger.sampler; - -import io.opentelemetry.exporter.sender.okhttp.internal.GrpcRequestBody; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.common.export.GrpcStatusCode; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; -import okhttp3.Headers; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okio.Buffer; -import okio.GzipSource; -import okio.Okio; - -final class OkHttpGrpcService implements GrpcService { - - private static final String GRPC_STATUS = "grpc-status"; - private static final String GRPC_MESSAGE = "grpc-message"; - - private static final Logger logger = Logger.getLogger(OkHttpGrpcService.class.getName()); - - private final String type; - private final OkHttpClient client; - private final HttpUrl url; - private final Headers headers; - - /** Creates a new {@link OkHttpGrpcService}. */ - OkHttpGrpcService(String type, OkHttpClient client, String endpoint, Headers headers) { - this.type = type; - this.client = client; - this.url = HttpUrl.get(endpoint); - this.headers = headers; - } - - @Override - public SamplingStrategyResponseUnMarshaler execute( - SamplingStrategyParametersMarshaler exportRequest, - SamplingStrategyResponseUnMarshaler responseUnmarshaller) { - Request.Builder requestBuilder = new Request.Builder().url(url).headers(headers); - - RequestBody requestBody = new GrpcRequestBody(exportRequest.toBinaryMessageWriter(), null); - requestBuilder.post(requestBody); - - try { - Response response = client.newCall(requestBuilder.build()).execute(); - - byte[] bodyBytes = new byte[0]; - try { - bodyBytes = response.body().bytes(); - } catch (IOException ignored) { - // It's unlikely a transport exception would actually be useful in debugging. There may - // be gRPC status information available handled below though, so ignore this exception - // and continue through gRPC error handling logic. In the worst case we will record the - // HTTP error. - } - - GrpcStatusCode status = grpcStatus(response); - if (GrpcStatusCode.OK == status) { - if (bodyBytes.length > 5) { - ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes); - bodyStream.skip(5); - if (bodyBytes[0] == '1') { - Buffer buffer = new Buffer(); - buffer.readFrom(bodyStream); - GzipSource gzipSource = new GzipSource(buffer); - bodyBytes = Okio.buffer(gzipSource).getBuffer().readByteArray(); - } else { - bodyBytes = Arrays.copyOfRange(bodyBytes, 5, bodyBytes.length); - } - responseUnmarshaller.read(bodyBytes); - return responseUnmarshaller; - } - return responseUnmarshaller; - } - - // handle non OK status codes - String errorMessage = grpcMessage(response); - if (GrpcStatusCode.UNIMPLEMENTED == status) { - logger.log( - Level.SEVERE, - "Failed to execute " - + type - + "s. Server responded with UNIMPLEMENTED. " - + "Full error message: " - + errorMessage); - } else if (GrpcStatusCode.UNAVAILABLE == status) { - logger.log( - Level.SEVERE, - "Failed to execute " - + type - + "s. Server is UNAVAILABLE. " - + "Make sure your service is running and reachable from this network. " - + "Full error message:" - + errorMessage); - } else { - String codeMessage = - status != null ? "gRPC status code " + status : "HTTP status code " + response.code(); - logger.log( - Level.WARNING, - "Failed to execute " - + type - + "s. Server responded with " - + codeMessage - + ". Error message: " - + errorMessage); - } - } catch (IOException e) { - logger.log( - Level.SEVERE, - "Failed to execute " - + type - + "s. The request could not be executed. Full error message: " - + e.getMessage()); - } - - return responseUnmarshaller; - } - - @Nullable - private static GrpcStatusCode grpcStatus(Response response) { - // Status can either be in the headers or trailers depending on error - String grpcStatus = response.header(GRPC_STATUS); - if (grpcStatus == null) { - try { - grpcStatus = response.trailers().get(GRPC_STATUS); - if (grpcStatus == null) { - return null; - } - } catch (IOException e) { - // Could not read a status, this generally means the HTTP status is the error. - return null; - } - } - try { - return GrpcStatusCode.fromValue(Integer.parseInt(grpcStatus)); - } catch (NumberFormatException ex) { - // If grpcStatus is not an integer, it's not a valid grpc status code - return null; - } - } - - private static String grpcMessage(Response response) { - String message = response.header(GRPC_MESSAGE); - if (message == null) { - try { - message = response.trailers().get(GRPC_MESSAGE); - } catch (IOException e) { - // Fall through - } - } - if (message != null) { - return unescape(message); - } - // Couldn't get message for some reason, use the HTTP status. - return response.message(); - } - - @Override - public CompletableResultCode shutdown() { - client.dispatcher().cancelAll(); - client.dispatcher().executorService().shutdownNow(); - client.connectionPool().evictAll(); - return CompletableResultCode.ofSuccess(); - } - - // From grpc-java - - /** Unescape the provided ascii to a unicode {@link String}. */ - private static String unescape(String value) { - for (int i = 0; i < value.length(); i++) { - char c = value.charAt(i); - if (c < ' ' || c >= '~' || (c == '%' && i + 2 < value.length())) { - return doUnescape(value.getBytes(StandardCharsets.US_ASCII)); - } - } - return value; - } - - private static String doUnescape(byte[] value) { - ByteBuffer buf = ByteBuffer.allocate(value.length); - for (int i = 0; i < value.length; ) { - if (value[i] == '%' && i + 2 < value.length) { - try { - buf.put((byte) Integer.parseInt(new String(value, i + 1, 2, StandardCharsets.UTF_8), 16)); - i += 3; - continue; - } catch (NumberFormatException e) { - // ignore, fall through, just push the bytes. - } - } - buf.put(value[i]); - i += 1; - } - return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8); - } -} diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/SamplingStrategyResponseUnMarshaler.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/SamplingStrategyResponseUnMarshaler.java index 0c06df5c16a..18ddecf437a 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/SamplingStrategyResponseUnMarshaler.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/SamplingStrategyResponseUnMarshaler.java @@ -7,26 +7,16 @@ import io.opentelemetry.exporter.internal.marshal.CodedInputStream; import java.io.IOException; -import javax.annotation.Nullable; class SamplingStrategyResponseUnMarshaler { - @Nullable private SamplingStrategyResponse samplingStrategyResponse; + private SamplingStrategyResponseUnMarshaler() {} - @Nullable - public SamplingStrategyResponse get() { - return samplingStrategyResponse; - } - - public void read(byte[] payload) throws IOException { + static SamplingStrategyResponse read(byte[] payload) throws IOException { SamplingStrategyResponse.Builder responseBuilder = new SamplingStrategyResponse.Builder(); - try { - CodedInputStream codedInputStream = CodedInputStream.newInstance(payload); - parseResponse(responseBuilder, codedInputStream); - samplingStrategyResponse = responseBuilder.build(); - } catch (IOException ex) { - // use null message - } + CodedInputStream codedInputStream = CodedInputStream.newInstance(payload); + parseResponse(responseBuilder, codedInputStream); + return responseBuilder.build(); } private static void parseResponse( diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/UpstreamGrpcService.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/UpstreamGrpcService.java deleted file mode 100644 index a6ef1e56e0c..00000000000 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/UpstreamGrpcService.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.extension.trace.jaeger.sampler; - -import com.google.common.util.concurrent.Futures; -import io.grpc.CallOptions; -import io.grpc.ManagedChannel; -import io.grpc.MethodDescriptor; -import io.grpc.Status; -import io.grpc.stub.ClientCalls; -import io.opentelemetry.exporter.internal.grpc.ManagedChannelUtil; -import io.opentelemetry.sdk.common.CompletableResultCode; -import java.time.Duration; -import java.util.Objects; -import java.util.logging.Level; -import java.util.logging.Logger; - -final class UpstreamGrpcService implements GrpcService { - - private static final Logger logger = Logger.getLogger(UpstreamGrpcService.class.getName()); - - private final String type; - private final ManagedChannel managedChannel; - private final MethodDescriptor< - SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler> - methodDescriptor; - private final long timeoutNanos; - - /** Creates a new {@link UpstreamGrpcService}. */ - UpstreamGrpcService( - String type, - ManagedChannel channel, - MethodDescriptor - methodDescriptor, - long timeoutNanos) { - this.type = type; - this.managedChannel = channel; - this.timeoutNanos = timeoutNanos; - this.methodDescriptor = methodDescriptor; - } - - @Override - public SamplingStrategyResponseUnMarshaler execute( - SamplingStrategyParametersMarshaler exportRequest, - SamplingStrategyResponseUnMarshaler responseUnmarshaller) { - - CallOptions callOptions = CallOptions.DEFAULT; - - if (timeoutNanos > 0) { - callOptions = callOptions.withDeadlineAfter(Duration.ofNanos(timeoutNanos)); - } - - try { - return Objects.requireNonNull( - Futures.getUnchecked( - ClientCalls.futureUnaryCall( - managedChannel.newCall(methodDescriptor, callOptions), exportRequest))); - } catch (Throwable t) { - Status status = Status.fromThrowable(t); - - if (status.getCode().equals(Status.Code.UNIMPLEMENTED)) { - logger.log( - Level.SEVERE, - "Failed to execute " - + type - + "s. Server responded with UNIMPLEMENTED. " - + "Full error message: " - + status.getDescription()); - } else if (status.getCode().equals(Status.Code.UNAVAILABLE)) { - logger.log( - Level.SEVERE, - "Failed to execute " - + type - + "s. Server is UNAVAILABLE. " - + "Make sure your service is running and reachable from this network. " - + "Full error message:" - + status.getDescription()); - } else { - logger.log( - Level.WARNING, - "Failed to execute " - + type - + "s. Server responded with gRPC status code " - + status.getCode().value() - + ". Error message: " - + status.getDescription()); - } - } - - return responseUnmarshaller; - } - - @Override - public CompletableResultCode shutdown() { - if (managedChannel.isTerminated()) { - return CompletableResultCode.ofSuccess(); - } - return ManagedChannelUtil.shutdownChannel(managedChannel); - } -} diff --git a/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerProviderTest.java b/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerProviderTest.java index e691b576f18..c70d5b8a1ed 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerProviderTest.java +++ b/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerProviderTest.java @@ -15,7 +15,6 @@ import io.opentelemetry.sdk.trace.samplers.Sampler; import java.util.HashMap; import java.util.ServiceLoader; -import okhttp3.HttpUrl; import org.junit.jupiter.api.Test; public class JaegerRemoteSamplerProviderTest { @@ -53,11 +52,13 @@ void serviceProvider() { .isEqualTo(sampler.toString()); assertThat(s).extracting("serviceName").isEqualTo("test_service"); assertThat(s) - .extracting("delegate") + .extracting("grpcSender") .extracting("url") - .isEqualTo( - HttpUrl.get( - "http://localhost:9999/jaeger.api_v2.SamplingManager/GetSamplingStrategy")); + .satisfies( + url -> + assertThat(url.toString()) + .isEqualTo( + "http://localhost:9999/jaeger.api_v2.SamplingManager/GetSamplingStrategy")); } }); } diff --git a/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerTest.java b/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerTest.java index c9351790d75..14acd5c78d8 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerTest.java +++ b/sdk-extensions/jaeger-remote-sampler/src/test/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerTest.java @@ -22,7 +22,9 @@ import io.github.netmikey.logunit.api.LogCapturer; import io.netty.handler.ssl.ClientAuth; import io.opentelemetry.exporter.internal.TlsUtil; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling; import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.RateLimitingSamplingStrategy; import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.SamplingStrategyType; @@ -55,7 +57,7 @@ import org.slf4j.event.Level; import org.slf4j.event.LoggingEvent; -@SuppressLogger(OkHttpGrpcService.class) +@SuppressLogger(JaegerRemoteSampler.class) class JaegerRemoteSamplerTest { private static final String SERVICE_NAME = "my-service"; @@ -67,12 +69,12 @@ class JaegerRemoteSamplerTest { private static final ConcurrentLinkedQueue responses = new ConcurrentLinkedQueue<>(); - private static void addGrpcError(int code, @Nullable String message) { - grpcErrors.add(new ArmeriaStatusException(code, message)); + private static void addGrpcError(GrpcStatusCode code, @Nullable String message) { + grpcErrors.add(new ArmeriaStatusException(code.getValue(), message)); } @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(OkHttpGrpcService.class, Level.TRACE); + LogCapturer logs = LogCapturer.create().captureForType(JaegerRemoteSampler.class, Level.TRACE); @Order(1) @RegisterExtension @@ -90,7 +92,7 @@ private static void addGrpcError(int code, @Nullable String message) { @Override protected void configure(ServerBuilder sb) { sb.service( - JaegerRemoteSamplerBuilder.GRPC_ENDPOINT_PATH, + "/jaeger.api_v2.SamplingManager/GetSamplingStrategy", new AbstractUnaryGrpcService() { @Override protected CompletionStage handleMessage( @@ -142,7 +144,7 @@ void connectionWorks() { .setPollingInterval(1, TimeUnit.SECONDS) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -160,7 +162,7 @@ void tlsConnectionWorks() throws IOException { .setTrustedCertificates(Files.readAllBytes(certificate.certificateFile().toPath())) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -181,7 +183,7 @@ void clientTlsConnectionWorks(byte[] privateKey) throws IOException { privateKey, Files.readAllBytes(clientCertificate.certificateFile().toPath())) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -220,7 +222,7 @@ void tlsViaSslContext() throws Exception { .setSslContext(sslContext, trustManager) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -237,7 +239,7 @@ void description() { .setPollingInterval(1, TimeUnit.SECONDS) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); @@ -255,7 +257,7 @@ void initialSampler() { .setServiceName(SERVICE_NAME) .setInitialSampler(Sampler.alwaysOn()) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); assertThat(sampler.getDescription()).startsWith("JaegerRemoteSampler{AlwaysOnSampler}"); } @@ -269,7 +271,7 @@ void pollingInterval() { .setServiceName(SERVICE_NAME) .setPollingInterval(1, TimeUnit.MILLISECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); // wait until the sampling strategy is retrieved before exiting test method await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -284,7 +286,7 @@ void pollingInterval_duration() { .setServiceName(SERVICE_NAME) .setPollingInterval(Duration.ofMillis(1)) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); // wait until the sampling strategy is retrieved before exiting test method await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -329,7 +331,7 @@ void perOperationSampling() { // Make sure only polls once. .setPollingInterval(500, TimeUnit.SECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); await() .untilAsserted( @@ -343,9 +345,8 @@ void perOperationSampling() { } @Test - @SuppressLogger(OkHttpGrpcService.class) void internal_error_server_response() { - addGrpcError(13, "internal error"); + addGrpcError(GrpcStatusCode.INTERNAL, "internal error"); try (JaegerRemoteSampler sampler = JaegerRemoteSampler.builder() @@ -353,7 +354,7 @@ void internal_error_server_response() { .setServiceName(SERVICE_NAME) .setPollingInterval(50, TimeUnit.MILLISECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); @@ -369,9 +370,8 @@ void internal_error_server_response() { } @Test - @SuppressLogger(OkHttpGrpcService.class) void unavailable_error_server_response() { - addGrpcError(14, "クマ🐻 resource exhausted"); + addGrpcError(GrpcStatusCode.UNAVAILABLE, "クマ🐻 resource exhausted"); try (JaegerRemoteSampler sampler = JaegerRemoteSampler.builder() @@ -379,7 +379,7 @@ void unavailable_error_server_response() { .setServiceName(SERVICE_NAME) .setPollingInterval(50, TimeUnit.MILLISECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); @@ -394,9 +394,8 @@ void unavailable_error_server_response() { } @Test - @SuppressLogger(OkHttpGrpcService.class) void unimplemented_error_server_response() { - addGrpcError(12, null); + addGrpcError(GrpcStatusCode.UNIMPLEMENTED, null); try (JaegerRemoteSampler sampler = JaegerRemoteSampler.builder() @@ -404,7 +403,7 @@ void unimplemented_error_server_response() { .setServiceName(SERVICE_NAME) .setPollingInterval(50, TimeUnit.MILLISECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(OkHttpGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(OkHttpGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); diff --git a/sdk-extensions/jaeger-remote-sampler/src/testGrpcNetty/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerGrpcNettyTest.java b/sdk-extensions/jaeger-remote-sampler/src/testGrpcNetty/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerGrpcNettyTest.java index 257ddf3b139..af29d51281a 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/testGrpcNetty/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerGrpcNettyTest.java +++ b/sdk-extensions/jaeger-remote-sampler/src/testGrpcNetty/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerGrpcNettyTest.java @@ -17,7 +17,9 @@ import io.github.netmikey.logunit.api.LogCapturer; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling; import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.RateLimitingSamplingStrategy; import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.SamplingStrategyType; @@ -36,7 +38,7 @@ import org.slf4j.event.Level; import org.slf4j.event.LoggingEvent; -@SuppressLogger(UpstreamGrpcService.class) +@SuppressLogger(JaegerRemoteSampler.class) @SuppressWarnings("deprecation") // Testing deprecated code class JaegerRemoteSamplerGrpcNettyTest { @@ -49,12 +51,12 @@ class JaegerRemoteSamplerGrpcNettyTest { private static final ConcurrentLinkedQueue responses = new ConcurrentLinkedQueue<>(); - private static void addGrpcError(int code, @Nullable String message) { - grpcErrors.add(new ArmeriaStatusException(code, message)); + private static void addGrpcError(GrpcStatusCode code, @Nullable String message) { + grpcErrors.add(new ArmeriaStatusException(code.getValue(), message)); } @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(UpstreamGrpcService.class, Level.TRACE); + LogCapturer logs = LogCapturer.create().captureForType(JaegerRemoteSampler.class, Level.TRACE); @Order(1) @RegisterExtension @@ -67,7 +69,7 @@ private static void addGrpcError(int code, @Nullable String message) { @Override protected void configure(ServerBuilder sb) { sb.service( - JaegerRemoteSamplerBuilder.GRPC_ENDPOINT_PATH, + "/jaeger.api_v2.SamplingManager/GetSamplingStrategy", new AbstractUnaryGrpcService() { @Override protected CompletionStage handleMessage( @@ -117,7 +119,7 @@ void connectionWorks() { .setPollingInterval(1, TimeUnit.SECONDS) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -134,7 +136,7 @@ void description() { .setPollingInterval(1, TimeUnit.SECONDS) .setServiceName(SERVICE_NAME) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); @@ -152,7 +154,7 @@ void initialSampler() { .setServiceName(SERVICE_NAME) .setInitialSampler(Sampler.alwaysOn()) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); assertThat(sampler.getDescription()).startsWith("JaegerRemoteSampler{AlwaysOnSampler}"); } @@ -166,7 +168,7 @@ void pollingInterval() { .setServiceName(SERVICE_NAME) .setPollingInterval(1, TimeUnit.MILLISECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); // wait until the sampling strategy is retrieved before exiting test method await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -181,7 +183,7 @@ void pollingInterval_duration() { .setServiceName(SERVICE_NAME) .setPollingInterval(Duration.ofMillis(1)) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); // wait until the sampling strategy is retrieved before exiting test method await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class)); @@ -226,7 +228,7 @@ void perOperationSampling() { // Make sure only polls once. .setPollingInterval(500, TimeUnit.SECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); await() .untilAsserted( @@ -241,7 +243,7 @@ void perOperationSampling() { @Test void internal_error_server_response() { - addGrpcError(13, "internal error"); + addGrpcError(GrpcStatusCode.INTERNAL, "internal error"); try (JaegerRemoteSampler sampler = JaegerRemoteSampler.builder() @@ -250,7 +252,7 @@ void internal_error_server_response() { // Make sure only polls once. .setPollingInterval(500, TimeUnit.SECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); @@ -259,7 +261,7 @@ void internal_error_server_response() { .untilAsserted( () -> { LoggingEvent log = - logs.assertContains(" Server responded with gRPC status code 13"); + logs.assertContains(" Server responded with gRPC status code INTERNAL"); assertThat(log.getLevel()).isEqualTo(Level.WARN); }); } @@ -267,7 +269,7 @@ void internal_error_server_response() { @Test void unavailable_error_server_response() { - addGrpcError(14, "クマ🐻 resource exhausted"); + addGrpcError(GrpcStatusCode.UNAVAILABLE, "クマ🐻 resource exhausted"); try (JaegerRemoteSampler sampler = JaegerRemoteSampler.builder() @@ -276,7 +278,7 @@ void unavailable_error_server_response() { // Make sure only polls once. .setPollingInterval(500, TimeUnit.SECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}"); @@ -292,7 +294,7 @@ void unavailable_error_server_response() { @Test void unimplemented_error_server_response() { - addGrpcError(12, null); + addGrpcError(GrpcStatusCode.UNIMPLEMENTED, null); try (JaegerRemoteSampler sampler = JaegerRemoteSampler.builder() @@ -301,7 +303,7 @@ void unimplemented_error_server_response() { // Make sure only polls once. .setPollingInterval(500, TimeUnit.SECONDS) .build()) { - assertThat(sampler).extracting("delegate").isInstanceOf(UpstreamGrpcService.class); + assertThat(sampler).extracting("grpcSender").isInstanceOf(UpstreamGrpcSender.class); assertThat(sampler.getDescription()) .startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");