handler) {
- group.submit(() -> handler.completed(0L, attach));
+ group.execute(() -> handler.completed(0L, attach));
}
/**
diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java
index d9b1420a6e3..5150149fa6a 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java
@@ -43,6 +43,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -65,7 +66,7 @@
* instance of this class is a singleton-like object that manages a thread pool that makes it
* possible to run a group of asynchronous channels.
*/
-public class AsynchronousTlsChannelGroup {
+public class AsynchronousTlsChannelGroup implements Executor {
private static final Logger LOGGER = Loggers.getLogger("connection.tls");
@@ -224,8 +225,16 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi
selectorThread.start();
}
- void submit(final Runnable r) {
- executor.submit(r);
+
+ @Override
+ public void execute(final Runnable r) {
+ executor.execute(() -> {
+ try {
+ r.run();
+ } catch (Throwable t) {
+ LOGGER.error(null, t);
+ }
+ });
}
RegisteredSocket registerSocket(TlsChannel reader, SocketChannel socketChannel) {
diff --git a/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java b/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java
index b26cb396e7b..4b08dd9a15c 100644
--- a/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java
+++ b/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java
@@ -39,6 +39,8 @@
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COMMAND_NAME;
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.CURSOR_ID;
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NAMESPACE;
+import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_NAME;
+import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_SUMMARY;
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NETWORK_TRANSPORT;
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.QUERY_SUMMARY;
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SERVER_ADDRESS;
@@ -266,4 +268,47 @@ public Span createTracingSpan(final CommandMessage message,
return span;
}
+
+ /**
+ * Creates an operation-level tracing span for a database command.
+ *
+ * The span is named "{commandName} {database}[.{collection}]" and tagged with standard
+ * low-cardinality attributes (system, namespace, collection, operation name, operation summary).
+ * The span is also set on the {@link OperationContext} for use by downstream command-level tracing.
+ *
+ * @param transactionSpan the active transaction span (for parent context), or null
+ * @param operationContext the operation context to attach the span to
+ * @param commandName the name of the command (e.g. "find", "insert")
+ * @param namespace the MongoDB namespace for the operation
+ * @return the created span, or null if tracing is disabled
+ */
+ @Nullable
+ public Span createOperationSpan(@Nullable final TransactionSpan transactionSpan,
+ final OperationContext operationContext, final String commandName, final MongoNamespace namespace) {
+ if (!isEnabled()) {
+ return null;
+ }
+ TraceContext parentContext = null;
+ if (transactionSpan != null) {
+ parentContext = transactionSpan.getContext();
+ }
+ String name = commandName + " " + namespace.getDatabaseName()
+ + (MongoNamespaceHelper.COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())
+ ? ""
+ : "." + namespace.getCollectionName());
+
+ KeyValues keyValues = KeyValues.of(
+ SYSTEM.withValue("mongodb"),
+ NAMESPACE.withValue(namespace.getDatabaseName()));
+ if (!MongoNamespaceHelper.COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())) {
+ keyValues = keyValues.and(COLLECTION.withValue(namespace.getCollectionName()));
+ }
+ keyValues = keyValues.and(OPERATION_NAME.withValue(commandName),
+ OPERATION_SUMMARY.withValue(name));
+
+ Span span = addSpan(name, parentContext, namespace);
+ span.tagLowCardinality(keyValues);
+ operationContext.setTracingSpan(span);
+ return span;
+ }
}
diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy
index 2d7dc04d758..f1585f82595 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy
@@ -52,6 +52,7 @@ class CommandHelperSpecification extends Specification {
}
def cleanup() {
+ InternalStreamConnection.setRecordEverything(false)
connection?.close()
}
@@ -81,5 +82,4 @@ class CommandHelperSpecification extends Specification {
!receivedDocument
receivedException instanceof MongoCommandException
}
-
}
diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java
index fc5926b3bad..81e778b4a61 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java
@@ -127,7 +127,7 @@ public void shouldThrowOnTimeout() throws InterruptedException {
// when
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
- cachedExecutor.submit(connectionGetter);
+ cachedExecutor.execute(connectionGetter);
connectionGetter.getLatch().await();
@@ -152,7 +152,7 @@ public void shouldNotUseMaxAwaitTimeMSWhenTimeoutMsIsSet() throws InterruptedExc
// when
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
- cachedExecutor.submit(connectionGetter);
+ cachedExecutor.execute(connectionGetter);
sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs.
internalConnection.close();
diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java
index be4526aada7..5f736f421c2 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java
@@ -331,9 +331,10 @@ static Stream shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS()
);
}
- @ParameterizedTest
- @MethodSource
@DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS")
+ @ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. "
+ + "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}")
+ @MethodSource
void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS,
final Long timeoutMS,
final long expected) {
@@ -345,7 +346,7 @@ void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTim
0));
long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs();
- assertTrue(expected - calculatedTimeoutMS <= 1);
+ assertTrue(expected - calculatedTimeoutMS <= 2);
}
private TimeoutContextTest() {
diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt
index 6c53a1faf47..cbe308eece0 100644
--- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt
+++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt
@@ -19,6 +19,7 @@ import com.mongodb.ClientSessionOptions
import com.mongodb.ServerAddress
import com.mongodb.TransactionOptions
import com.mongodb.internal.TimeoutContext
+import com.mongodb.internal.observability.micrometer.TransactionSpan
import com.mongodb.reactivestreams.client.ClientSession as reactiveClientSession
import com.mongodb.session.ClientSession as jClientSession
import com.mongodb.session.ServerSession
@@ -58,6 +59,9 @@ public class ClientSession(public val wrapped: reactiveClientSession) : jClientS
*/
public fun notifyOperationInitiated(operation: Any): Unit = wrapped.notifyOperationInitiated(operation)
+ /** Get the transaction span (if started). */
+ public fun getTransactionSpan(): TransactionSpan? = wrapped.transactionSpan
+
/**
* Get the server address of the pinned mongos on this session. For internal use only.
*
diff --git a/driver-reactive-streams/build.gradle.kts b/driver-reactive-streams/build.gradle.kts
index dab192e2583..b55dd95d683 100644
--- a/driver-reactive-streams/build.gradle.kts
+++ b/driver-reactive-streams/build.gradle.kts
@@ -15,6 +15,7 @@
*/
import ProjectExtensions.configureJarManifest
import ProjectExtensions.configureMavenPublication
+import project.DEFAULT_JAVA_VERSION
plugins {
id("project.java")
@@ -36,6 +37,9 @@ dependencies {
implementation(libs.project.reactor.core)
compileOnly(project(path = ":mongodb-crypt", configuration = "default"))
+ optionalImplementation(platform(libs.micrometer.observation.bom))
+ optionalImplementation(libs.micrometer.observation)
+
testImplementation(libs.project.reactor.test)
testImplementation(project(path = ":driver-sync", configuration = "default"))
testImplementation(project(path = ":bson", configuration = "testArtifacts"))
@@ -45,11 +49,20 @@ dependencies {
// Reactive Streams TCK testing
testImplementation(libs.reactive.streams.tck)
- // Tracing
+ // Tracing testing
testImplementation(platform(libs.micrometer.tracing.integration.test.bom))
testImplementation(libs.micrometer.tracing.integration.test) { exclude(group = "org.junit.jupiter") }
}
+tasks.withType {
+ // Needed for MicrometerProseTest to set env variable programmatically (calls
+ // `field.setAccessible(true)`)
+ val testJavaVersion: Int = findProperty("javaVersion")?.toString()?.toInt() ?: DEFAULT_JAVA_VERSION
+ if (testJavaVersion >= DEFAULT_JAVA_VERSION) {
+ jvmArgs("--add-opens=java.base/java.util=ALL-UNNAMED")
+ }
+}
+
configureMavenPublication {
pom {
name.set("The MongoDB Reactive Streams Driver")
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java
index 3d9354e9ae9..fe58864fad0 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java
@@ -18,6 +18,8 @@
package com.mongodb.reactivestreams.client;
import com.mongodb.TransactionOptions;
+import com.mongodb.internal.observability.micrometer.TransactionSpan;
+import com.mongodb.lang.Nullable;
import org.reactivestreams.Publisher;
/**
@@ -94,4 +96,13 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
* @mongodb.server.release 4.0
*/
Publisher abortTransaction();
+
+ /**
+ * Get the transaction span (if started).
+ *
+ * @return the transaction span
+ * @since 5.7
+ */
+ @Nullable
+ TransactionSpan getTransactionSpan();
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java
index 30714a6a576..b5e94c02975 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java
@@ -18,6 +18,7 @@
import com.mongodb.ClientSessionOptions;
import com.mongodb.TransactionOptions;
+import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
@@ -31,10 +32,13 @@
public class ClientSessionHelper {
private final MongoClientImpl mongoClient;
private final ServerSessionPool serverSessionPool;
+ private final TracingManager tracingManager;
- public ClientSessionHelper(final MongoClientImpl mongoClient, final ServerSessionPool serverSessionPool) {
+ public ClientSessionHelper(final MongoClientImpl mongoClient, final ServerSessionPool serverSessionPool,
+ final TracingManager tracingManager) {
this.mongoClient = mongoClient;
this.serverSessionPool = serverSessionPool;
+ this.tracingManager = tracingManager;
}
Mono withClientSession(@Nullable final ClientSession clientSessionFromOperation, final OperationExecutor executor) {
@@ -62,6 +66,6 @@ ClientSession createClientSession(final ClientSessionOptions options, final Oper
.readPreference(mongoClient.getSettings().getReadPreference())
.build()))
.build();
- return new ClientSessionPublisherImpl(serverSessionPool, mongoClient, mergedOptions, executor);
+ return new ClientSessionPublisherImpl(serverSessionPool, mongoClient, mergedOptions, executor, tracingManager);
}
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java
index 5cf0ea103bd..511f9f62c6b 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java
@@ -24,6 +24,8 @@
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.observability.micrometer.TracingManager;
+import com.mongodb.internal.observability.micrometer.TransactionSpan;
import com.mongodb.internal.operation.AbortTransactionOperation;
import com.mongodb.internal.operation.CommitTransactionOperation;
import com.mongodb.internal.operation.ReadOperation;
@@ -48,17 +50,21 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements
private final MongoClientImpl mongoClient;
private final OperationExecutor executor;
+ private final TracingManager tracingManager;
private TransactionState transactionState = TransactionState.NONE;
private boolean messageSentInCurrentTransaction;
private boolean commitInProgress;
private TransactionOptions transactionOptions;
+ @Nullable
+ private TransactionSpan transactionSpan;
ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient,
- final ClientSessionOptions options, final OperationExecutor executor) {
+ final ClientSessionOptions options, final OperationExecutor executor, final TracingManager tracingManager) {
super(serverSessionPool, mongoClient, options);
this.executor = executor;
this.mongoClient = mongoClient;
+ this.tracingManager = tracingManager;
}
@Override
@@ -128,6 +134,10 @@ public void startTransaction(final TransactionOptions transactionOptions) {
if (!writeConcern.isAcknowledged()) {
throw new MongoClientException("Transactions do not support unacknowledged write concern");
}
+
+ if (tracingManager.isEnabled()) {
+ transactionSpan = new TransactionSpan(tracingManager);
+ }
clearTransactionContext();
setTimeoutContext(timeoutContext);
}
@@ -152,6 +162,9 @@ public Publisher commitTransaction() {
}
if (!messageSentInCurrentTransaction) {
cleanupTransaction(TransactionState.COMMITTED);
+ if (transactionSpan != null) {
+ transactionSpan.finalizeTransactionSpan(TransactionState.COMMITTED.name());
+ }
return Mono.create(MonoSink::success);
} else {
ReadConcern readConcern = transactionOptions.getReadConcern();
@@ -171,7 +184,17 @@ public Publisher commitTransaction() {
commitInProgress = false;
transactionState = TransactionState.COMMITTED;
})
- .doOnError(MongoException.class, this::clearTransactionContextOnError);
+ .doOnError(MongoException.class, e -> {
+ clearTransactionContextOnError(e);
+ if (transactionSpan != null) {
+ transactionSpan.handleTransactionSpanError(e);
+ }
+ })
+ .doOnSuccess(v -> {
+ if (transactionSpan != null) {
+ transactionSpan.finalizeTransactionSpan(TransactionState.COMMITTED.name());
+ }
+ });
}
});
}
@@ -191,6 +214,9 @@ public Publisher abortTransaction() {
}
if (!messageSentInCurrentTransaction) {
cleanupTransaction(TransactionState.ABORTED);
+ if (transactionSpan != null) {
+ transactionSpan.finalizeTransactionSpan(TransactionState.ABORTED.name());
+ }
return Mono.create(MonoSink::success);
} else {
ReadConcern readConcern = transactionOptions.getReadConcern();
@@ -208,6 +234,9 @@ public Publisher abortTransaction() {
.doOnTerminate(() -> {
clearTransactionContext();
cleanupTransaction(TransactionState.ABORTED);
+ if (transactionSpan != null) {
+ transactionSpan.finalizeTransactionSpan(TransactionState.ABORTED.name());
+ }
});
}
});
@@ -219,6 +248,12 @@ private void clearTransactionContextOnError(final MongoException e) {
}
}
+ @Override
+ @Nullable
+ public TransactionSpan getTransactionSpan() {
+ return transactionSpan;
+ }
+
@Override
public void close() {
if (transactionState == TransactionState.IN) {
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java
index 07a17badcd7..8fda2e9294d 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java
@@ -33,6 +33,7 @@
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
+import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
@@ -88,9 +89,10 @@ private MongoClientImpl(final MongoClientSettings settings, final MongoDriverInf
notNull("settings", settings);
notNull("cluster", cluster);
+ TracingManager tracingManager = new TracingManager(settings.getObservabilitySettings());
TimeoutSettings timeoutSettings = TimeoutSettings.create(settings);
ServerSessionPool serverSessionPool = new ServerSessionPool(cluster, timeoutSettings, settings.getServerApi());
- ClientSessionHelper clientSessionHelper = new ClientSessionHelper(this, serverSessionPool);
+ ClientSessionHelper clientSessionHelper = new ClientSessionHelper(this, serverSessionPool, tracingManager);
AutoEncryptionSettings autoEncryptSettings = settings.getAutoEncryptionSettings();
Crypt crypt = autoEncryptSettings != null ? Crypts.createCrypt(settings, autoEncryptSettings) : null;
@@ -100,7 +102,8 @@ private MongoClientImpl(final MongoClientSettings settings, final MongoDriverInf
+ ReactiveContextProvider.class.getName() + " when using the Reactive Streams driver");
}
OperationExecutor operationExecutor = executor != null ? executor
- : new OperationExecutorImpl(this, clientSessionHelper, timeoutSettings, (ReactiveContextProvider) contextProvider);
+ : new OperationExecutorImpl(this, clientSessionHelper, timeoutSettings, (ReactiveContextProvider) contextProvider,
+ tracingManager);
MongoOperationPublisher mongoOperationPublisher = new MongoOperationPublisher<>(Document.class,
withUuidRepresentation(settings.getCodecRegistry(),
settings.getUuidRepresentation()),
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
index ef18c2c6b1f..62a4431cc9a 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
@@ -31,10 +31,11 @@
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
+import com.mongodb.internal.observability.micrometer.Span;
+import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.operation.OperationHelper;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
-import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;
@@ -63,13 +64,16 @@ public class OperationExecutorImpl implements OperationExecutor {
@Nullable
private final ReactiveContextProvider contextProvider;
private final TimeoutSettings timeoutSettings;
+ private final TracingManager tracingManager;
OperationExecutorImpl(final MongoClientImpl mongoClient, final ClientSessionHelper clientSessionHelper,
- final TimeoutSettings timeoutSettings, @Nullable final ReactiveContextProvider contextProvider) {
+ final TimeoutSettings timeoutSettings, @Nullable final ReactiveContextProvider contextProvider,
+ final TracingManager tracingManager) {
this.mongoClient = mongoClient;
this.clientSessionHelper = clientSessionHelper;
this.timeoutSettings = timeoutSettings;
this.contextProvider = contextProvider;
+ this.tracingManager = tracingManager;
}
@Override
@@ -93,22 +97,37 @@ public Mono execute(final ReadOperation, T> operation, final ReadPrefer
OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
isImplicitSession(session), readConcern));
+ Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
+ operationContext, operation.getCommandName(), operation.getNamespace());
if (session != null && session.hasActiveTransaction() && !binding.getReadPreference().equals(primary())) {
binding.release();
- return Mono.error(new MongoClientException("Read preference in a transaction must be primary"));
+ MongoClientException error = new MongoClientException("Read preference in a transaction must be primary");
+ if (span != null) {
+ span.error(error);
+ span.end();
+ }
+ return Mono.error(error);
} else {
return Mono.create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
try {
binding.release();
} finally {
+ if (t != null) {
+ Throwable exceptionToHandle = t instanceof MongoException
+ ? OperationHelper.unwrap((MongoException) t) : t;
+ labelException(session, exceptionToHandle);
+ unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
+ if (span != null) {
+ span.error(t);
+ }
+ }
+ if (span != null) {
+ span.end();
+ }
sinkToCallback(sink).onResult(result, t);
}
- })).doOnError((t) -> {
- Throwable exceptionToHandle = t instanceof MongoException ? OperationHelper.unwrap((MongoException) t) : t;
- labelException(session, exceptionToHandle);
- unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
- });
+ }));
}
}).subscribe(subscriber)
);
@@ -133,18 +152,28 @@ public Mono execute(final WriteOperation operation, final ReadConcern
OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
isImplicitSession(session), readConcern));
+ Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
+ operationContext, operation.getCommandName(), operation.getNamespace());
return Mono.create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
try {
binding.release();
} finally {
+ if (t != null) {
+ Throwable exceptionToHandle = t instanceof MongoException
+ ? OperationHelper.unwrap((MongoException) t) : t;
+ labelException(session, exceptionToHandle);
+ unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
+ if (span != null) {
+ span.error(t);
+ }
+ }
+ if (span != null) {
+ span.end();
+ }
sinkToCallback(sink).onResult(result, t);
}
- })).doOnError((t) -> {
- Throwable exceptionToHandle = t instanceof MongoException ? OperationHelper.unwrap((MongoException) t) : t;
- labelException(session, exceptionToHandle);
- unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
- });
+ }));
}
).subscribe(subscriber)
);
@@ -155,7 +184,7 @@ public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSet
if (Objects.equals(timeoutSettings, newTimeoutSettings)) {
return this;
}
- return new OperationExecutorImpl(mongoClient, clientSessionHelper, newTimeoutSettings, contextProvider);
+ return new OperationExecutorImpl(mongoClient, clientSessionHelper, newTimeoutSettings, contextProvider, tracingManager);
}
@Override
@@ -214,7 +243,7 @@ private OperationContext getOperationContext(final RequestContext requestContext
requestContext,
new ReadConcernAwareNoOpSessionContext(readConcern),
createTimeoutContext(session, timeoutSettings),
- TracingManager.NO_OP,
+ tracingManager,
mongoClient.getSettings().getServerApi(),
commandName);
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java
index bc4da3026a9..cefdf7184d8 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java
@@ -55,8 +55,14 @@ public static MongoCollection collectionWithTimeout(final MongoCollection
public static Mono> collectionWithTimeoutMono(final MongoCollection collection,
@Nullable final Timeout timeout) {
+ return collectionWithTimeoutMono(collection, timeout, DEFAULT_TIMEOUT_MESSAGE);
+ }
+
+ public static Mono> collectionWithTimeoutMono(final MongoCollection collection,
+ @Nullable final Timeout timeout,
+ final String message) {
try {
- return Mono.just(collectionWithTimeout(collection, timeout));
+ return Mono.just(collectionWithTimeout(collection, timeout, message));
} catch (MongoOperationTimeoutException e) {
return Mono.error(e);
}
@@ -64,9 +70,14 @@ public static Mono> collectionWithTimeoutMono(final Mongo
public static Mono> collectionWithTimeoutDeferred(final MongoCollection collection,
@Nullable final Timeout timeout) {
- return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout));
+ return collectionWithTimeoutDeferred(collection, timeout, DEFAULT_TIMEOUT_MESSAGE);
}
+ public static Mono> collectionWithTimeoutDeferred(final MongoCollection collection,
+ @Nullable final Timeout timeout,
+ final String message) {
+ return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout, message));
+ }
public static MongoDatabase databaseWithTimeout(final MongoDatabase database,
@Nullable final Timeout timeout) {
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
index 7d9a46cdf3f..50586e92102 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
@@ -54,7 +54,8 @@
*/
public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher {
- private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit.";
+ private static final String TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING = "Saving chunks exceeded the timeout limit.";
+ private static final String TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION = "Upload cancellation exceeded the timeout limit.";
private static final Document PROJECTION = new Document("_id", 1);
private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
@@ -226,8 +227,8 @@ private Mono createSaveChunksMono(final AtomicBoolean terminated, @Nullabl
.append("data", data);
Publisher insertOnePublisher = clientSession == null
- ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument)
- : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
+ ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING).insertOne(chunkDocument)
+ : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING)
.insertOne(clientSession, chunkDocument);
return Mono.from(insertOnePublisher).thenReturn(data.length());
@@ -270,7 +271,8 @@ private Mono createSaveFileDataMono(final AtomicBoolean termina
}
private Mono createCancellationMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) {
- Mono> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout);
+ Mono> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout,
+ TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION);
if (terminated.compareAndSet(false, true)) {
if (clientSession != null) {
return chunksCollectionMono.flatMap(collection -> Mono.from(collection
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
index b922ec20b71..90446953fc1 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
@@ -16,7 +16,6 @@
package com.mongodb.reactivestreams.client;
-import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
@@ -24,7 +23,6 @@
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest;
-import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
@@ -43,6 +41,7 @@
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.nio.ByteBuffer;
@@ -58,12 +57,16 @@
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
+import static com.mongodb.ClusterFixture.isStandalone;
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
import static com.mongodb.ClusterFixture.sleep;
+import static com.mongodb.assertions.Assertions.assertTrue;
import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -104,7 +107,6 @@ protected boolean isAsync() {
@Override
public void testGridFSUploadViaOpenUploadStreamTimeout() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
//given
collectionHelper.runAdminCommand("{"
@@ -113,12 +115,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
+ " data: {"
+ " failCommands: [\"insert\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 405)
+ + " blockTimeMS: " + 600
+ " }"
+ "}");
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 400, TimeUnit.MILLISECONDS))) {
+ .timeout(600, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName());
GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME);
@@ -158,7 +160,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
@Override
public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
//given
CompletableFuture droppedErrorFuture = new CompletableFuture<>();
@@ -170,12 +171,12 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
+ " data: {"
+ " failCommands: [\"delete\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 405)
+ + " blockTimeMS: " + 405
+ " }"
+ "}");
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 400, TimeUnit.MILLISECONDS))) {
+ .timeout(400, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName());
GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME);
@@ -198,12 +199,25 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
//then
Throwable droppedError = droppedErrorFuture.get(TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS);
Throwable commandError = droppedError.getCause();
- assertInstanceOf(MongoOperationTimeoutException.class, commandError);
CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete");
assertNotNull(deleteFailedEvent);
- assertEquals(commandError, commandListener.getCommandFailedEvent("delete").getThrowable());
+ CommandStartedEvent deleteStartedEvent = commandListener.getCommandStartedEvent("delete");
+ assertTrue(deleteStartedEvent.getCommand().containsKey("maxTimeMS"), "Expected delete command to have maxTimeMS");
+ long deleteMaxTimeMS = deleteStartedEvent
+ .getCommand()
+ .get("maxTimeMS")
+ .asNumber()
+ .longValue();
+
+ assertTrue(deleteMaxTimeMS <= 420
+ // some leeway for timing variations, when compression is used it is often less then 300.
+ // Without it, it is more than 300.
+ && deleteMaxTimeMS >= 150,
+ "Expected maxTimeMS for delete command to be between 150s and 420ms, " + "but was: " + deleteMaxTimeMS + "ms");
+ assertEquals(commandError, deleteFailedEvent.getThrowable());
+
// When subscription is cancelled, we should not receive any more events.
testSubscriber.assertNoTerminalEvent();
}
@@ -219,9 +233,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() {
assumeTrue(isDiscoverableReplicaSet());
//given
- long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 500, TimeUnit.MILLISECONDS))) {
+ .timeout(500, TimeUnit.MILLISECONDS))) {
MongoNamespace namespace = generateNamespace();
MongoCollection collection = client.getDatabase(namespace.getDatabaseName())
@@ -273,9 +286,8 @@ public void testTimeoutMSAppliedToInitialAggregate() {
assumeTrue(isDiscoverableReplicaSet());
//given
- long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 200, TimeUnit.MILLISECONDS))) {
+ .timeout(200, TimeUnit.MILLISECONDS))) {
MongoNamespace namespace = generateNamespace();
MongoCollection collection = client.getDatabase(namespace.getDatabaseName())
@@ -290,7 +302,7 @@ public void testTimeoutMSAppliedToInitialAggregate() {
+ " data: {"
+ " failCommands: [\"aggregate\" ],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 201)
+ + " blockTimeMS: " + 201
+ " }"
+ "}");
@@ -321,13 +333,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
//given
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
- collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
sleep(2000);
-
- long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
+ .timeout(500, TimeUnit.MILLISECONDS))) {
MongoCollection collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
@@ -338,7 +347,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
+ " data: {"
+ " failCommands: [\"getMore\", \"aggregate\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 200)
+ + " blockTimeMS: " + 200
+ " }"
+ "}");
@@ -389,12 +398,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
//given
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
- collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
sleep(2000);
- long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
+ .timeout(500, TimeUnit.MILLISECONDS))) {
MongoCollection collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName())
@@ -406,7 +413,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
+ " data: {"
+ " failCommands: [\"aggregate\", \"getMore\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 200)
+ + " blockTimeMS: " + 200
+ " }"
+ "}");
@@ -449,9 +456,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
assumeTrue(isDiscoverableReplicaSet());
//given
- long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 2500, TimeUnit.MILLISECONDS))) {
+ .timeout(2500, TimeUnit.MILLISECONDS))) {
MongoCollection collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
@@ -468,7 +474,78 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
List commandStartedEvents = commandListener.getCommandStartedEvents();
assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"),
commandStartedEvents);
- assertOnlyOneCommandTimeoutFailure("getMore");
+
+ }
+ }
+
+ @DisplayName("9. End Session. The timeout specified via the MongoClient timeoutMS option")
+ @Test
+ @Override
+ public void test9EndSessionClientTimeout() {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ assumeFalse(isStandalone());
+
+ collectionHelper.runAdminCommand("{"
+ + " configureFailPoint: \"failCommand\","
+ + " mode: { times: 1 },"
+ + " data: {"
+ + " failCommands: [\"abortTransaction\"],"
+ + " blockConnection: true,"
+ + " blockTimeMS: " + 400
+ + " }"
+ + "}");
+
+ try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder().retryWrites(false)
+ .timeout(300, TimeUnit.MILLISECONDS))) {
+ MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
+ .getCollection(namespace.getCollectionName());
+
+ try (ClientSession session = Mono.from(mongoClient.startSession()).block()) {
+ session.startTransaction();
+ Mono.from(collection.insertOne(session, new Document("x", 1))).block();
+ }
+
+ sleep(postSessionCloseSleep());
+ CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
+ long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
+ assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
+ assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
+ }
+ }
+
+ @Test
+ @DisplayName("9. End Session. The timeout specified via the ClientSession defaultTimeoutMS option")
+ @Override
+ public void test9EndSessionSessionTimeout() {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ assumeFalse(isStandalone());
+
+ collectionHelper.runAdminCommand("{"
+ + " configureFailPoint: \"failCommand\","
+ + " mode: { times: 1 },"
+ + " data: {"
+ + " failCommands: [\"abortTransaction\"],"
+ + " blockConnection: true,"
+ + " blockTimeMS: " + 400
+ + " }"
+ + "}");
+
+ try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder())) {
+ MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
+ .getCollection(namespace.getCollectionName());
+
+ try (ClientSession session = Mono.from(mongoClient.startSession(com.mongodb.ClientSessionOptions.builder()
+ .defaultTimeout(300, TimeUnit.MILLISECONDS).build())).block()) {
+
+ session.startTransaction();
+ Mono.from(collection.insertOne(session, new Document("x", 1))).block();
+ }
+
+ sleep(postSessionCloseSleep());
+ CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
+ long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
+ assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
+ assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
}
}
@@ -512,6 +589,6 @@ public void tearDown() throws InterruptedException {
@Override
protected int postSessionCloseSleep() {
- return 256;
+ return 1000;
}
}
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/observability/MicrometerProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/observability/MicrometerProseTest.java
new file mode 100644
index 00000000000..c58bb98f2cc
--- /dev/null
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/observability/MicrometerProseTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.reactivestreams.client.observability;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.AbstractMicrometerProseTest;
+import com.mongodb.client.MongoClient;
+import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
+
+/**
+ * Reactive Streams driver implementation of the Micrometer prose tests.
+ */
+public class MicrometerProseTest extends AbstractMicrometerProseTest {
+ @Override
+ protected MongoClient createMongoClient(final MongoClientSettings settings) {
+ return new SyncMongoClient(settings);
+ }
+}
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java
index e1d765150a7..473d57a3878 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java
@@ -192,7 +192,7 @@ public TimeoutContext getTimeoutContext() {
@Override
@Nullable
public TransactionSpan getTransactionSpan() {
- return null;
+ return wrapped.getTransactionSpan();
}
private static void sleep(final long millis) {
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/MicrometerTracingTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/MicrometerTracingTest.java
new file mode 100644
index 00000000000..bf2e6205ad6
--- /dev/null
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/MicrometerTracingTest.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.reactivestreams.client.unified;
+
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.util.Collection;
+
+final class MicrometerTracingTest extends UnifiedReactiveStreamsTest {
+ private static Collection data() {
+ return getTestData("open-telemetry/tests");
+ }
+}
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java
index c192ae17896..0fda131f4ff 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java
@@ -25,6 +25,7 @@
import com.mongodb.internal.connection.ClientMetadata;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.mockito.MongoMockito;
+import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
@@ -179,7 +180,7 @@ void testWatch() {
@Test
void testStartSession() {
ServerSessionPool serverSessionPool = mock(ServerSessionPool.class);
- ClientSessionHelper clientSessionHelper = new ClientSessionHelper(mongoClient, serverSessionPool);
+ ClientSessionHelper clientSessionHelper = new ClientSessionHelper(mongoClient, serverSessionPool, TracingManager.NO_OP);
assertAll("Start Session Tests",
() -> assertAll("check validation",
diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
index 920feb1f986..eb36678761a 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
@@ -22,7 +22,6 @@
import com.mongodb.MongoClientException;
import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
-import com.mongodb.MongoNamespace;
import com.mongodb.MongoQueryException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoTimeoutException;
@@ -53,17 +52,14 @@
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
+import com.mongodb.internal.observability.micrometer.Span;
+import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.operation.OperationHelper;
import com.mongodb.internal.operation.Operations;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.internal.session.ServerSessionPool;
-import com.mongodb.internal.observability.micrometer.Span;
-import com.mongodb.internal.observability.micrometer.TraceContext;
-import com.mongodb.internal.observability.micrometer.TracingManager;
-import com.mongodb.internal.observability.micrometer.TransactionSpan;
import com.mongodb.lang.Nullable;
-import io.micrometer.common.KeyValues;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.UuidRepresentation;
@@ -77,17 +73,11 @@
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
-import static com.mongodb.internal.MongoNamespaceHelper.COMMAND_COLLECTION_NAME;
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.TimeoutContext.createTimeoutContext;
-import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COLLECTION;
-import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NAMESPACE;
-import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_NAME;
-import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_SUMMARY;
-import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SYSTEM;
final class MongoClusterImpl implements MongoCluster {
@Nullable
@@ -434,7 +424,8 @@ public T execute(final ReadOperation operation, final ReadPreference r
boolean implicitSession = isImplicitSession(session);
OperationContext operationContext = getOperationContext(actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.SyncClientSessionContext(actualClientSession, readConcern, implicitSession));
- Span span = createOperationSpan(actualClientSession, operationContext, operation.getCommandName(), operation.getNamespace());
+ Span span = operationContext.getTracingManager().createOperationSpan(
+ actualClientSession.getTransactionSpan(), operationContext, operation.getCommandName(), operation.getNamespace());
ReadBinding binding = getReadBinding(readPreference, actualClientSession, implicitSession);
@@ -469,7 +460,8 @@ public T execute(final WriteOperation operation, final ReadConcern readCo
ClientSession actualClientSession = getClientSession(session);
OperationContext operationContext = getOperationContext(actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.SyncClientSessionContext(actualClientSession, readConcern, isImplicitSession(session)));
- Span span = createOperationSpan(actualClientSession, operationContext, operation.getCommandName(), operation.getNamespace());
+ Span span = operationContext.getTracingManager().createOperationSpan(
+ actualClientSession.getTransactionSpan(), operationContext, operation.getCommandName(), operation.getNamespace());
WriteBinding binding = getWriteBinding(actualClientSession, isImplicitSession(session));
try {
@@ -587,48 +579,6 @@ ClientSession getClientSession(@Nullable final ClientSession clientSessionFromOp
return session;
}
- /**
- * Create a tracing span for the given operation, and set it on operation context.
- *
- * @param actualClientSession the session that the operation is part of
- * @param operationContext the operation context for the operation
- * @param commandName the name of the command
- * @param namespace the namespace of the command
- * @return the created span, or null if tracing is not enabled
- */
- @Nullable
- private Span createOperationSpan(final ClientSession actualClientSession, final OperationContext operationContext, final String commandName, final MongoNamespace namespace) {
- TracingManager tracingManager = operationContext.getTracingManager();
- if (tracingManager.isEnabled()) {
- TraceContext parentContext = null;
- TransactionSpan transactionSpan = actualClientSession.getTransactionSpan();
- if (transactionSpan != null) {
- parentContext = transactionSpan.getContext();
- }
- String name = commandName + " " + namespace.getDatabaseName() + (COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())
- ? ""
- : "." + namespace.getCollectionName());
-
- KeyValues keyValues = KeyValues.of(
- SYSTEM.withValue("mongodb"),
- NAMESPACE.withValue(namespace.getDatabaseName()));
- if (!COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())) {
- keyValues = keyValues.and(COLLECTION.withValue(namespace.getCollectionName()));
- }
- keyValues = keyValues.and(OPERATION_NAME.withValue(commandName),
- OPERATION_SUMMARY.withValue(name));
-
- Span span = tracingManager.addSpan(name, parentContext, namespace);
-
- span.tagLowCardinality(keyValues);
-
- operationContext.setTracingSpan(span);
- return span;
-
- } else {
- return null;
- }
- }
}
private boolean isImplicitSession(@Nullable final ClientSession session) {
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
index 9ce58b1654f..7828ecde684 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
@@ -56,11 +56,8 @@
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.test.FlakyTest;
-import org.bson.BsonArray;
-import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
-import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.codecs.BsonDocumentCodec;
@@ -256,7 +253,6 @@ public void testBlockingIterationMethodsChangeStream() {
assumeFalse(isAsync()); // Async change stream cursor is non-deterministic for cursor::next
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
- collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
sleep(2000);
collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY);
@@ -298,7 +294,6 @@ public void testBlockingIterationMethodsChangeStream() {
@FlakyTest(maxAttempts = 3)
public void testGridFSUploadViaOpenUploadStreamTimeout() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
@@ -306,7 +301,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
+ " data: {"
+ " failCommands: [\"insert\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 205)
+ + " blockTimeMS: " + 205
+ " }"
+ "}");
@@ -314,7 +309,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
filesCollectionHelper.create();
try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 200, TimeUnit.MILLISECONDS))) {
+ .timeout(200, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(namespace.getDatabaseName());
GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME);
@@ -329,7 +324,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
@Test
public void testAbortingGridFsUploadStreamTimeout() throws Throwable {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
@@ -337,7 +331,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable {
+ " data: {"
+ " failCommands: [\"delete\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 305)
+ + " blockTimeMS: " + 320
+ " }"
+ "}");
@@ -345,7 +339,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable {
filesCollectionHelper.create();
try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
+ .timeout(300, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(namespace.getDatabaseName());
GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2);
@@ -360,7 +354,6 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable {
@Test
public void testGridFsDownloadStreamTimeout() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
chunksCollectionHelper.create();
filesCollectionHelper.create();
@@ -382,18 +375,19 @@ public void testGridFsDownloadStreamTimeout() {
+ " metadata: {}"
+ "}"
)), WriteConcern.MAJORITY);
+
collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
+ " mode: { skip: 1 },"
+ " data: {"
+ " failCommands: [\"find\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 95)
+ + " blockTimeMS: " + 500
+ " }"
+ "}");
try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
- .timeout(rtt + 100, TimeUnit.MILLISECONDS))) {
+ .timeout(300, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(namespace.getDatabaseName());
GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2);
@@ -401,7 +395,9 @@ public void testGridFsDownloadStreamTimeout() {
assertThrows(MongoOperationTimeoutException.class, downloadStream::read);
List events = commandListener.getCommandStartedEvents();
- List findCommands = events.stream().filter(e -> e.getCommandName().equals("find")).collect(Collectors.toList());
+ List findCommands = events.stream()
+ .filter(e -> e.getCommandName().equals("find"))
+ .collect(Collectors.toList());
assertEquals(2, findCommands.size());
assertEquals(gridFsFileNamespace.getCollectionName(), findCommands.get(0).getCommand().getString("find").getValue());
@@ -414,7 +410,7 @@ public void testGridFsDownloadStreamTimeout() {
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("test8ServerSelectionArguments")
public void test8ServerSelection(final String connectionString) {
- int timeoutBuffer = 100; // 5 in spec, Java is slower
+ int timeoutBuffer = 150; // 5 in spec, Java is slower
// 1. Create a MongoClient
try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder()
.applyConnectionString(new ConnectionString(connectionString)))
@@ -450,7 +446,7 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in
+ " data: {"
+ " failCommands: [\"saslContinue\"],"
+ " blockConnection: true,"
- + " blockTimeMS: 350"
+ + " blockTimeMS: 600"
+ " }"
+ "}");
@@ -466,7 +462,7 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in
.insertOne(new Document("x", 1));
});
long elapsed = msElapsedSince(start);
- assertTrue(elapsed <= 310, "Took too long to time out, elapsedMS: " + elapsed);
+ assertTrue(elapsed <= 350, "Took too long to time out, elapsedMS: " + elapsed);
}
}
@@ -483,23 +479,23 @@ public void test9EndSessionClientTimeout() {
+ " data: {"
+ " failCommands: [\"abortTransaction\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + 150
+ + " blockTimeMS: " + 500
+ " }"
+ "}");
try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder().retryWrites(false)
- .timeout(100, TimeUnit.MILLISECONDS))) {
- MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
+ .timeout(250, TimeUnit.MILLISECONDS))) {
+ MongoDatabase database = mongoClient.getDatabase(namespace.getDatabaseName());
+ MongoCollection collection = database
.getCollection(namespace.getCollectionName());
try (ClientSession session = mongoClient.startSession()) {
session.startTransaction();
collection.insertOne(session, new Document("x", 1));
-
long start = System.nanoTime();
session.close();
- long elapsed = msElapsedSince(start) - postSessionCloseSleep();
- assertTrue(elapsed <= 150, "Took too long to time out, elapsedMS: " + elapsed);
+ long elapsed = msElapsedSince(start);
+ assertTrue(elapsed <= 300, "Took too long to time out, elapsedMS: " + elapsed);
}
}
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() ->
@@ -520,7 +516,7 @@ public void test9EndSessionSessionTimeout() {
+ " data: {"
+ " failCommands: [\"abortTransaction\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + 150
+ + " blockTimeMS: " + 400
+ " }"
+ "}");
@@ -529,14 +525,14 @@ public void test9EndSessionSessionTimeout() {
.getCollection(namespace.getCollectionName());
try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder()
- .defaultTimeout(100, TimeUnit.MILLISECONDS).build())) {
+ .defaultTimeout(300, TimeUnit.MILLISECONDS).build())) {
session.startTransaction();
collection.insertOne(session, new Document("x", 1));
long start = System.nanoTime();
session.close();
- long elapsed = msElapsedSince(start) - postSessionCloseSleep();
- assertTrue(elapsed <= 150, "Took too long to time out, elapsedMS: " + elapsed);
+ long elapsed = msElapsedSince(start);
+ assertTrue(elapsed <= 400, "Took too long to time out, elapsedMS: " + elapsed);
}
}
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() ->
@@ -563,11 +559,12 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithCommit() {
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());
+ int defaultTimeout = 300;
try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder()
- .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) {
+ .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) {
session.startTransaction();
collection.insertOne(session, new Document("x", 1));
- sleep(200);
+ sleep(defaultTimeout);
assertDoesNotThrow(session::commitTransaction);
}
@@ -594,11 +591,12 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithAbort() {
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());
+ int defaultTimeout = 300;
try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder()
- .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) {
+ .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) {
session.startTransaction();
collection.insertOne(session, new Document("x", 1));
- sleep(200);
+ sleep(defaultTimeout);
assertDoesNotThrow(session::close);
}
@@ -618,12 +616,12 @@ public void test10ConvenientTransactions() {
+ " data: {"
+ " failCommands: [\"insert\", \"abortTransaction\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + 150
+ + " blockTimeMS: " + 200
+ " }"
+ "}");
try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder()
- .timeout(100, TimeUnit.MILLISECONDS))) {
+ .timeout(150, TimeUnit.MILLISECONDS))) {
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());
@@ -661,12 +659,13 @@ public void test10CustomTestWithTransactionUsesASingleTimeout() {
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());
+ int defaultTimeout = 200;
try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder()
- .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) {
+ .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) {
assertThrows(MongoOperationTimeoutException.class,
() -> session.withTransaction(() -> {
collection.insertOne(session, new Document("x", 1));
- sleep(200);
+ sleep(defaultTimeout);
return true;
})
);
@@ -696,12 +695,13 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() {
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());
+ int defaultTimeout = 200;
try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder()
- .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) {
+ .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) {
assertThrows(MongoOperationTimeoutException.class,
() -> session.withTransaction(() -> {
collection.insertOne(session, new Document("x", 1));
- sleep(200);
+ sleep(defaultTimeout);
return true;
})
);
@@ -710,7 +710,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() {
}
@DisplayName("11. Multi-batch bulkWrites")
- @Test
+ @FlakyTest(maxAttempts = 3)
@SuppressWarnings("try")
protected void test11MultiBatchBulkWrites() throws InterruptedException {
assumeTrue(serverVersionAtLeast(8, 0));
@@ -718,12 +718,18 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException {
// a workaround for https://jira.mongodb.org/browse/DRIVERS-2997, remove this block when the aforementioned bug is fixed
client.getDatabase(namespace.getDatabaseName()).drop();
}
- BsonDocument failPointDocument = new BsonDocument("configureFailPoint", new BsonString("failCommand"))
- .append("mode", new BsonDocument("times", new BsonInt32(2)))
- .append("data", new BsonDocument("failCommands", new BsonArray(singletonList(new BsonString("bulkWrite"))))
- .append("blockConnection", BsonBoolean.TRUE)
- .append("blockTimeMS", new BsonInt32(2020)));
- try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(4000, TimeUnit.MILLISECONDS));
+ BsonDocument failPointDocument = BsonDocument.parse("{"
+ + " configureFailPoint: \"failCommand\","
+ + " mode: { times: 2},"
+ + " data: {"
+ + " failCommands: [\"bulkWrite\" ],"
+ + " blockConnection: true,"
+ + " blockTimeMS: " + 2020
+ + " }"
+ + "}");
+
+ long timeout = 4000;
+ try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(timeout, TimeUnit.MILLISECONDS));
FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) {
MongoDatabase db = client.getDatabase(namespace.getDatabaseName());
db.drop();
@@ -746,8 +752,8 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException {
* Not a prose spec test. However, it is additional test case for better coverage.
*/
@Test
- @DisplayName("Should ignore wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations")
- public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() {
+ @DisplayName("Should not include wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations")
+ public void shouldNotIncludeWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() {
assumeTrue(serverVersionAtLeast(4, 4));
assumeFalse(isStandalone());
@@ -755,14 +761,15 @@ public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTran
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());
+ int defaultTimeout = 200;
try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder()
- .defaultTimeout(200, TimeUnit.MILLISECONDS)
+ .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS)
.build())) {
session.startTransaction(TransactionOptions.builder()
.writeConcern(WriteConcern.ACKNOWLEDGED.withWTimeout(100, TimeUnit.MILLISECONDS))
.build());
collection.insertOne(session, new Document("x", 1));
- sleep(200);
+ sleep(defaultTimeout);
assertDoesNotThrow(session::commitTransaction);
//repeat commit.
@@ -805,12 +812,12 @@ public void shouldIgnoreWaitQueueTimeoutMSWhenTimeoutMsIsSet() {
+ " data: {"
+ " failCommands: [\"find\" ],"
+ " blockConnection: true,"
- + " blockTimeMS: " + 300
+ + " blockTimeMS: " + 450
+ " }"
+ "}");
- executor.submit(() -> collection.find().first());
- sleep(100);
+ executor.execute(() -> collection.find().first());
+ sleep(150);
//when && then
assertDoesNotThrow(() -> collection.find().first());
@@ -844,7 +851,7 @@ public void shouldThrowOperationTimeoutExceptionWhenConnectionIsNotAvailableAndT
+ " }"
+ "}");
- executor.submit(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first());
+ executor.execute(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first());
sleep(100);
//when && then
@@ -863,7 +870,7 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() {
//given
try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder()
.applyToConnectionPoolSettings(builder -> builder
- .maxWaitTime(100, TimeUnit.MILLISECONDS)
+ .maxWaitTime(20, TimeUnit.MILLISECONDS)
.maxSize(1)
))) {
MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName())
@@ -875,12 +882,12 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() {
+ " data: {"
+ " failCommands: [\"find\" ],"
+ " blockConnection: true,"
- + " blockTimeMS: " + 300
+ + " blockTimeMS: " + 400
+ " }"
+ "}");
- executor.submit(() -> collection.find().first());
- sleep(100);
+ executor.execute(() -> collection.find().first());
+ sleep(200);
//when & then
assertThrows(MongoTimeoutException.class, () -> collection.find().first());
@@ -896,7 +903,6 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkErrorWhenTimeoutMsIsN
assumeTrue(serverVersionAtLeast(4, 4));
assumeTrue(isLoadBalanced());
- long rtt = ClusterFixture.getPrimaryRTT();
collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
collectionHelper.insertDocuments(new Document(), new Document());
collectionHelper.runAdminCommand("{"
@@ -905,7 +911,7 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkErrorWhenTimeoutMsIsN
+ " data: {"
+ " failCommands: [\"getMore\" ],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 600)
+ + " blockTimeMS: " + 600
+ " }"
+ "}");
@@ -943,7 +949,6 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkError() {
assumeTrue(serverVersionAtLeast(4, 4));
assumeTrue(isLoadBalanced());
- long rtt = ClusterFixture.getPrimaryRTT();
collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
collectionHelper.insertDocuments(new Document(), new Document());
collectionHelper.runAdminCommand("{"
@@ -952,7 +957,7 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkError() {
+ " data: {"
+ " failCommands: [\"getMore\" ],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 600)
+ + " blockTimeMS: " + 600
+ " }"
+ "}");
@@ -1040,11 +1045,16 @@ public void shouldUseConnectTimeoutMsWhenEstablishingConnectionInBackground() {
+ " data: {"
+ " failCommands: [\"hello\", \"isMaster\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + 500
+ + " blockTimeMS: " + 500 + ","
+ // The appName is unique to prevent this failpoint from affecting ClusterFixture's ServerMonitor.
+ // Without the appName, ClusterFixture's heartbeats would be blocked, polluting RTT measurements with 500ms values,
+ // which would cause flakiness in other prose tests that use ClusterFixture.getPrimaryRTT() for timeout adjustments.
+ + " appName: \"connectTimeoutBackgroundTest\""
+ " }"
+ "}");
try (MongoClient ignored = createMongoClient(getMongoClientSettingsBuilder()
+ .applicationName("connectTimeoutBackgroundTest")
.applyToConnectionPoolSettings(builder -> builder.minSize(1))
// Use a very short timeout to ensure that the connection establishment will fail on the first handshake command.
.timeout(10, TimeUnit.MILLISECONDS))) {
@@ -1075,9 +1085,10 @@ private static Stream test8ServerSelectionArguments() {
}
private static Stream test8ServerSelectionHandshakeArguments() {
+
return Stream.of(
- Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 200, 300),
- Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 300, 200)
+ Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 200, 500),
+ Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 500, 200)
);
}
@@ -1088,7 +1099,8 @@ protected MongoNamespace generateNamespace() {
protected MongoClientSettings.Builder getMongoClientSettingsBuilder() {
commandListener.reset();
- return Fixture.getMongoClientSettingsBuilder()
+ MongoClientSettings.Builder mongoClientSettingsBuilder = Fixture.getMongoClientSettingsBuilder();
+ return mongoClientSettingsBuilder
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.readPreference(ReadPreference.primary())
@@ -1103,6 +1115,9 @@ public void setUp() {
gridFsChunksNamespace = new MongoNamespace(getDefaultDatabaseName(), GRID_FS_BUCKET_NAME + ".chunks");
collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), namespace);
+ // in some test collection might not have been created yet, thus dropping it in afterEach will throw an error
+ collectionHelper.create();
+
filesCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsFileNamespace);
chunksCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsChunksNamespace);
commandListener = new TestCommandListener();
@@ -1112,10 +1127,13 @@ public void setUp() {
public void tearDown() throws InterruptedException {
ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME);
if (collectionHelper != null) {
+ // Due to testing abortTransaction via failpoint, there may be open transactions
+ // after the test finishes, thus drop() command hangs for 60 seconds until transaction
+ // is automatically rolled back.
+ collectionHelper.runAdminCommand("{killAllSessions: []}");
collectionHelper.drop();
filesCollectionHelper.drop();
chunksCollectionHelper.drop();
- commandListener.reset();
try {
ServerHelper.checkPool(getPrimary());
} catch (InterruptedException e) {
@@ -1139,7 +1157,7 @@ private MongoClient createMongoClient(final MongoClientSettings.Builder builder)
return createMongoClient(builder.build());
}
- private long msElapsedSince(final long t1) {
+ protected long msElapsedSince(final long t1) {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1);
}
diff --git a/driver-sync/src/test/functional/com/mongodb/observability/MicrometerProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractMicrometerProseTest.java
similarity index 57%
rename from driver-sync/src/test/functional/com/mongodb/observability/MicrometerProseTest.java
rename to driver-sync/src/test/functional/com/mongodb/client/AbstractMicrometerProseTest.java
index d4239aa44d7..746b0ffd8d9 100644
--- a/driver-sync/src/test/functional/com/mongodb/observability/MicrometerProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractMicrometerProseTest.java
@@ -14,44 +14,59 @@
* limitations under the License.
*/
-package com.mongodb.observability;
+package com.mongodb.client;
import com.mongodb.MongoClientSettings;
-import com.mongodb.client.Fixture;
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
+import com.mongodb.lang.Nullable;
+import com.mongodb.observability.ObservabilitySettings;
+import com.mongodb.client.observability.SpanTree;
+import com.mongodb.client.observability.SpanTree.SpanNode;
import com.mongodb.observability.micrometer.MicrometerObservabilitySettings;
import io.micrometer.observation.ObservationRegistry;
+import io.micrometer.tracing.exporter.FinishedSpan;
import io.micrometer.tracing.test.reporter.inmemory.InMemoryOtelSetup;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static com.mongodb.ClusterFixture.getDefaultDatabaseName;
+import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
+import static com.mongodb.internal.observability.micrometer.MongodbObservation.HighCardinalityKeyNames.QUERY_TEXT;
import static com.mongodb.internal.observability.micrometer.TracingManager.ENV_OBSERVABILITY_ENABLED;
import static com.mongodb.internal.observability.micrometer.TracingManager.ENV_OBSERVABILITY_QUERY_TEXT_MAX_LENGTH;
-import static com.mongodb.internal.observability.micrometer.MongodbObservation.HighCardinalityKeyNames.QUERY_TEXT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Implementation of the prose tests for Micrometer OpenTelemetry tracing.
+ * Implementation of the prose tests
+ * for Micrometer OpenTelemetry tracing.
*/
-public class MicrometerProseTest {
+public abstract class AbstractMicrometerProseTest {
private final ObservationRegistry observationRegistry = ObservationRegistry.create();
private InMemoryOtelSetup memoryOtelSetup;
private InMemoryOtelSetup.Builder.OtelBuildingBlocks inMemoryOtel;
private static String previousEnvVarMdbTracingEnabled;
private static String previousEnvVarMdbQueryTextLength;
+ protected abstract MongoClient createMongoClient(MongoClientSettings settings);
+
@BeforeAll
static void beforeAll() {
// preserve original env var values
@@ -77,18 +92,19 @@ void tearDown() {
memoryOtelSetup.close();
}
+ @DisplayName("Test 1: Tracing Enable/Disable via Environment Variable")
@Test
void testControlOtelInstrumentationViaEnvironmentVariable() throws Exception {
setEnv(ENV_OBSERVABILITY_ENABLED, "false");
// don't enable command payload by default
- MongoClientSettings clientSettings = Fixture.getMongoClientSettingsBuilder()
+ MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
.observabilitySettings(ObservabilitySettings.micrometerBuilder()
.observationRegistry(observationRegistry)
.build())
.build();
- try (MongoClient client = MongoClients.create(clientSettings)) {
+ try (MongoClient client = createMongoClient(clientSettings)) {
MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
MongoCollection collection = database.getCollection("test");
collection.find().first();
@@ -98,7 +114,7 @@ void testControlOtelInstrumentationViaEnvironmentVariable() throws Exception {
}
setEnv(ENV_OBSERVABILITY_ENABLED, "true");
- try (MongoClient client = MongoClients.create(clientSettings)) {
+ try (MongoClient client = createMongoClient(clientSettings)) {
MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
MongoCollection collection = database.getCollection("test");
collection.find().first();
@@ -114,6 +130,7 @@ void testControlOtelInstrumentationViaEnvironmentVariable() throws Exception {
}
}
+ @DisplayName("Test 2: Command Payload Emission via Environment Variable")
@Test
void testControlCommandPayloadViaEnvironmentVariable() throws Exception {
setEnv(ENV_OBSERVABILITY_QUERY_TEXT_MAX_LENGTH, "42");
@@ -123,13 +140,13 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception {
.maxQueryTextLength(75) // should be overridden by env var
.build();
- MongoClientSettings clientSettings = Fixture.getMongoClientSettingsBuilder()
+ MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
.observabilitySettings(ObservabilitySettings.micrometerBuilder()
.applySettings(settings)
.build()).
build();
- try (MongoClient client = MongoClients.create(clientSettings)) {
+ try (MongoClient client = createMongoClient(clientSettings)) {
MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
MongoCollection collection = database.getCollection("test");
collection.find().first();
@@ -153,14 +170,14 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception {
setEnv(ENV_OBSERVABILITY_QUERY_TEXT_MAX_LENGTH, null); // Unset the environment variable
- clientSettings = Fixture.getMongoClientSettingsBuilder()
+ clientSettings = getMongoClientSettingsBuilder()
.observabilitySettings(ObservabilitySettings.micrometerBuilder()
.observationRegistry(observationRegistry)
.maxQueryTextLength(42) // setting this will not matter since env var is not set and enableCommandPayloadTracing is false
.build())
.build();
- try (MongoClient client = MongoClients.create(clientSettings)) {
+ try (MongoClient client = createMongoClient(clientSettings)) {
MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
MongoCollection collection = database.getCollection("test");
collection.find().first();
@@ -182,11 +199,11 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception {
.maxQueryTextLength(7) // setting this will be used;
.build();
- clientSettings = Fixture.getMongoClientSettingsBuilder()
+ clientSettings = getMongoClientSettingsBuilder()
.observabilitySettings(settings)
.build();
- try (MongoClient client = MongoClients.create(clientSettings)) {
+ try (MongoClient client = createMongoClient(clientSettings)) {
MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
MongoCollection collection = database.getCollection("test");
collection.find().first();
@@ -200,8 +217,108 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception {
}
}
+ /**
+ * Verifies that concurrent operations produce isolated span trees with no cross-contamination.
+ * Each operation should get its own trace ID, correct parent-child linkage, and collection-specific tags,
+ * even when multiple operations execute simultaneously on the same client.
+ *
+ * This test is not from the specification.
+ */
+ @Test
+ void testConcurrentOperationsHaveSeparateSpans() throws Exception {
+ setEnv(ENV_OBSERVABILITY_ENABLED, "true");
+ int nbrConcurrentOps = 10;
+ MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
+ .applyToConnectionPoolSettings(pool -> pool.maxSize(nbrConcurrentOps))
+ .observabilitySettings(ObservabilitySettings.micrometerBuilder()
+ .observationRegistry(observationRegistry)
+ .build())
+ .build();
+
+ try (MongoClient client = createMongoClient(clientSettings)) {
+ MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
+
+ // Warm up connections so the concurrent phase doesn't include handshake overhead
+ for (int i = 0; i < nbrConcurrentOps; i++) {
+ database.getCollection("concurrent_test_" + i).find().first();
+ }
+ // Clear spans from warm-up before the actual concurrent test
+ memoryOtelSetup.close();
+ memoryOtelSetup = InMemoryOtelSetup.builder().register(observationRegistry);
+ inMemoryOtel = memoryOtelSetup.getBuildingBlocks();
+
+ ExecutorService executor = Executors.newFixedThreadPool(nbrConcurrentOps);
+ try {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ List> futures = new ArrayList<>();
+
+ for (int i = 0; i < nbrConcurrentOps; i++) {
+ String collectionName = "concurrent_test_" + i;
+ futures.add(executor.submit(() -> {
+ try {
+ startLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ database.getCollection(collectionName).find().first();
+ }));
+ }
+
+ // Release all threads simultaneously to maximize concurrency
+ startLatch.countDown();
+
+ for (Future> future : futures) {
+ future.get(30, TimeUnit.SECONDS);
+ }
+ } finally {
+ executor.shutdown();
+ }
+
+ List allSpans = inMemoryOtel.getFinishedSpans();
+
+ // Each find() produces 2 spans: operation-level span + command-level span
+ assertEquals(nbrConcurrentOps * 2, allSpans.size(),
+ "Each concurrent operation should produce exactly 2 spans (operation + command).");
+
+ // Verify trace isolation: each independent operation should get its own traceId
+ Map> spansByTrace = allSpans.stream()
+ .collect(Collectors.groupingBy(FinishedSpan::getTraceId));
+ assertEquals(nbrConcurrentOps, spansByTrace.size(),
+ "Each concurrent operation should have its own distinct trace ID.");
+
+ // Use SpanTree to validate parent-child structure built from spanId/parentId linkage
+ SpanTree spanTree = SpanTree.from(allSpans);
+ List roots = spanTree.getRoots();
+
+ // Each operation span is a root; its command span is a child
+ assertEquals(nbrConcurrentOps, roots.size(),
+ "SpanTree should have one root per concurrent operation.");
+
+ Set observedCollections = new HashSet<>();
+ for (SpanNode root : roots) {
+ assertTrue(root.getName().startsWith("find " + getDefaultDatabaseName() + ".concurrent_test_"),
+ "Root span should be an operation span, but was: " + root.getName());
+
+ assertEquals(1, root.getChildren().size(),
+ "Each operation span should have exactly one child (command span).");
+ assertEquals("find", root.getChildren().get(0).getName(),
+ "Child span should be the command span 'find'.");
+
+ // Extract collection name from the operation span name to verify no cross-contamination
+ String collectionName = root.getName().substring(
+ ("find " + getDefaultDatabaseName() + ".").length());
+ assertTrue(observedCollections.add(collectionName),
+ "Each operation should target a unique collection, but found duplicate: " + collectionName);
+ }
+
+ assertEquals(nbrConcurrentOps, observedCollections.size(),
+ "All " + nbrConcurrentOps + " concurrent operations should be represented in distinct traces.");
+ }
+ }
+
@SuppressWarnings("unchecked")
- private static void setEnv(final String key, final String value) throws Exception {
+ private static void setEnv(final String key, @Nullable final String value) throws Exception {
// Get the unmodifiable Map from System.getenv()
Map env = System.getenv();
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java
index 3682bd64ff0..910cf57edfd 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java
@@ -93,7 +93,7 @@ public void shouldCreateServerSessionOnlyAfterConnectionCheckout() throws Interr
.addCommandListener(new CommandListener() {
@Override
public void commandStarted(final CommandStartedEvent event) {
- lsidSet.add(event.getCommand().getDocument("lsid"));
+ lsidSet.add(event.getCommand().getDocument("lsid").clone());
}
})
.build())) {
diff --git a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java
index dd45bc8ae2c..04303833bf5 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java
@@ -93,14 +93,13 @@ public abstract class AbstractClientSideOperationsEncryptionTimeoutProseTest {
@Test
void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
Map> kmsProviders = new HashMap<>();
Map localProviderMap = new HashMap<>();
localProviderMap.put("key", Base64.getDecoder().decode(MASTER_KEY));
kmsProviders.put("local", localProviderMap);
- try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 100))) {
+ try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(100))) {
keyVaultCollectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
@@ -108,7 +107,7 @@ void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() {
+ " data: {"
+ " failCommands: [\"insert\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 100)
+ + " blockTimeMS: " + 100
+ " }"
+ "}");
@@ -126,9 +125,8 @@ void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() {
@Test
void shouldThrowOperationTimeoutExceptionWhenEncryptData() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
- try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 150))) {
+ try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(150))) {
clientEncryption.createDataKey("local");
@@ -138,7 +136,7 @@ void shouldThrowOperationTimeoutExceptionWhenEncryptData() {
+ " data: {"
+ " failCommands: [\"find\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 150)
+ + " blockTimeMS: " + 150
+ " }"
+ "}");
@@ -160,10 +158,9 @@ void shouldThrowOperationTimeoutExceptionWhenEncryptData() {
@Test
void shouldThrowOperationTimeoutExceptionWhenDecryptData() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
BsonBinary encrypted;
- try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) {
+ try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(400))) {
clientEncryption.createDataKey("local");
BsonBinary dataKey = clientEncryption.createDataKey("local");
EncryptOptions encryptOptions = new EncryptOptions("AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic");
@@ -171,14 +168,14 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() {
encrypted = clientEncryption.encrypt(new BsonString("hello"), encryptOptions);
}
- try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) {
+ try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(400))) {
keyVaultCollectionHelper.runAdminCommand("{"
- + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
+ + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
+ " mode: { times: 1 },"
+ " data: {"
+ " failCommands: [\"find\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 500)
+ + " blockTimeMS: " + 500
+ " }"
+ "}");
commandListener.reset();
@@ -197,8 +194,7 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() {
@Test
void shouldDecreaseOperationTimeoutForSubsequentOperations() {
assumeTrue(serverVersionAtLeast(4, 4));
- long rtt = ClusterFixture.getPrimaryRTT();
- long initialTimeoutMS = rtt + 2500;
+ long initialTimeoutMS = 2500;
keyVaultCollectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
@@ -206,7 +202,7 @@ void shouldDecreaseOperationTimeoutForSubsequentOperations() {
+ " data: {"
+ " failCommands: [\"insert\", \"find\", \"listCollections\"],"
+ " blockConnection: true,"
- + " blockTimeMS: " + (rtt + 10)
+ + " blockTimeMS: " + 10
+ " }"
+ "}");
@@ -272,8 +268,7 @@ void shouldDecreaseOperationTimeoutForSubsequentOperations() {
void shouldThrowTimeoutExceptionWhenCreateEncryptedCollection(final String commandToTimeout) {
assumeTrue(serverVersionAtLeast(7, 0));
//given
- long rtt = ClusterFixture.getPrimaryRTT();
- long initialTimeoutMS = rtt + 200;
+ long initialTimeoutMS = 200;
try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder()
.timeout(initialTimeoutMS, MILLISECONDS))) {
diff --git a/driver-sync/src/test/functional/com/mongodb/client/observability/MicrometerProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/observability/MicrometerProseTest.java
new file mode 100644
index 00000000000..38bd4350b1d
--- /dev/null
+++ b/driver-sync/src/test/functional/com/mongodb/client/observability/MicrometerProseTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.client.observability;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.AbstractMicrometerProseTest;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+
+/**
+ * Sync driver implementation of the Micrometer prose tests.
+ */
+public class MicrometerProseTest extends AbstractMicrometerProseTest {
+ @Override
+ protected MongoClient createMongoClient(final MongoClientSettings settings) {
+ return MongoClients.create(settings);
+ }
+}
diff --git a/driver-sync/src/test/functional/com/mongodb/observability/SpanTree.java b/driver-sync/src/test/functional/com/mongodb/client/observability/SpanTree.java
similarity index 98%
rename from driver-sync/src/test/functional/com/mongodb/observability/SpanTree.java
rename to driver-sync/src/test/functional/com/mongodb/client/observability/SpanTree.java
index aa6697bf3ad..7d3bff3224d 100644
--- a/driver-sync/src/test/functional/com/mongodb/observability/SpanTree.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/observability/SpanTree.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.mongodb.observability;
+package com.mongodb.client.observability;
import com.mongodb.lang.Nullable;
import io.micrometer.tracing.exporter.FinishedSpan;
@@ -204,6 +204,10 @@ private static void assertValid(final SpanNode reportedNode, final SpanNode expe
}
}
+ public List getRoots() {
+ return Collections.unmodifiableList(roots);
+ }
+
@Override
public String toString() {
return "SpanTree{"
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index cf003078f04..35189aef455 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -28,7 +28,7 @@
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.model.Filters;
import com.mongodb.client.test.CollectionHelper;
-import com.mongodb.observability.SpanTree;
+import com.mongodb.client.observability.SpanTree;
import com.mongodb.client.unified.UnifiedTestModifications.TestDef;
import com.mongodb.client.vault.ClientEncryption;
import com.mongodb.connection.ClusterDescription;
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
index 2225f837ec5..328c8298b6c 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
@@ -63,6 +63,25 @@ public static void applyCustomizations(final TestDef def) {
.file("client-side-encryption/tests/unified", "client bulkWrite with queryable encryption");
// client-side-operation-timeout (CSOT)
+ def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs")
+ .whenFailureContains("timeout")
+ .test("client-side-operations-timeout",
+ "timeoutMS behaves correctly for non-tailable cursors",
+ "timeoutMS is refreshed for getMore if timeoutMode is iteration - success");
+
+ def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs")
+ .whenFailureContains("timeout")
+ .test("client-side-operations-timeout",
+ "timeoutMS behaves correctly for tailable non-awaitData cursors",
+ "timeoutMS is refreshed for getMore - success");
+
+ def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs")
+ .whenFailureContains("timeout")
+ .test("client-side-operations-timeout",
+ "timeoutMS behaves correctly for tailable non-awaitData cursors",
+ "timeoutMS is refreshed for getMore - success");
+
+ //TODO-invistigate
/*
As to the background connection pooling section:
timeoutMS set at the MongoClient level MUST be used as the timeout for all commands sent as part of the handshake.
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 8a08c34f213..b5e561c7f7e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -18,10 +18,10 @@ aws-sdk-v2 = "2.30.31"
graal-sdk = "24.0.0"
jna = "5.11.0"
jnr-unixsocket = "0.38.17"
-netty-bom = "4.1.87.Final"
+netty-bom = "4.2.9.Final"
project-reactor-bom = "2022.0.0"
reactive-streams = "1.0.4"
-snappy = "1.1.10.3"
+snappy = "1.1.10.4"
zstd = "1.5.5-3"
jetbrains-annotations = "26.0.2"
micrometer-tracing = "1.6.0-M3" # This version has a fix for https://github.com/micrometer-metrics/tracing/issues/1092
diff --git a/testing/resources/specifications b/testing/resources/specifications
index de684cf1ef9..bb9dddd8176 160000
--- a/testing/resources/specifications
+++ b/testing/resources/specifications
@@ -1 +1 @@
-Subproject commit de684cf1ef9feede71d358cbb7d253840f1a8647
+Subproject commit bb9dddd8176eddbb9424f9bebedfe8c6bbf28c3a