diff --git a/.evergreen/.evg.yml b/.evergreen/.evg.yml index 525861928f3..da9d720de40 100644 --- a/.evergreen/.evg.yml +++ b/.evergreen/.evg.yml @@ -1939,16 +1939,18 @@ task_groups: setup_group: - func: "fetch-source" - func: "prepare-resources" + - func: "assume-aws-test-secrets-role" - command: subprocess.exec type: "setup" params: working_dir: "src" binary: bash - add_expansions_to_env: true + include_expansions_in_env: [ "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN" ] env: + CLUSTER_PREFIX: "dbx-java" MONGODB_VERSION: "8.0" args: - - ${DRIVERS_TOOLS}/.evergreen/atlas/setup-atlas-cluster.sh + - ${DRIVERS_TOOLS}/.evergreen/atlas/setup.sh - command: expansions.update params: file: src/atlas-expansion.yml @@ -1960,7 +1962,7 @@ task_groups: binary: bash add_expansions_to_env: true args: - - ${DRIVERS_TOOLS}/.evergreen/atlas/teardown-atlas-cluster.sh + - ${DRIVERS_TOOLS}/.evergreen/atlas/teardown.sh tasks: - "atlas-search-index-management-task" - "aws-lambda-deployed-task" diff --git a/bson/src/main/org/bson/BinaryVector.java b/bson/src/main/org/bson/BinaryVector.java index a1914601a9d..f5d57f5b241 100644 --- a/bson/src/main/org/bson/BinaryVector.java +++ b/bson/src/main/org/bson/BinaryVector.java @@ -32,6 +32,9 @@ * @since 5.3 */ public abstract class BinaryVector { + /** + * The BinaryVector logger + */ protected static final Logger LOGGER = Loggers.getLogger("BinaryVector"); private final DataType dataType; diff --git a/bson/src/main/org/bson/BsonBinary.java b/bson/src/main/org/bson/BsonBinary.java index 833a1b5ad29..0ece148eb2d 100644 --- a/bson/src/main/org/bson/BsonBinary.java +++ b/bson/src/main/org/bson/BsonBinary.java @@ -127,9 +127,14 @@ public BsonBinary(final UUID uuid, final UuidRepresentation uuidRepresentation) } /** - * Returns the binary as a UUID. The binary type must be 4. + * Returns the binary as a UUID. + * + *

Note: The BsonBinary subtype must be {@link BsonBinarySubType#UUID_STANDARD}.

* * @return the uuid + * @throws BsonInvalidOperationException if BsonBinary subtype is not {@link BsonBinarySubType#UUID_STANDARD} + * @see #asUuid(UuidRepresentation) + * @see BsonBinarySubType * @since 3.9 */ public UUID asUuid() { @@ -162,8 +167,15 @@ public BinaryVector asVector() { /** * Returns the binary as a UUID. * - * @param uuidRepresentation the UUID representation + *

Note: The BsonBinary subtype must be either {@link BsonBinarySubType#UUID_STANDARD} or + * {@link BsonBinarySubType#UUID_LEGACY}.

+ * + * @param uuidRepresentation the UUID representation, must be {@link UuidRepresentation#STANDARD} or + * {@link UuidRepresentation#JAVA_LEGACY} * @return the uuid + * @throws BsonInvalidOperationException if the BsonBinary subtype is incompatible with the given {@code uuidRepresentation}, or if + * the {@code uuidRepresentation} is not {@link UuidRepresentation#STANDARD} or + * {@link UuidRepresentation#JAVA_LEGACY}. * @since 3.9 */ public UUID asUuid(final UuidRepresentation uuidRepresentation) { diff --git a/bson/src/main/org/bson/BsonBinarySubType.java b/bson/src/main/org/bson/BsonBinarySubType.java index 08c29e2ef09..2a6eed1f5de 100644 --- a/bson/src/main/org/bson/BsonBinarySubType.java +++ b/bson/src/main/org/bson/BsonBinarySubType.java @@ -93,7 +93,7 @@ public enum BsonBinarySubType { * Returns true if the given value is a UUID subtype. * * @param value the subtype value as a byte. - * @return true if value is a UUID subtype. + * @return true if value has a {@link #UUID_STANDARD} or {@link #UUID_LEGACY} subtype. 
* @since 3.4 */ public static boolean isUuid(final byte value) { diff --git a/bson/src/main/org/bson/codecs/BsonDocumentCodec.java b/bson/src/main/org/bson/codecs/BsonDocumentCodec.java index 75bd3b7a2b0..172b0c94338 100644 --- a/bson/src/main/org/bson/codecs/BsonDocumentCodec.java +++ b/bson/src/main/org/bson/codecs/BsonDocumentCodec.java @@ -22,6 +22,7 @@ import org.bson.BsonType; import org.bson.BsonValue; import org.bson.BsonWriter; +import org.bson.RawBsonDocument; import org.bson.codecs.configuration.CodecRegistry; import org.bson.types.ObjectId; @@ -40,6 +41,7 @@ public class BsonDocumentCodec implements CollectibleCodec { private static final String ID_FIELD_NAME = "_id"; private static final CodecRegistry DEFAULT_REGISTRY = fromProviders(new BsonValueCodecProvider()); private static final BsonTypeCodecMap DEFAULT_BSON_TYPE_CODEC_MAP = new BsonTypeCodecMap(getBsonTypeClassMap(), DEFAULT_REGISTRY); + private static final RawBsonDocumentCodec RAW_BSON_DOCUMENT_CODEC = new RawBsonDocumentCodec(); private final CodecRegistry codecRegistry; private final BsonTypeCodecMap bsonTypeCodecMap; @@ -101,6 +103,10 @@ protected BsonValue readValue(final BsonReader reader, final DecoderContext deco @Override public void encode(final BsonWriter writer, final BsonDocument value, final EncoderContext encoderContext) { + if (value instanceof RawBsonDocument) { + RAW_BSON_DOCUMENT_CODEC.encode(writer, (RawBsonDocument) value, encoderContext); + return; + } writer.writeStartDocument(); beforeFields(writer, encoderContext, value); diff --git a/bson/src/test/unit/util/ThreadTestHelpers.java b/bson/src/test/unit/util/ThreadTestHelpers.java index e2115da079f..2428ee9074e 100644 --- a/bson/src/test/unit/util/ThreadTestHelpers.java +++ b/bson/src/test/unit/util/ThreadTestHelpers.java @@ -41,7 +41,7 @@ public static void executeAll(final Runnable... 
runnables) { CountDownLatch latch = new CountDownLatch(runnables.length); List failures = Collections.synchronizedList(new ArrayList<>()); for (final Runnable runnable : runnables) { - service.submit(() -> { + service.execute(() -> { try { runnable.run(); } catch (Throwable e) { diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/AbstractBsonDocumentBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/AbstractBsonDocumentBenchmark.java index 89f932f03cd..78e6e37f7f9 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/AbstractBsonDocumentBenchmark.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/AbstractBsonDocumentBenchmark.java @@ -61,7 +61,7 @@ public int getBytesPerRun() { return fileLength * NUM_INTERNAL_ITERATIONS; } - private byte[] getDocumentAsBuffer(final T document) throws IOException { + protected byte[] getDocumentAsBuffer(final T document) throws IOException { BasicOutputBuffer buffer = new BasicOutputBuffer(); codec.encode(new BsonBinaryWriter(buffer), document, EncoderContext.builder().build()); diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/BenchmarkSuite.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/BenchmarkSuite.java index 2595568f148..c2a8ed9bafe 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/BenchmarkSuite.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/BenchmarkSuite.java @@ -71,6 +71,9 @@ private static void runBenchmarks() runBenchmark(new BsonDecodingBenchmark<>("Deep", "extended_bson/deep_bson.json", DOCUMENT_CODEC)); runBenchmark(new BsonDecodingBenchmark<>("Full", "extended_bson/full_bson.json", DOCUMENT_CODEC)); + runBenchmark(new RawBsonNestedEncodingBenchmark("Full RawBsonDocument in BsonDocument BSON Encoding", "extended_bson/full_bson.json")); + runBenchmark(new RawBsonArrayEncodingBenchmark("Full RawBsonDocument Array in BsonDocument BSON Encoding", 
"extended_bson/full_bson.json", 10)); + runBenchmark(new RunCommandBenchmark<>(DOCUMENT_CODEC)); runBenchmark(new FindOneBenchmark("single_and_multi_document/tweet.json", BenchmarkSuite.DOCUMENT_CLASS)); diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileDownloadBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileDownloadBenchmark.java index e39c0fb46ba..f8f66fe8b90 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileDownloadBenchmark.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileDownloadBenchmark.java @@ -97,7 +97,7 @@ public void run() throws Exception { CountDownLatch latch = new CountDownLatch(50); for (int i = 0; i < 50; i++) { - gridFSService.submit(exportFile(latch, i)); + gridFSService.execute(exportFile(latch, i)); } latch.await(1, TimeUnit.MINUTES); @@ -107,7 +107,7 @@ private Runnable exportFile(final CountDownLatch latch, final int fileId) { return () -> { UnsafeByteArrayOutputStream outputStream = new UnsafeByteArrayOutputStream(5242880); bucket.downloadToStream(GridFSMultiFileDownloadBenchmark.this.getFileName(fileId), outputStream); - fileService.submit(() -> { + fileService.execute(() -> { try { FileOutputStream fos = new FileOutputStream(new File(tempDirectory, String.format("%02d", fileId) + ".txt")); fos.write(outputStream.getByteArray()); @@ -124,7 +124,7 @@ private void importFiles() throws Exception { CountDownLatch latch = new CountDownLatch(50); for (int i = 0; i < 50; i++) { - fileService.submit(importFile(latch, i)); + fileService.execute(importFile(latch, i)); } latch.await(1, TimeUnit.MINUTES); diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileUploadBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileUploadBenchmark.java index cefdc7eaf1c..e2ee177847d 100644 --- 
a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileUploadBenchmark.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileUploadBenchmark.java @@ -75,7 +75,7 @@ public void run() throws Exception { CountDownLatch latch = new CountDownLatch(50); for (int i = 0; i < 50; i++) { - fileService.submit(importFile(latch, i)); + fileService.execute(importFile(latch, i)); } latch.await(1, TimeUnit.MINUTES); diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileExportBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileExportBenchmark.java index 30c74084419..d57829de45b 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileExportBenchmark.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileExportBenchmark.java @@ -109,7 +109,7 @@ public void run() throws Exception { CountDownLatch latch = new CountDownLatch(100); for (int i = 0; i < 100; i++) { - documentReadingService.submit(exportJsonFile(i, latch)); + documentReadingService.execute(exportJsonFile(i, latch)); } latch.await(1, TimeUnit.MINUTES); @@ -125,7 +125,7 @@ private Runnable exportJsonFile(final int fileId, final CountDownLatch latch) { List documents = collection.find(new BsonDocument("fileId", new BsonInt32(fileId))) .batchSize(5000) .into(new ArrayList<>(5000)); - fileWritingService.submit(writeJsonFile(fileId, documents, latch)); + fileWritingService.execute(writeJsonFile(fileId, documents, latch)); }; } @@ -154,7 +154,7 @@ private void importJsonFiles() throws InterruptedException { for (int i = 0; i < 100; i++) { int fileId = i; - importService.submit(() -> { + importService.execute(() -> { String resourcePath = "parallel/ldjson_multi/ldjson" + String.format("%03d", fileId) + ".txt"; try (BufferedReader reader = new BufferedReader(readFromRelativePath(resourcePath), 1024 * 64)) { String json; diff --git 
a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileImportBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileImportBenchmark.java index 03d1a721bee..d7afc54496d 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileImportBenchmark.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileImportBenchmark.java @@ -86,7 +86,7 @@ public void run() throws InterruptedException { CountDownLatch latch = new CountDownLatch(500); for (int i = 0; i < 100; i++) { - fileReadingService.submit(importJsonFile(latch, i)); + fileReadingService.execute(importJsonFile(latch, i)); } latch.await(1, TimeUnit.MINUTES); @@ -104,7 +104,7 @@ private Runnable importJsonFile(final CountDownLatch latch, final int fileId) { documents.add(document); if (documents.size() == 1000) { List documentsToInsert = documents; - documentWritingService.submit(() -> { + documentWritingService.execute(() -> { collection.insertMany(documentsToInsert, new InsertManyOptions().ordered(false)); latch.countDown(); }); diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java new file mode 100644 index 00000000000..0768f4f63c6 --- /dev/null +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java @@ -0,0 +1,55 @@ +/* + * Copyright 2016-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.mongodb.benchmark.benchmarks; + +import org.bson.BsonArray;import org.bson.BsonDocument; +import org.bson.RawBsonDocument; +import org.bson.codecs.BsonDocumentCodec; + +import java.io.IOException; + +public class RawBsonArrayEncodingBenchmark extends BsonEncodingBenchmark { + + private final int arraySize; + + public RawBsonArrayEncodingBenchmark(final String name, final String resourcePath, final int arraySize) { + super(name, resourcePath, new BsonDocumentCodec()); + this.arraySize = arraySize; + } + + @Override + public void setUp() throws IOException { + super.setUp(); + RawBsonDocument rawDoc = new RawBsonDocument(document, codec); + + BsonArray array = new BsonArray(); + for (int i = 0; i < arraySize; i++) { + array.add(rawDoc); + } + document = new BsonDocument("results", array); + + // Recalculate documentBytes for accurate throughput reporting + documentBytes = getDocumentAsBuffer(document); + + } + + @Override + public int getBytesPerRun() { + return documentBytes.length * NUM_INTERNAL_ITERATIONS; + } +} \ No newline at end of file diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonNestedEncodingBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonNestedEncodingBenchmark.java new file mode 100644 index 00000000000..3872c5888d9 --- /dev/null +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonNestedEncodingBenchmark.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.mongodb.benchmark.benchmarks; + +import org.bson.BsonDocument; +import org.bson.RawBsonDocument; +import org.bson.codecs.BsonDocumentCodec; + +import java.io.IOException; + +public class RawBsonNestedEncodingBenchmark extends BsonEncodingBenchmark { + + public RawBsonNestedEncodingBenchmark(final String name, final String resourcePath) { + super(name, resourcePath, new BsonDocumentCodec()); + } + + @Override + public void setUp() throws IOException { + super.setUp(); + + RawBsonDocument rawDoc = new RawBsonDocument(document, codec); + document = new BsonDocument("nested", rawDoc); + + documentBytes = getDocumentAsBuffer(document); + } + + @Override + public int getBytesPerRun() { + return documentBytes.length * NUM_INTERNAL_ITERATIONS; + } +} \ No newline at end of file diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/framework/MongoCryptBenchmarkRunner.java b/driver-benchmarks/src/main/com/mongodb/benchmark/framework/MongoCryptBenchmarkRunner.java index 718ab9f21af..a6c623364db 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/framework/MongoCryptBenchmarkRunner.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/framework/MongoCryptBenchmarkRunner.java @@ -177,7 +177,7 @@ public List run() throws InterruptedException { for (int i = 0; i < threadCount; i++) { DecryptTask decryptTask = new DecryptTask(mongoCrypt, encrypted, NUM_SECS, doneSignal); decryptTasks.add(decryptTask); - executorService.submit(decryptTask); + executorService.execute(decryptTask); } // Await completion of all tasks. 
Tasks are expected to complete shortly after NUM_SECS. Time out `await` if time exceeds 2 * NUM_SECS. diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index 81a0e59e277..2339cf18b86 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -1321,7 +1321,7 @@ private boolean initUnlessClosed() { boolean result = true; if (state == State.NEW) { worker = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter")); - worker.submit(() -> runAndLogUncaught(this::workerRun)); + worker.execute(() -> runAndLogUncaught(this::workerRun)); state = State.INITIALIZED; } else if (state == State.CLOSED) { result = false; diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index aeef4e0a6a1..f10f471881b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -454,9 +454,7 @@ private T sendAndReceiveInternal(final CommandMessage message, final Decoder () -> getDescription().getConnectionId() ); - boolean isLoggingCommandNeeded = isLoggingCommandNeeded(); - - if (isLoggingCommandNeeded) { + if (isLoggingCommandNeeded()) { commandEventSender = new LoggingCommandEventSender( SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener, operationContext, message, commandDocument, @@ -618,37 +616,71 @@ private void sendAndReceiveAsyncInternal(final CommandMessage message, final // Async try with resources release after the write ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this); + Span tracingSpan = null; try { 
message.encode(bsonOutput, operationContext); - - String commandName; CommandEventSender commandEventSender; try (ByteBufBsonDocument commandDocument = message.getCommandDocument(bsonOutput)) { - commandName = commandDocument.getFirstKey(); + tracingSpan = operationContext + .getTracingManager() + .createTracingSpan(message, + operationContext, + commandDocument, + cmdName -> SECURITY_SENSITIVE_COMMANDS.contains(cmdName) + || SECURITY_SENSITIVE_HELLO_COMMANDS.contains(cmdName), + () -> getDescription().getServerAddress(), + () -> getDescription().getConnectionId() + ); + if (isLoggingCommandNeeded()) { commandEventSender = new LoggingCommandEventSender( SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener, operationContext, message, commandDocument, COMMAND_PROTOCOL_LOGGER, loggerSettings); + commandEventSender.sendStartedEvent(); } else { commandEventSender = new NoOpCommandEventSender(); } - commandEventSender.sendStartedEvent(); - } - List messageByteBuffers = getMessageByteBuffers(commandName, message, bsonOutput, operationContext); + boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled(); + if (isTracingCommandPayloadNeeded) { + tracingSpan.tagHighCardinality(QUERY_TEXT.asString(), commandDocument); + } + + final Span commandSpan = tracingSpan; + SingleResultCallback tracingCallback = commandSpan == null ? 
callback : (result, t) -> { + try { + if (t != null) { + if (t instanceof MongoCommandException) { + commandSpan.tagLowCardinality( + RESPONSE_STATUS_CODE.withValue(String.valueOf(((MongoCommandException) t).getErrorCode()))); + } + commandSpan.error(t); + } + } finally { + commandSpan.end(); + callback.onResult(result, t); + } + }; + + List messageByteBuffers = getMessageByteBuffers(commandDocument.getFirstKey(), message, bsonOutput, operationContext); sendCommandMessageAsync(messageByteBuffers, message.getId(), decoder, operationContext, commandEventSender, message.isResponseExpected(), (r, t) -> { ResourceUtil.release(messageByteBuffers); bsonOutput.close(); // Close AFTER async write completes if (t != null) { - callback.onResult(null, t); + tracingCallback.onResult(null, t); } else { - callback.onResult(r, null); + tracingCallback.onResult(r, null); } }); + } } catch (Throwable t) { bsonOutput.close(); + if (tracingSpan != null) { + tracingSpan.error(t); + tracingSpan.end(); + } callback.onResult(null, t); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java index 72235b46760..99233dcc77e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java @@ -251,12 +251,12 @@ public ByteBuf limit(final int newLimit) { @Override public ByteBuf asReadOnly() { - return new NettyByteBuf(proxied.asReadOnly().retain(), false); + return this; } @Override public ByteBuf duplicate() { - return new NettyByteBuf(proxied.retainedDuplicate(), isWriting); + return new NettyByteBuf(proxied.duplicate().retain(), isWriting); } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java index 76e10653454..e480363fc82 100644 --- 
a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java @@ -307,7 +307,8 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler composite.addComponent(next); iter.remove(); } else { - composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer)); + next.retain(); + composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer)); } composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer); bytesNeeded -= bytesNeededFromCurrentBuffer; diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java index 04114318f92..c1e3f067335 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java @@ -98,8 +98,8 @@ public void read(ByteBuffer dst, A attach, CompletionHandler group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -119,8 +119,8 @@ public void read( new ByteBufferSet(dst), timeout, unit, - c -> group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -145,8 +145,8 @@ public void read( bufferSet, timeout, unit, - c -> group.submit(() -> handler.completed(c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed(c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } 
@Override @@ -185,8 +185,8 @@ public void write(ByteBuffer src, A attach, CompletionHandler group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -205,8 +205,8 @@ public void write( new ByteBufferSet(src), timeout, unit, - c -> group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -228,8 +228,8 @@ public void write( bufferSet, timeout, unit, - c -> group.submit(() -> handler.completed(c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed(c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -251,11 +251,11 @@ public Future write(ByteBuffer src) { } private void completeWithZeroInt(A attach, CompletionHandler handler) { - group.submit(() -> handler.completed(0, attach)); + group.execute(() -> handler.completed(0, attach)); } private void completeWithZeroLong(A attach, CompletionHandler handler) { - group.submit(() -> handler.completed(0L, attach)); + group.execute(() -> handler.completed(0L, attach)); } /** diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index d9b1420a6e3..5150149fa6a 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -65,7 +66,7 @@ * instance of this class is a singleton-like object that manages a thread pool that makes it * possible to run a group of asynchronous channels. */ -public class AsynchronousTlsChannelGroup { +public class AsynchronousTlsChannelGroup implements Executor { private static final Logger LOGGER = Loggers.getLogger("connection.tls"); @@ -224,8 +225,16 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi selectorThread.start(); } - void submit(final Runnable r) { - executor.submit(r); + + @Override + public void execute(final Runnable r) { + executor.execute(() -> { + try { + r.run(); + } catch (Throwable t) { + LOGGER.error(null, t); + } + }); } RegisteredSocket registerSocket(TlsChannel reader, SocketChannel socketChannel) { diff --git a/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java b/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java index b26cb396e7b..4b08dd9a15c 100644 --- a/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java +++ b/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java @@ -39,6 +39,8 @@ import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COMMAND_NAME; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.CURSOR_ID; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NAMESPACE; +import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_NAME; +import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_SUMMARY; 
import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NETWORK_TRANSPORT; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.QUERY_SUMMARY; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SERVER_ADDRESS; @@ -266,4 +268,47 @@ public Span createTracingSpan(final CommandMessage message, return span; } + + /** + * Creates an operation-level tracing span for a database command. + *

+ * The span is named "{commandName} {database}[.{collection}]" and tagged with standard + * low-cardinality attributes (system, namespace, collection, operation name, operation summary). + * The span is also set on the {@link OperationContext} for use by downstream command-level tracing. + * + * @param transactionSpan the active transaction span (for parent context), or null + * @param operationContext the operation context to attach the span to + * @param commandName the name of the command (e.g. "find", "insert") + * @param namespace the MongoDB namespace for the operation + * @return the created span, or null if tracing is disabled + */ + @Nullable + public Span createOperationSpan(@Nullable final TransactionSpan transactionSpan, + final OperationContext operationContext, final String commandName, final MongoNamespace namespace) { + if (!isEnabled()) { + return null; + } + TraceContext parentContext = null; + if (transactionSpan != null) { + parentContext = transactionSpan.getContext(); + } + String name = commandName + " " + namespace.getDatabaseName() + + (MongoNamespaceHelper.COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName()) + ? "" + : "." 
+ namespace.getCollectionName()); + + KeyValues keyValues = KeyValues.of( + SYSTEM.withValue("mongodb"), + NAMESPACE.withValue(namespace.getDatabaseName())); + if (!MongoNamespaceHelper.COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())) { + keyValues = keyValues.and(COLLECTION.withValue(namespace.getCollectionName())); + } + keyValues = keyValues.and(OPERATION_NAME.withValue(commandName), + OPERATION_SUMMARY.withValue(name)); + + Span span = addSpan(name, parentContext, namespace); + span.tagLowCardinality(keyValues); + operationContext.setTracingSpan(span); + return span; + } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy index 2d7dc04d758..f1585f82595 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy @@ -52,6 +52,7 @@ class CommandHelperSpecification extends Specification { } def cleanup() { + InternalStreamConnection.setRecordEverything(false) connection?.close() } @@ -81,5 +82,4 @@ class CommandHelperSpecification extends Specification { !receivedDocument receivedException instanceof MongoCommandException } - } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java index fc5926b3bad..81e778b4a61 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java @@ -127,7 +127,7 @@ public void shouldThrowOnTimeout() throws InterruptedException { // when TimeoutTrackingConnectionGetter connectionGetter = new 
TimeoutTrackingConnectionGetter(provider, timeoutSettings); - cachedExecutor.submit(connectionGetter); + cachedExecutor.execute(connectionGetter); connectionGetter.getLatch().await(); @@ -152,7 +152,7 @@ public void shouldNotUseMaxAwaitTimeMSWhenTimeoutMsIsSet() throws InterruptedExc // when TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings); - cachedExecutor.submit(connectionGetter); + cachedExecutor.execute(connectionGetter); sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs. internalConnection.close(); diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index be4526aada7..5f736f421c2 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -331,9 +331,10 @@ static Stream shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS() ); } - @ParameterizedTest - @MethodSource @DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS") + @ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. 
" + + "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}") + @MethodSource void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS, final Long timeoutMS, final long expected) { @@ -345,7 +346,7 @@ void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTim 0)); long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs(); - assertTrue(expected - calculatedTimeoutMS <= 1); + assertTrue(expected - calculatedTimeoutMS <= 2); } private TimeoutContextTest() { diff --git a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt index 6c53a1faf47..cbe308eece0 100644 --- a/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt +++ b/driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ClientSession.kt @@ -19,6 +19,7 @@ import com.mongodb.ClientSessionOptions import com.mongodb.ServerAddress import com.mongodb.TransactionOptions import com.mongodb.internal.TimeoutContext +import com.mongodb.internal.observability.micrometer.TransactionSpan import com.mongodb.reactivestreams.client.ClientSession as reactiveClientSession import com.mongodb.session.ClientSession as jClientSession import com.mongodb.session.ServerSession @@ -58,6 +59,9 @@ public class ClientSession(public val wrapped: reactiveClientSession) : jClientS */ public fun notifyOperationInitiated(operation: Any): Unit = wrapped.notifyOperationInitiated(operation) + /** Get the transaction span (if started). */ + public fun getTransactionSpan(): TransactionSpan? = wrapped.transactionSpan + /** * Get the server address of the pinned mongos on this session. For internal use only. 
* diff --git a/driver-reactive-streams/build.gradle.kts b/driver-reactive-streams/build.gradle.kts index dab192e2583..b55dd95d683 100644 --- a/driver-reactive-streams/build.gradle.kts +++ b/driver-reactive-streams/build.gradle.kts @@ -15,6 +15,7 @@ */ import ProjectExtensions.configureJarManifest import ProjectExtensions.configureMavenPublication +import project.DEFAULT_JAVA_VERSION plugins { id("project.java") @@ -36,6 +37,9 @@ dependencies { implementation(libs.project.reactor.core) compileOnly(project(path = ":mongodb-crypt", configuration = "default")) + optionalImplementation(platform(libs.micrometer.observation.bom)) + optionalImplementation(libs.micrometer.observation) + testImplementation(libs.project.reactor.test) testImplementation(project(path = ":driver-sync", configuration = "default")) testImplementation(project(path = ":bson", configuration = "testArtifacts")) @@ -45,11 +49,20 @@ dependencies { // Reactive Streams TCK testing testImplementation(libs.reactive.streams.tck) - // Tracing + // Tracing testing testImplementation(platform(libs.micrometer.tracing.integration.test.bom)) testImplementation(libs.micrometer.tracing.integration.test) { exclude(group = "org.junit.jupiter") } } +tasks.withType { + // Needed for MicrometerProseTest to set env variable programmatically (calls + // `field.setAccessible(true)`) + val testJavaVersion: Int = findProperty("javaVersion")?.toString()?.toInt() ?: DEFAULT_JAVA_VERSION + if (testJavaVersion >= DEFAULT_JAVA_VERSION) { + jvmArgs("--add-opens=java.base/java.util=ALL-UNNAMED") + } +} + configureMavenPublication { pom { name.set("The MongoDB Reactive Streams Driver") diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java index 3d9354e9ae9..fe58864fad0 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java +++ 
b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java @@ -18,6 +18,8 @@ package com.mongodb.reactivestreams.client; import com.mongodb.TransactionOptions; +import com.mongodb.internal.observability.micrometer.TransactionSpan; +import com.mongodb.lang.Nullable; import org.reactivestreams.Publisher; /** @@ -94,4 +96,13 @@ public interface ClientSession extends com.mongodb.session.ClientSession { * @mongodb.server.release 4.0 */ Publisher abortTransaction(); + + /** + * Get the transaction span (if started). + * + * @return the transaction span + * @since 5.7 + */ + @Nullable + TransactionSpan getTransactionSpan(); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java index 30714a6a576..b5e94c02975 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionHelper.java @@ -18,6 +18,7 @@ import com.mongodb.ClientSessionOptions; import com.mongodb.TransactionOptions; +import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; @@ -31,10 +32,13 @@ public class ClientSessionHelper { private final MongoClientImpl mongoClient; private final ServerSessionPool serverSessionPool; + private final TracingManager tracingManager; - public ClientSessionHelper(final MongoClientImpl mongoClient, final ServerSessionPool serverSessionPool) { + public ClientSessionHelper(final MongoClientImpl mongoClient, final ServerSessionPool serverSessionPool, + final TracingManager tracingManager) { this.mongoClient = mongoClient; this.serverSessionPool = serverSessionPool; + this.tracingManager = 
tracingManager; } Mono withClientSession(@Nullable final ClientSession clientSessionFromOperation, final OperationExecutor executor) { @@ -62,6 +66,6 @@ ClientSession createClientSession(final ClientSessionOptions options, final Oper .readPreference(mongoClient.getSettings().getReadPreference()) .build())) .build(); - return new ClientSessionPublisherImpl(serverSessionPool, mongoClient, mergedOptions, executor); + return new ClientSessionPublisherImpl(serverSessionPool, mongoClient, mergedOptions, executor, tracingManager); } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 5cf0ea103bd..511f9f62c6b 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -24,6 +24,8 @@ import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.internal.operation.AbortTransactionOperation; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.ReadOperation; @@ -48,17 +50,21 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements private final MongoClientImpl mongoClient; private final OperationExecutor executor; + private final TracingManager tracingManager; private TransactionState transactionState = TransactionState.NONE; private boolean messageSentInCurrentTransaction; private boolean commitInProgress; private TransactionOptions transactionOptions; + @Nullable + private TransactionSpan transactionSpan; 
ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient, - final ClientSessionOptions options, final OperationExecutor executor) { + final ClientSessionOptions options, final OperationExecutor executor, final TracingManager tracingManager) { super(serverSessionPool, mongoClient, options); this.executor = executor; this.mongoClient = mongoClient; + this.tracingManager = tracingManager; } @Override @@ -128,6 +134,10 @@ public void startTransaction(final TransactionOptions transactionOptions) { if (!writeConcern.isAcknowledged()) { throw new MongoClientException("Transactions do not support unacknowledged write concern"); } + + if (tracingManager.isEnabled()) { + transactionSpan = new TransactionSpan(tracingManager); + } clearTransactionContext(); setTimeoutContext(timeoutContext); } @@ -152,6 +162,9 @@ public Publisher commitTransaction() { } if (!messageSentInCurrentTransaction) { cleanupTransaction(TransactionState.COMMITTED); + if (transactionSpan != null) { + transactionSpan.finalizeTransactionSpan(TransactionState.COMMITTED.name()); + } return Mono.create(MonoSink::success); } else { ReadConcern readConcern = transactionOptions.getReadConcern(); @@ -171,7 +184,17 @@ public Publisher commitTransaction() { commitInProgress = false; transactionState = TransactionState.COMMITTED; }) - .doOnError(MongoException.class, this::clearTransactionContextOnError); + .doOnError(MongoException.class, e -> { + clearTransactionContextOnError(e); + if (transactionSpan != null) { + transactionSpan.handleTransactionSpanError(e); + } + }) + .doOnSuccess(v -> { + if (transactionSpan != null) { + transactionSpan.finalizeTransactionSpan(TransactionState.COMMITTED.name()); + } + }); } }); } @@ -191,6 +214,9 @@ public Publisher abortTransaction() { } if (!messageSentInCurrentTransaction) { cleanupTransaction(TransactionState.ABORTED); + if (transactionSpan != null) { + 
transactionSpan.finalizeTransactionSpan(TransactionState.ABORTED.name()); + } return Mono.create(MonoSink::success); } else { ReadConcern readConcern = transactionOptions.getReadConcern(); @@ -208,6 +234,9 @@ public Publisher abortTransaction() { .doOnTerminate(() -> { clearTransactionContext(); cleanupTransaction(TransactionState.ABORTED); + if (transactionSpan != null) { + transactionSpan.finalizeTransactionSpan(TransactionState.ABORTED.name()); + } }); } }); @@ -219,6 +248,12 @@ private void clearTransactionContextOnError(final MongoException e) { } } + @Override + @Nullable + public TransactionSpan getTransactionSpan() { + return transactionSpan; + } + @Override public void close() { if (transactionState == TransactionState.IN) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java index 07a17badcd7..8fda2e9294d 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java @@ -33,6 +33,7 @@ import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ChangeStreamPublisher; @@ -88,9 +89,10 @@ private MongoClientImpl(final MongoClientSettings settings, final MongoDriverInf notNull("settings", settings); notNull("cluster", cluster); + TracingManager tracingManager = new TracingManager(settings.getObservabilitySettings()); TimeoutSettings timeoutSettings = TimeoutSettings.create(settings); ServerSessionPool serverSessionPool = new ServerSessionPool(cluster, 
timeoutSettings, settings.getServerApi()); - ClientSessionHelper clientSessionHelper = new ClientSessionHelper(this, serverSessionPool); + ClientSessionHelper clientSessionHelper = new ClientSessionHelper(this, serverSessionPool, tracingManager); AutoEncryptionSettings autoEncryptSettings = settings.getAutoEncryptionSettings(); Crypt crypt = autoEncryptSettings != null ? Crypts.createCrypt(settings, autoEncryptSettings) : null; @@ -100,7 +102,8 @@ private MongoClientImpl(final MongoClientSettings settings, final MongoDriverInf + ReactiveContextProvider.class.getName() + " when using the Reactive Streams driver"); } OperationExecutor operationExecutor = executor != null ? executor - : new OperationExecutorImpl(this, clientSessionHelper, timeoutSettings, (ReactiveContextProvider) contextProvider); + : new OperationExecutorImpl(this, clientSessionHelper, timeoutSettings, (ReactiveContextProvider) contextProvider, + tracingManager); MongoOperationPublisher mongoOperationPublisher = new MongoOperationPublisher<>(Document.class, withUuidRepresentation(settings.getCodecRegistry(), settings.getUuidRepresentation()), diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index ef18c2c6b1f..62a4431cc9a 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -31,10 +31,11 @@ import com.mongodb.internal.binding.AsyncReadWriteBinding; import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; +import com.mongodb.internal.observability.micrometer.Span; +import com.mongodb.internal.observability.micrometer.TracingManager; import 
com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; -import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import com.mongodb.reactivestreams.client.ReactiveContextProvider; @@ -63,13 +64,16 @@ public class OperationExecutorImpl implements OperationExecutor { @Nullable private final ReactiveContextProvider contextProvider; private final TimeoutSettings timeoutSettings; + private final TracingManager tracingManager; OperationExecutorImpl(final MongoClientImpl mongoClient, final ClientSessionHelper clientSessionHelper, - final TimeoutSettings timeoutSettings, @Nullable final ReactiveContextProvider contextProvider) { + final TimeoutSettings timeoutSettings, @Nullable final ReactiveContextProvider contextProvider, + final TracingManager tracingManager) { this.mongoClient = mongoClient; this.clientSessionHelper = clientSessionHelper; this.timeoutSettings = timeoutSettings; this.contextProvider = contextProvider; + this.tracingManager = tracingManager; } @Override @@ -93,22 +97,37 @@ public Mono execute(final ReadOperation operation, final ReadPrefer OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName()) .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession, isImplicitSession(session), readConcern)); + Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(), + operationContext, operation.getCommandName(), operation.getNamespace()); if (session != null && session.hasActiveTransaction() && !binding.getReadPreference().equals(primary())) { binding.release(); - return Mono.error(new MongoClientException("Read preference in a transaction must be primary")); + MongoClientException error = new MongoClientException("Read 
preference in a transaction must be primary"); + if (span != null) { + span.error(error); + span.end(); + } + return Mono.error(error); } else { return Mono.create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> { try { binding.release(); } finally { + if (t != null) { + Throwable exceptionToHandle = t instanceof MongoException + ? OperationHelper.unwrap((MongoException) t) : t; + labelException(session, exceptionToHandle); + unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(t); + } + } + if (span != null) { + span.end(); + } sinkToCallback(sink).onResult(result, t); } - })).doOnError((t) -> { - Throwable exceptionToHandle = t instanceof MongoException ? OperationHelper.unwrap((MongoException) t) : t; - labelException(session, exceptionToHandle); - unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); - }); + })); } }).subscribe(subscriber) ); @@ -133,18 +152,28 @@ public Mono execute(final WriteOperation operation, final ReadConcern OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName()) .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession, isImplicitSession(session), readConcern)); + Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(), + operationContext, operation.getCommandName(), operation.getNamespace()); return Mono.create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> { try { binding.release(); } finally { + if (t != null) { + Throwable exceptionToHandle = t instanceof MongoException + ? 
OperationHelper.unwrap((MongoException) t) : t; + labelException(session, exceptionToHandle); + unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(t); + } + } + if (span != null) { + span.end(); + } sinkToCallback(sink).onResult(result, t); } - })).doOnError((t) -> { - Throwable exceptionToHandle = t instanceof MongoException ? OperationHelper.unwrap((MongoException) t) : t; - labelException(session, exceptionToHandle); - unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); - }); + })); } ).subscribe(subscriber) ); @@ -155,7 +184,7 @@ public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSet if (Objects.equals(timeoutSettings, newTimeoutSettings)) { return this; } - return new OperationExecutorImpl(mongoClient, clientSessionHelper, newTimeoutSettings, contextProvider); + return new OperationExecutorImpl(mongoClient, clientSessionHelper, newTimeoutSettings, contextProvider, tracingManager); } @Override @@ -214,7 +243,7 @@ private OperationContext getOperationContext(final RequestContext requestContext requestContext, new ReadConcernAwareNoOpSessionContext(readConcern), createTimeoutContext(session, timeoutSettings), - TracingManager.NO_OP, + tracingManager, mongoClient.getSettings().getServerApi(), commandName); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java index bc4da3026a9..cefdf7184d8 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java @@ -55,8 +55,14 @@ public static MongoCollection collectionWithTimeout(final MongoCollection public static Mono> collectionWithTimeoutMono(final MongoCollection collection, @Nullable final Timeout timeout) { + 
return collectionWithTimeoutMono(collection, timeout, DEFAULT_TIMEOUT_MESSAGE); + } + + public static Mono> collectionWithTimeoutMono(final MongoCollection collection, + @Nullable final Timeout timeout, + final String message) { try { - return Mono.just(collectionWithTimeout(collection, timeout)); + return Mono.just(collectionWithTimeout(collection, timeout, message)); } catch (MongoOperationTimeoutException e) { return Mono.error(e); } @@ -64,9 +70,14 @@ public static Mono> collectionWithTimeoutMono(final Mongo public static Mono> collectionWithTimeoutDeferred(final MongoCollection collection, @Nullable final Timeout timeout) { - return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout)); + return collectionWithTimeoutDeferred(collection, timeout, DEFAULT_TIMEOUT_MESSAGE); } + public static Mono> collectionWithTimeoutDeferred(final MongoCollection collection, + @Nullable final Timeout timeout, + final String message) { + return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout, message)); + } public static MongoDatabase databaseWithTimeout(final MongoDatabase database, @Nullable final Timeout timeout) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java index 7d9a46cdf3f..50586e92102 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java @@ -54,7 +54,8 @@ */ public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher { - private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit."; + private static final String TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING = "Saving chunks exceeded the timeout limit."; + private 
static final String TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION = "Upload cancellation exceeded the timeout limit."; private static final Document PROJECTION = new Document("_id", 1); private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1); private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1); @@ -226,8 +227,8 @@ private Mono createSaveChunksMono(final AtomicBoolean terminated, @Nullabl .append("data", data); Publisher insertOnePublisher = clientSession == null - ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument) - : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE) + ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING).insertOne(chunkDocument) + : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING) .insertOne(clientSession, chunkDocument); return Mono.from(insertOnePublisher).thenReturn(data.length()); @@ -270,7 +271,8 @@ private Mono createSaveFileDataMono(final AtomicBoolean termina } private Mono createCancellationMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) { - Mono> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout); + Mono> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout, + TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION); if (terminated.compareAndSet(false, true)) { if (clientSession != null) { return chunksCollectionMono.flatMap(collection -> Mono.from(collection diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index b922ec20b71..90446953fc1 100644 --- 
a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -16,7 +16,6 @@ package com.mongodb.reactivestreams.client; -import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; import com.mongodb.MongoNamespace; @@ -24,7 +23,6 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest; -import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandStartedEvent; @@ -43,6 +41,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.nio.ByteBuffer; @@ -58,12 +57,16 @@ import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; +import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.ClusterFixture.sleep; +import static com.mongodb.assertions.Assertions.assertTrue; import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -104,7 +107,6 @@ protected boolean isAsync() { @Override public void testGridFSUploadViaOpenUploadStreamTimeout() { 
assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); //given collectionHelper.runAdminCommand("{" @@ -113,12 +115,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 405) + + " blockTimeMS: " + 600 + " }" + "}"); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 400, TimeUnit.MILLISECONDS))) { + .timeout(600, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -158,7 +160,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { @Override public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); //given CompletableFuture droppedErrorFuture = new CompletableFuture<>(); @@ -170,12 +171,12 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I + " data: {" + " failCommands: [\"delete\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 405) + + " blockTimeMS: " + 405 + " }" + "}"); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 400, TimeUnit.MILLISECONDS))) { + .timeout(400, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -198,12 +199,25 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I //then Throwable droppedError = droppedErrorFuture.get(TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); Throwable commandError = droppedError.getCause(); - assertInstanceOf(MongoOperationTimeoutException.class, commandError); 
CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete"); assertNotNull(deleteFailedEvent); - assertEquals(commandError, commandListener.getCommandFailedEvent("delete").getThrowable()); + CommandStartedEvent deleteStartedEvent = commandListener.getCommandStartedEvent("delete"); + assertTrue(deleteStartedEvent.getCommand().containsKey("maxTimeMS"), "Expected delete command to have maxTimeMS"); + long deleteMaxTimeMS = deleteStartedEvent + .getCommand() + .get("maxTimeMS") + .asNumber() + .longValue(); + + assertTrue(deleteMaxTimeMS <= 420 + // some leeway for timing variations, when compression is used it is often less than 300. + // Without it, it is more than 300. + && deleteMaxTimeMS >= 150, + "Expected maxTimeMS for delete command to be between 150ms and 420ms, " + "but was: " + deleteMaxTimeMS + "ms"); + assertEquals(commandError, deleteFailedEvent.getThrowable()); + + // When subscription is cancelled, we should not receive any more events. testSubscriber.assertNoTerminalEvent(); } @@ -219,9 +233,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { assumeTrue(isDiscoverableReplicaSet()); //given - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 500, TimeUnit.MILLISECONDS))) { + .timeout(500, TimeUnit.MILLISECONDS))) { MongoNamespace namespace = generateNamespace(); MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) @@ -273,9 +286,8 @@ public void testTimeoutMSAppliedToInitialAggregate() { assumeTrue(isDiscoverableReplicaSet()); //given - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { + .timeout(200, TimeUnit.MILLISECONDS))) { MongoNamespace namespace = generateNamespace(); MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) @@ -290,7 +302,7 @@ public void 
testTimeoutMSAppliedToInitialAggregate() { + " data: {" + " failCommands: [\"aggregate\" ]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 201) + + " blockTimeMS: " + 201 + " }" + "}"); @@ -321,13 +333,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { //given BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); sleep(2000); - - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + .timeout(500, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -338,7 +347,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + " data: {" + " failCommands: [\"getMore\", \"aggregate\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 200) + + " blockTimeMS: " + 200 + " }" + "}"); @@ -389,12 +398,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { //given BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); sleep(2000); - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + .timeout(500, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()) @@ -406,7 +413,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { + " data: {" + " failCommands: [\"aggregate\", \"getMore\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 200) + + " blockTimeMS: " 
+ 200 + " }" + "}"); @@ -449,9 +456,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt assumeTrue(isDiscoverableReplicaSet()); //given - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 2500, TimeUnit.MILLISECONDS))) { + .timeout(2500, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -468,7 +474,78 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt List commandStartedEvents = commandListener.getCommandStartedEvents(); assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"), commandStartedEvents); - assertOnlyOneCommandTimeoutFailure("getMore"); + + } + } + + @DisplayName("9. End Session. The timeout specified via the MongoClient timeoutMS option") + @Test + @Override + public void test9EndSessionClientTimeout() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeFalse(isStandalone()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1 }," + + " data: {" + + " failCommands: [\"abortTransaction\"]," + + " blockConnection: true," + + " blockTimeMS: " + 400 + + " }" + + "}"); + + try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder().retryWrites(false) + .timeout(300, TimeUnit.MILLISECONDS))) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + try (ClientSession session = Mono.from(mongoClient.startSession()).block()) { + session.startTransaction(); + Mono.from(collection.insertOne(session, new Document("x", 1))).block(); + } + + sleep(postSessionCloseSleep()); + CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> 
commandListener.getCommandFailedEvent("abortTransaction")); + long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS); + assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable()); + assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime); + } + } + + @Test + @DisplayName("9. End Session. The timeout specified via the ClientSession defaultTimeoutMS option") + @Override + public void test9EndSessionSessionTimeout() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeFalse(isStandalone()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1 }," + + " data: {" + + " failCommands: [\"abortTransaction\"]," + + " blockConnection: true," + + " blockTimeMS: " + 400 + + " }" + + "}"); + + try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder())) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + try (ClientSession session = Mono.from(mongoClient.startSession(com.mongodb.ClientSessionOptions.builder() + .defaultTimeout(300, TimeUnit.MILLISECONDS).build())).block()) { + + session.startTransaction(); + Mono.from(collection.insertOne(session, new Document("x", 1))).block(); + } + + sleep(postSessionCloseSleep()); + CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction")); + long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS); + assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable()); + assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime); } } @@ -512,6 +589,6 @@ public void tearDown() throws InterruptedException { @Override protected int postSessionCloseSleep() { - return 256; + return 1000; } } diff --git 
a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/observability/MicrometerProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/observability/MicrometerProseTest.java new file mode 100644 index 00000000000..c58bb98f2cc --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/observability/MicrometerProseTest.java @@ -0,0 +1,32 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client.observability; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.AbstractMicrometerProseTest; +import com.mongodb.client.MongoClient; +import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; + +/** + * Reactive Streams driver implementation of the Micrometer prose tests. 
+ */ +public class MicrometerProseTest extends AbstractMicrometerProseTest { + @Override + protected MongoClient createMongoClient(final MongoClientSettings settings) { + return new SyncMongoClient(settings); + } +} diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java index e1d765150a7..473d57a3878 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java @@ -192,7 +192,7 @@ public TimeoutContext getTimeoutContext() { @Override @Nullable public TransactionSpan getTransactionSpan() { - return null; + return wrapped.getTransactionSpan(); } private static void sleep(final long millis) { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/MicrometerTracingTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/MicrometerTracingTest.java new file mode 100644 index 00000000000..bf2e6205ad6 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/MicrometerTracingTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client.unified; + +import org.junit.jupiter.params.provider.Arguments; + +import java.util.Collection; + +final class MicrometerTracingTest extends UnifiedReactiveStreamsTest { + private static Collection data() { + return getTestData("open-telemetry/tests"); + } +} diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java index c192ae17896..0fda131f4ff 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoClientImplTest.java @@ -25,6 +25,7 @@ import com.mongodb.internal.connection.ClientMetadata; import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.mockito.MongoMockito; +import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.reactivestreams.client.ChangeStreamPublisher; import com.mongodb.reactivestreams.client.ClientSession; @@ -179,7 +180,7 @@ void testWatch() { @Test void testStartSession() { ServerSessionPool serverSessionPool = mock(ServerSessionPool.class); - ClientSessionHelper clientSessionHelper = new ClientSessionHelper(mongoClient, serverSessionPool); + ClientSessionHelper clientSessionHelper = new ClientSessionHelper(mongoClient, serverSessionPool, TracingManager.NO_OP); assertAll("Start Session Tests", () -> assertAll("check validation", diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java index 920feb1f986..eb36678761a 100644 --- 
a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java @@ -22,7 +22,6 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoException; import com.mongodb.MongoInternalException; -import com.mongodb.MongoNamespace; import com.mongodb.MongoQueryException; import com.mongodb.MongoSocketException; import com.mongodb.MongoTimeoutException; @@ -53,17 +52,14 @@ import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; +import com.mongodb.internal.observability.micrometer.Span; +import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.internal.session.ServerSessionPool; -import com.mongodb.internal.observability.micrometer.Span; -import com.mongodb.internal.observability.micrometer.TraceContext; -import com.mongodb.internal.observability.micrometer.TracingManager; -import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.lang.Nullable; -import io.micrometer.common.KeyValues; import org.bson.BsonDocument; import org.bson.Document; import org.bson.UuidRepresentation; @@ -77,17 +73,11 @@ import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; -import static com.mongodb.internal.MongoNamespaceHelper.COMMAND_COLLECTION_NAME; import static com.mongodb.ReadPreference.primary; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static 
com.mongodb.internal.TimeoutContext.createTimeoutContext; -import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COLLECTION; -import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NAMESPACE; -import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_NAME; -import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.OPERATION_SUMMARY; -import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SYSTEM; final class MongoClusterImpl implements MongoCluster { @Nullable @@ -434,7 +424,8 @@ public T execute(final ReadOperation operation, final ReadPreference r boolean implicitSession = isImplicitSession(session); OperationContext operationContext = getOperationContext(actualClientSession, readConcern, operation.getCommandName()) .withSessionContext(new ClientSessionBinding.SyncClientSessionContext(actualClientSession, readConcern, implicitSession)); - Span span = createOperationSpan(actualClientSession, operationContext, operation.getCommandName(), operation.getNamespace()); + Span span = operationContext.getTracingManager().createOperationSpan( + actualClientSession.getTransactionSpan(), operationContext, operation.getCommandName(), operation.getNamespace()); ReadBinding binding = getReadBinding(readPreference, actualClientSession, implicitSession); @@ -469,7 +460,8 @@ public T execute(final WriteOperation operation, final ReadConcern readCo ClientSession actualClientSession = getClientSession(session); OperationContext operationContext = getOperationContext(actualClientSession, readConcern, operation.getCommandName()) .withSessionContext(new ClientSessionBinding.SyncClientSessionContext(actualClientSession, readConcern, isImplicitSession(session))); - Span span = createOperationSpan(actualClientSession, operationContext, 
operation.getCommandName(), operation.getNamespace()); + Span span = operationContext.getTracingManager().createOperationSpan( + actualClientSession.getTransactionSpan(), operationContext, operation.getCommandName(), operation.getNamespace()); WriteBinding binding = getWriteBinding(actualClientSession, isImplicitSession(session)); try { @@ -587,48 +579,6 @@ ClientSession getClientSession(@Nullable final ClientSession clientSessionFromOp return session; } - /** - * Create a tracing span for the given operation, and set it on operation context. - * - * @param actualClientSession the session that the operation is part of - * @param operationContext the operation context for the operation - * @param commandName the name of the command - * @param namespace the namespace of the command - * @return the created span, or null if tracing is not enabled - */ - @Nullable - private Span createOperationSpan(final ClientSession actualClientSession, final OperationContext operationContext, final String commandName, final MongoNamespace namespace) { - TracingManager tracingManager = operationContext.getTracingManager(); - if (tracingManager.isEnabled()) { - TraceContext parentContext = null; - TransactionSpan transactionSpan = actualClientSession.getTransactionSpan(); - if (transactionSpan != null) { - parentContext = transactionSpan.getContext(); - } - String name = commandName + " " + namespace.getDatabaseName() + (COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName()) - ? "" - : "." 
+ namespace.getCollectionName()); - - KeyValues keyValues = KeyValues.of( - SYSTEM.withValue("mongodb"), - NAMESPACE.withValue(namespace.getDatabaseName())); - if (!COMMAND_COLLECTION_NAME.equalsIgnoreCase(namespace.getCollectionName())) { - keyValues = keyValues.and(COLLECTION.withValue(namespace.getCollectionName())); - } - keyValues = keyValues.and(OPERATION_NAME.withValue(commandName), - OPERATION_SUMMARY.withValue(name)); - - Span span = tracingManager.addSpan(name, parentContext, namespace); - - span.tagLowCardinality(keyValues); - - operationContext.setTracingSpan(span); - return span; - - } else { - return null; - } - } } private boolean isImplicitSession(@Nullable final ClientSession session) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 9ce58b1654f..7828ecde684 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -56,11 +56,8 @@ import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.test.FlakyTest; -import org.bson.BsonArray; -import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonInt32; -import org.bson.BsonString; import org.bson.BsonTimestamp; import org.bson.Document; import org.bson.codecs.BsonDocumentCodec; @@ -256,7 +253,6 @@ public void testBlockingIterationMethodsChangeStream() { assumeFalse(isAsync()); // Async change stream cursor is non-deterministic for cursor::next BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); sleep(2000); 
collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY); @@ -298,7 +294,6 @@ public void testBlockingIterationMethodsChangeStream() { @FlakyTest(maxAttempts = 3) public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," @@ -306,7 +301,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 205) + + " blockTimeMS: " + 205 + " }" + "}"); @@ -314,7 +309,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { filesCollectionHelper.create(); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { + .timeout(200, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -329,7 +324,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { @Test public void testAbortingGridFsUploadStreamTimeout() throws Throwable { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," @@ -337,7 +331,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { + " data: {" + " failCommands: [\"delete\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 305) + + " blockTimeMS: " + 320 + " }" + "}"); @@ -345,7 +339,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { filesCollectionHelper.create(); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + .timeout(300, TimeUnit.MILLISECONDS))) { MongoDatabase database = 
client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2); @@ -360,7 +354,6 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { @Test public void testGridFsDownloadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); chunksCollectionHelper.create(); filesCollectionHelper.create(); @@ -382,18 +375,19 @@ public void testGridFsDownloadStreamTimeout() { + " metadata: {}" + "}" )), WriteConcern.MAJORITY); + collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," + " mode: { skip: 1 }," + " data: {" + " failCommands: [\"find\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 95) + + " blockTimeMS: " + 500 + " }" + "}"); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(rtt + 100, TimeUnit.MILLISECONDS))) { + .timeout(300, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2); @@ -401,7 +395,9 @@ public void testGridFsDownloadStreamTimeout() { assertThrows(MongoOperationTimeoutException.class, downloadStream::read); List events = commandListener.getCommandStartedEvents(); - List findCommands = events.stream().filter(e -> e.getCommandName().equals("find")).collect(Collectors.toList()); + List findCommands = events.stream() + .filter(e -> e.getCommandName().equals("find")) + .collect(Collectors.toList()); assertEquals(2, findCommands.size()); assertEquals(gridFsFileNamespace.getCollectionName(), findCommands.get(0).getCommand().getString("find").getValue()); @@ -414,7 +410,7 @@ public void testGridFsDownloadStreamTimeout() { @ParameterizedTest(name = "[{index}] {0}") @MethodSource("test8ServerSelectionArguments") public void test8ServerSelection(final String connectionString) { - int timeoutBuffer = 
100; // 5 in spec, Java is slower + int timeoutBuffer = 150; // 5 in spec, Java is slower // 1. Create a MongoClient try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() .applyConnectionString(new ConnectionString(connectionString))) @@ -450,7 +446,7 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in + " data: {" + " failCommands: [\"saslContinue\"]," + " blockConnection: true," - + " blockTimeMS: 350" + + " blockTimeMS: 600" + " }" + "}"); @@ -466,7 +462,7 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in .insertOne(new Document("x", 1)); }); long elapsed = msElapsedSince(start); - assertTrue(elapsed <= 310, "Took too long to time out, elapsedMS: " + elapsed); + assertTrue(elapsed <= 350, "Took too long to time out, elapsedMS: " + elapsed); } } @@ -483,23 +479,23 @@ public void test9EndSessionClientTimeout() { + " data: {" + " failCommands: [\"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: " + 500 + " }" + "}"); try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder().retryWrites(false) - .timeout(100, TimeUnit.MILLISECONDS))) { - MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .timeout(250, TimeUnit.MILLISECONDS))) { + MongoDatabase database = mongoClient.getDatabase(namespace.getDatabaseName()); + MongoCollection collection = database .getCollection(namespace.getCollectionName()); try (ClientSession session = mongoClient.startSession()) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); - long start = System.nanoTime(); session.close(); - long elapsed = msElapsedSince(start) - postSessionCloseSleep(); - assertTrue(elapsed <= 150, "Took too long to time out, elapsedMS: " + elapsed); + long elapsed = msElapsedSince(start); + assertTrue(elapsed <= 300, "Took too long to time out, elapsedMS: " + elapsed); } } CommandFailedEvent 
abortTransactionEvent = assertDoesNotThrow(() -> @@ -520,7 +516,7 @@ public void test9EndSessionSessionTimeout() { + " data: {" + " failCommands: [\"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: " + 400 + " }" + "}"); @@ -529,14 +525,14 @@ public void test9EndSessionSessionTimeout() { .getCollection(namespace.getCollectionName()); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(100, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(300, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); long start = System.nanoTime(); session.close(); - long elapsed = msElapsedSince(start) - postSessionCloseSleep(); - assertTrue(elapsed <= 150, "Took too long to time out, elapsedMS: " + elapsed); + long elapsed = msElapsedSince(start); + assertTrue(elapsed <= 400, "Took too long to time out, elapsedMS: " + elapsed); } } CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> @@ -563,11 +559,12 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithCommit() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + int defaultTimeout = 300; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); assertDoesNotThrow(session::commitTransaction); } @@ -594,11 +591,12 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithAbort() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + int defaultTimeout = 300; try (ClientSession session = 
mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); assertDoesNotThrow(session::close); } @@ -618,12 +616,12 @@ public void test10ConvenientTransactions() { + " data: {" + " failCommands: [\"insert\", \"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: " + 200 + " }" + "}"); try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() - .timeout(100, TimeUnit.MILLISECONDS))) { + .timeout(150, TimeUnit.MILLISECONDS))) { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); @@ -661,12 +659,13 @@ public void test10CustomTestWithTransactionUsesASingleTimeout() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + int defaultTimeout = 200; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { assertThrows(MongoOperationTimeoutException.class, () -> session.withTransaction(() -> { collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); return true; }) ); @@ -696,12 +695,13 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + int defaultTimeout = 200; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { 
assertThrows(MongoOperationTimeoutException.class, () -> session.withTransaction(() -> { collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); return true; }) ); @@ -710,7 +710,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { } @DisplayName("11. Multi-batch bulkWrites") - @Test + @FlakyTest(maxAttempts = 3) @SuppressWarnings("try") protected void test11MultiBatchBulkWrites() throws InterruptedException { assumeTrue(serverVersionAtLeast(8, 0)); @@ -718,12 +718,18 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException { // a workaround for https://jira.mongodb.org/browse/DRIVERS-2997, remove this block when the aforementioned bug is fixed client.getDatabase(namespace.getDatabaseName()).drop(); } - BsonDocument failPointDocument = new BsonDocument("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument("times", new BsonInt32(2))) - .append("data", new BsonDocument("failCommands", new BsonArray(singletonList(new BsonString("bulkWrite")))) - .append("blockConnection", BsonBoolean.TRUE) - .append("blockTimeMS", new BsonInt32(2020))); - try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(4000, TimeUnit.MILLISECONDS)); + BsonDocument failPointDocument = BsonDocument.parse("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 2}," + + " data: {" + + " failCommands: [\"bulkWrite\" ]," + + " blockConnection: true," + + " blockTimeMS: " + 2020 + + " }" + + "}"); + + long timeout = 4000; + try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(timeout, TimeUnit.MILLISECONDS)); FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { MongoDatabase db = client.getDatabase(namespace.getDatabaseName()); db.drop(); @@ -746,8 +752,8 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException { * Not a prose spec test. 
However, it is additional test case for better coverage. */ @Test - @DisplayName("Should ignore wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations") - public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() { + @DisplayName("Should not include wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations") + public void shouldNotIncludeWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() { assumeTrue(serverVersionAtLeast(4, 4)); assumeFalse(isStandalone()); @@ -755,14 +761,15 @@ public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTran MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + int defaultTimeout = 200; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS) + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS) .build())) { session.startTransaction(TransactionOptions.builder() .writeConcern(WriteConcern.ACKNOWLEDGED.withWTimeout(100, TimeUnit.MILLISECONDS)) .build()); collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); assertDoesNotThrow(session::commitTransaction); //repeat commit. 
@@ -805,12 +812,12 @@ public void shouldIgnoreWaitQueueTimeoutMSWhenTimeoutMsIsSet() { + " data: {" + " failCommands: [\"find\" ]," + " blockConnection: true," - + " blockTimeMS: " + 300 + + " blockTimeMS: " + 450 + " }" + "}"); - executor.submit(() -> collection.find().first()); - sleep(100); + executor.execute(() -> collection.find().first()); + sleep(150); //when && then assertDoesNotThrow(() -> collection.find().first()); @@ -844,7 +851,7 @@ public void shouldThrowOperationTimeoutExceptionWhenConnectionIsNotAvailableAndT + " }" + "}"); - executor.submit(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first()); + executor.execute(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first()); sleep(100); //when && then @@ -863,7 +870,7 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() { //given try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() .applyToConnectionPoolSettings(builder -> builder - .maxWaitTime(100, TimeUnit.MILLISECONDS) + .maxWaitTime(20, TimeUnit.MILLISECONDS) .maxSize(1) ))) { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) @@ -875,12 +882,12 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() { + " data: {" + " failCommands: [\"find\" ]," + " blockConnection: true," - + " blockTimeMS: " + 300 + + " blockTimeMS: " + 400 + " }" + "}"); - executor.submit(() -> collection.find().first()); - sleep(100); + executor.execute(() -> collection.find().first()); + sleep(200); //when & then assertThrows(MongoTimeoutException.class, () -> collection.find().first()); @@ -896,7 +903,6 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkErrorWhenTimeoutMsIsN assumeTrue(serverVersionAtLeast(4, 4)); assumeTrue(isLoadBalanced()); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); collectionHelper.insertDocuments(new Document(), new Document()); 
collectionHelper.runAdminCommand("{" @@ -905,7 +911,7 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkErrorWhenTimeoutMsIsN + " data: {" + " failCommands: [\"getMore\" ]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 600) + + " blockTimeMS: " + 600 + " }" + "}"); @@ -943,7 +949,6 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkError() { assumeTrue(serverVersionAtLeast(4, 4)); assumeTrue(isLoadBalanced()); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); collectionHelper.insertDocuments(new Document(), new Document()); collectionHelper.runAdminCommand("{" @@ -952,7 +957,7 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkError() { + " data: {" + " failCommands: [\"getMore\" ]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 600) + + " blockTimeMS: " + 600 + " }" + "}"); @@ -1040,11 +1045,16 @@ public void shouldUseConnectTimeoutMsWhenEstablishingConnectionInBackground() { + " data: {" + " failCommands: [\"hello\", \"isMaster\"]," + " blockConnection: true," - + " blockTimeMS: " + 500 + + " blockTimeMS: " + 500 + "," + // The appName is unique to prevent this failpoint from affecting ClusterFixture's ServerMonitor. + // Without the appName, ClusterFixture's heartbeats would be blocked, polluting RTT measurements with 500ms values, + // which would cause flakiness in other prose tests that use ClusterFixture.getPrimaryRTT() for timeout adjustments. + + " appName: \"connectTimeoutBackgroundTest\"" + " }" + "}"); try (MongoClient ignored = createMongoClient(getMongoClientSettingsBuilder() + .applicationName("connectTimeoutBackgroundTest") .applyToConnectionPoolSettings(builder -> builder.minSize(1)) // Use a very short timeout to ensure that the connection establishment will fail on the first handshake command. 
.timeout(10, TimeUnit.MILLISECONDS))) { @@ -1075,9 +1085,10 @@ private static Stream test8ServerSelectionArguments() { } private static Stream test8ServerSelectionHandshakeArguments() { + return Stream.of( - Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 200, 300), - Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 300, 200) + Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 200, 500), + Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 500, 200) ); } @@ -1088,7 +1099,8 @@ protected MongoNamespace generateNamespace() { protected MongoClientSettings.Builder getMongoClientSettingsBuilder() { commandListener.reset(); - return Fixture.getMongoClientSettingsBuilder() + MongoClientSettings.Builder mongoClientSettingsBuilder = Fixture.getMongoClientSettingsBuilder(); + return mongoClientSettingsBuilder .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .readPreference(ReadPreference.primary()) @@ -1103,6 +1115,9 @@ public void setUp() { gridFsChunksNamespace = new MongoNamespace(getDefaultDatabaseName(), GRID_FS_BUCKET_NAME + ".chunks"); collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), namespace); + // in some test collection might not have been created yet, thus dropping it in afterEach will throw an error + collectionHelper.create(); + filesCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsFileNamespace); chunksCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsChunksNamespace); commandListener = new TestCommandListener(); @@ -1112,10 +1127,13 @@ public void setUp() { public void tearDown() throws InterruptedException { ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME); if (collectionHelper != null) { + // Due to testing abortTransaction 
via failpoint, there may be open transactions + // after the test finishes, thus drop() command hangs for 60 seconds until transaction + // is automatically rolled back. + collectionHelper.runAdminCommand("{killAllSessions: []}"); collectionHelper.drop(); filesCollectionHelper.drop(); chunksCollectionHelper.drop(); - commandListener.reset(); try { ServerHelper.checkPool(getPrimary()); } catch (InterruptedException e) { @@ -1139,7 +1157,7 @@ private MongoClient createMongoClient(final MongoClientSettings.Builder builder) return createMongoClient(builder.build()); } - private long msElapsedSince(final long t1) { + protected long msElapsedSince(final long t1) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1); } diff --git a/driver-sync/src/test/functional/com/mongodb/observability/MicrometerProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractMicrometerProseTest.java similarity index 57% rename from driver-sync/src/test/functional/com/mongodb/observability/MicrometerProseTest.java rename to driver-sync/src/test/functional/com/mongodb/client/AbstractMicrometerProseTest.java index d4239aa44d7..746b0ffd8d9 100644 --- a/driver-sync/src/test/functional/com/mongodb/observability/MicrometerProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractMicrometerProseTest.java @@ -14,44 +14,59 @@ * limitations under the License. 
*/ -package com.mongodb.observability; +package com.mongodb.client; import com.mongodb.MongoClientSettings; -import com.mongodb.client.Fixture; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; +import com.mongodb.lang.Nullable; +import com.mongodb.observability.ObservabilitySettings; +import com.mongodb.client.observability.SpanTree; +import com.mongodb.client.observability.SpanTree.SpanNode; import com.mongodb.observability.micrometer.MicrometerObservabilitySettings; import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.exporter.FinishedSpan; import io.micrometer.tracing.test.reporter.inmemory.InMemoryOtelSetup; import org.bson.Document; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.mongodb.ClusterFixture.getDefaultDatabaseName; +import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; +import static com.mongodb.internal.observability.micrometer.MongodbObservation.HighCardinalityKeyNames.QUERY_TEXT; import static com.mongodb.internal.observability.micrometer.TracingManager.ENV_OBSERVABILITY_ENABLED; import static com.mongodb.internal.observability.micrometer.TracingManager.ENV_OBSERVABILITY_QUERY_TEXT_MAX_LENGTH; -import static 
com.mongodb.internal.observability.micrometer.MongodbObservation.HighCardinalityKeyNames.QUERY_TEXT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Implementation of the prose tests for Micrometer OpenTelemetry tracing. + * Implementation of the prose tests + * for Micrometer OpenTelemetry tracing. */ -public class MicrometerProseTest { +public abstract class AbstractMicrometerProseTest { private final ObservationRegistry observationRegistry = ObservationRegistry.create(); private InMemoryOtelSetup memoryOtelSetup; private InMemoryOtelSetup.Builder.OtelBuildingBlocks inMemoryOtel; private static String previousEnvVarMdbTracingEnabled; private static String previousEnvVarMdbQueryTextLength; + protected abstract MongoClient createMongoClient(MongoClientSettings settings); + @BeforeAll static void beforeAll() { // preserve original env var values @@ -77,18 +92,19 @@ void tearDown() { memoryOtelSetup.close(); } + @DisplayName("Test 1: Tracing Enable/Disable via Environment Variable") @Test void testControlOtelInstrumentationViaEnvironmentVariable() throws Exception { setEnv(ENV_OBSERVABILITY_ENABLED, "false"); // don't enable command payload by default - MongoClientSettings clientSettings = Fixture.getMongoClientSettingsBuilder() + MongoClientSettings clientSettings = getMongoClientSettingsBuilder() .observabilitySettings(ObservabilitySettings.micrometerBuilder() .observationRegistry(observationRegistry) .build()) .build(); - try (MongoClient client = MongoClients.create(clientSettings)) { + try (MongoClient client = createMongoClient(clientSettings)) { MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); MongoCollection collection = database.getCollection("test"); collection.find().first(); @@ -98,7 +114,7 @@ void testControlOtelInstrumentationViaEnvironmentVariable() throws Exception { } setEnv(ENV_OBSERVABILITY_ENABLED, "true"); - try (MongoClient client = 
MongoClients.create(clientSettings)) { + try (MongoClient client = createMongoClient(clientSettings)) { MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); MongoCollection collection = database.getCollection("test"); collection.find().first(); @@ -114,6 +130,7 @@ void testControlOtelInstrumentationViaEnvironmentVariable() throws Exception { } } + @DisplayName("Test 2: Command Payload Emission via Environment Variable") @Test void testControlCommandPayloadViaEnvironmentVariable() throws Exception { setEnv(ENV_OBSERVABILITY_QUERY_TEXT_MAX_LENGTH, "42"); @@ -123,13 +140,13 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception { .maxQueryTextLength(75) // should be overridden by env var .build(); - MongoClientSettings clientSettings = Fixture.getMongoClientSettingsBuilder() + MongoClientSettings clientSettings = getMongoClientSettingsBuilder() .observabilitySettings(ObservabilitySettings.micrometerBuilder() .applySettings(settings) .build()). build(); - try (MongoClient client = MongoClients.create(clientSettings)) { + try (MongoClient client = createMongoClient(clientSettings)) { MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); MongoCollection collection = database.getCollection("test"); collection.find().first(); @@ -153,14 +170,14 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception { setEnv(ENV_OBSERVABILITY_QUERY_TEXT_MAX_LENGTH, null); // Unset the environment variable - clientSettings = Fixture.getMongoClientSettingsBuilder() + clientSettings = getMongoClientSettingsBuilder() .observabilitySettings(ObservabilitySettings.micrometerBuilder() .observationRegistry(observationRegistry) .maxQueryTextLength(42) // setting this will not matter since env var is not set and enableCommandPayloadTracing is false .build()) .build(); - try (MongoClient client = MongoClients.create(clientSettings)) { + try (MongoClient client = createMongoClient(clientSettings)) { MongoDatabase database = 
client.getDatabase(getDefaultDatabaseName()); MongoCollection collection = database.getCollection("test"); collection.find().first(); @@ -182,11 +199,11 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception { .maxQueryTextLength(7) // setting this will be used; .build(); - clientSettings = Fixture.getMongoClientSettingsBuilder() + clientSettings = getMongoClientSettingsBuilder() .observabilitySettings(settings) .build(); - try (MongoClient client = MongoClients.create(clientSettings)) { + try (MongoClient client = createMongoClient(clientSettings)) { MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); MongoCollection collection = database.getCollection("test"); collection.find().first(); @@ -200,8 +217,108 @@ void testControlCommandPayloadViaEnvironmentVariable() throws Exception { } } + /** + * Verifies that concurrent operations produce isolated span trees with no cross-contamination. + * Each operation should get its own trace ID, correct parent-child linkage, and collection-specific tags, + * even when multiple operations execute simultaneously on the same client. + * + *

This test is not from the specification.

+ */ + @Test + void testConcurrentOperationsHaveSeparateSpans() throws Exception { + setEnv(ENV_OBSERVABILITY_ENABLED, "true"); + int nbrConcurrentOps = 10; + MongoClientSettings clientSettings = getMongoClientSettingsBuilder() + .applyToConnectionPoolSettings(pool -> pool.maxSize(nbrConcurrentOps)) + .observabilitySettings(ObservabilitySettings.micrometerBuilder() + .observationRegistry(observationRegistry) + .build()) + .build(); + + try (MongoClient client = createMongoClient(clientSettings)) { + MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); + + // Warm up connections so the concurrent phase doesn't include handshake overhead + for (int i = 0; i < nbrConcurrentOps; i++) { + database.getCollection("concurrent_test_" + i).find().first(); + } + // Clear spans from warm-up before the actual concurrent test + memoryOtelSetup.close(); + memoryOtelSetup = InMemoryOtelSetup.builder().register(observationRegistry); + inMemoryOtel = memoryOtelSetup.getBuildingBlocks(); + + ExecutorService executor = Executors.newFixedThreadPool(nbrConcurrentOps); + try { + CountDownLatch startLatch = new CountDownLatch(1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < nbrConcurrentOps; i++) { + String collectionName = "concurrent_test_" + i; + futures.add(executor.submit(() -> { + try { + startLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + database.getCollection(collectionName).find().first(); + })); + } + + // Release all threads simultaneously to maximize concurrency + startLatch.countDown(); + + for (Future future : futures) { + future.get(30, TimeUnit.SECONDS); + } + } finally { + executor.shutdown(); + } + + List allSpans = inMemoryOtel.getFinishedSpans(); + + // Each find() produces 2 spans: operation-level span + command-level span + assertEquals(nbrConcurrentOps * 2, allSpans.size(), + "Each concurrent operation should produce exactly 2 spans (operation + 
command)."); + + // Verify trace isolation: each independent operation should get its own traceId + Map> spansByTrace = allSpans.stream() + .collect(Collectors.groupingBy(FinishedSpan::getTraceId)); + assertEquals(nbrConcurrentOps, spansByTrace.size(), + "Each concurrent operation should have its own distinct trace ID."); + + // Use SpanTree to validate parent-child structure built from spanId/parentId linkage + SpanTree spanTree = SpanTree.from(allSpans); + List roots = spanTree.getRoots(); + + // Each operation span is a root; its command span is a child + assertEquals(nbrConcurrentOps, roots.size(), + "SpanTree should have one root per concurrent operation."); + + Set observedCollections = new HashSet<>(); + for (SpanNode root : roots) { + assertTrue(root.getName().startsWith("find " + getDefaultDatabaseName() + ".concurrent_test_"), + "Root span should be an operation span, but was: " + root.getName()); + + assertEquals(1, root.getChildren().size(), + "Each operation span should have exactly one child (command span)."); + assertEquals("find", root.getChildren().get(0).getName(), + "Child span should be the command span 'find'."); + + // Extract collection name from the operation span name to verify no cross-contamination + String collectionName = root.getName().substring( + ("find " + getDefaultDatabaseName() + ".").length()); + assertTrue(observedCollections.add(collectionName), + "Each operation should target a unique collection, but found duplicate: " + collectionName); + } + + assertEquals(nbrConcurrentOps, observedCollections.size(), + "All " + nbrConcurrentOps + " concurrent operations should be represented in distinct traces."); + } + } + @SuppressWarnings("unchecked") - private static void setEnv(final String key, final String value) throws Exception { + private static void setEnv(final String key, @Nullable final String value) throws Exception { // Get the unmodifiable Map from System.getenv() Map env = System.getenv(); diff --git 
a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index 3682bd64ff0..910cf57edfd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -93,7 +93,7 @@ public void shouldCreateServerSessionOnlyAfterConnectionCheckout() throws Interr .addCommandListener(new CommandListener() { @Override public void commandStarted(final CommandStartedEvent event) { - lsidSet.add(event.getCommand().getDocument("lsid")); + lsidSet.add(event.getCommand().getDocument("lsid").clone()); } }) .build())) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java index dd45bc8ae2c..04303833bf5 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java @@ -93,14 +93,13 @@ public abstract class AbstractClientSideOperationsEncryptionTimeoutProseTest { @Test void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); Map> kmsProviders = new HashMap<>(); Map localProviderMap = new HashMap<>(); localProviderMap.put("key", Base64.getDecoder().decode(MASTER_KEY)); kmsProviders.put("local", localProviderMap); - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 100))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(100))) { keyVaultCollectionHelper.runAdminCommand("{" + " 
configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," @@ -108,7 +107,7 @@ void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 100) + + " blockTimeMS: " + 100 + " }" + "}"); @@ -126,9 +125,8 @@ void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() { @Test void shouldThrowOperationTimeoutExceptionWhenEncryptData() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 150))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(150))) { clientEncryption.createDataKey("local"); @@ -138,7 +136,7 @@ void shouldThrowOperationTimeoutExceptionWhenEncryptData() { + " data: {" + " failCommands: [\"find\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 150) + + " blockTimeMS: " + 150 + " }" + "}"); @@ -160,10 +158,9 @@ void shouldThrowOperationTimeoutExceptionWhenEncryptData() { @Test void shouldThrowOperationTimeoutExceptionWhenDecryptData() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); BsonBinary encrypted; - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(400))) { clientEncryption.createDataKey("local"); BsonBinary dataKey = clientEncryption.createDataKey("local"); EncryptOptions encryptOptions = new EncryptOptions("AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"); @@ -171,14 +168,14 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() { encrypted = clientEncryption.encrypt(new BsonString("hello"), encryptOptions); } - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) { + try 
(ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(400))) { keyVaultCollectionHelper.runAdminCommand("{" - + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + " mode: { times: 1 }," + " data: {" + " failCommands: [\"find\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 500) + + " blockTimeMS: " + 500 + " }" + "}"); commandListener.reset(); @@ -197,8 +194,7 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() { @Test void shouldDecreaseOperationTimeoutForSubsequentOperations() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); - long initialTimeoutMS = rtt + 2500; + long initialTimeoutMS = 2500; keyVaultCollectionHelper.runAdminCommand("{" + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," @@ -206,7 +202,7 @@ void shouldDecreaseOperationTimeoutForSubsequentOperations() { + " data: {" + " failCommands: [\"insert\", \"find\", \"listCollections\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 10) + + " blockTimeMS: " + 10 + " }" + "}"); @@ -272,8 +268,7 @@ void shouldDecreaseOperationTimeoutForSubsequentOperations() { void shouldThrowTimeoutExceptionWhenCreateEncryptedCollection(final String commandToTimeout) { assumeTrue(serverVersionAtLeast(7, 0)); //given - long rtt = ClusterFixture.getPrimaryRTT(); - long initialTimeoutMS = rtt + 200; + long initialTimeoutMS = 200; try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder() .timeout(initialTimeoutMS, MILLISECONDS))) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/observability/MicrometerProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/observability/MicrometerProseTest.java new file mode 100644 index 00000000000..38bd4350b1d --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/observability/MicrometerProseTest.java @@ -0,0 +1,32 @@ +/* + * 
Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.observability; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.AbstractMicrometerProseTest; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; + +/** + * Sync driver implementation of the Micrometer prose tests. + */ +public class MicrometerProseTest extends AbstractMicrometerProseTest { + @Override + protected MongoClient createMongoClient(final MongoClientSettings settings) { + return MongoClients.create(settings); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/observability/SpanTree.java b/driver-sync/src/test/functional/com/mongodb/client/observability/SpanTree.java similarity index 98% rename from driver-sync/src/test/functional/com/mongodb/observability/SpanTree.java rename to driver-sync/src/test/functional/com/mongodb/client/observability/SpanTree.java index aa6697bf3ad..7d3bff3224d 100644 --- a/driver-sync/src/test/functional/com/mongodb/observability/SpanTree.java +++ b/driver-sync/src/test/functional/com/mongodb/client/observability/SpanTree.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.mongodb.observability; +package com.mongodb.client.observability; import com.mongodb.lang.Nullable; import io.micrometer.tracing.exporter.FinishedSpan; @@ -204,6 +204,10 @@ private static void assertValid(final SpanNode reportedNode, final SpanNode expe } } + public List getRoots() { + return Collections.unmodifiableList(roots); + } + @Override public String toString() { return "SpanTree{" diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index cf003078f04..35189aef455 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -28,7 +28,7 @@ import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.model.Filters; import com.mongodb.client.test.CollectionHelper; -import com.mongodb.observability.SpanTree; +import com.mongodb.client.observability.SpanTree; import com.mongodb.client.unified.UnifiedTestModifications.TestDef; import com.mongodb.client.vault.ClientEncryption; import com.mongodb.connection.ClusterDescription; diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 2225f837ec5..328c8298b6c 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -63,6 +63,25 @@ public static void applyCustomizations(final TestDef def) { .file("client-side-encryption/tests/unified", "client bulkWrite with queryable encryption"); // client-side-operation-timeout (CSOT) + def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs") + .whenFailureContains("timeout") + .test("client-side-operations-timeout", + 
"timeoutMS behaves correctly for non-tailable cursors", + "timeoutMS is refreshed for getMore if timeoutMode is iteration - success"); + + def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs") + .whenFailureContains("timeout") + .test("client-side-operations-timeout", + "timeoutMS behaves correctly for tailable non-awaitData cursors", + "timeoutMS is refreshed for getMore - success"); + + def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs") + .whenFailureContains("timeout") + .test("client-side-operations-timeout", + "timeoutMS behaves correctly for tailable non-awaitData cursors", + "timeoutMS is refreshed for getMore - success"); + + //TODO-investigate /* As to the background connection pooling section: timeoutMS set at the MongoClient level MUST be used as the timeout for all commands sent as part of the handshake. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8a08c34f213..b5e561c7f7e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -18,10 +18,10 @@ aws-sdk-v2 = "2.30.31" graal-sdk = "24.0.0" jna = "5.11.0" jnr-unixsocket = "0.38.17" -netty-bom = "4.1.87.Final" +netty-bom = "4.2.9.Final" project-reactor-bom = "2022.0.0" reactive-streams = "1.0.4" -snappy = "1.1.10.3" +snappy = "1.1.10.4" zstd = "1.5.5-3" jetbrains-annotations = "26.0.2" micrometer-tracing = "1.6.0-M3" # This version has a fix for https://github.com/micrometer-metrics/tracing/issues/1092 diff --git a/testing/resources/specifications b/testing/resources/specifications index de684cf1ef9..bb9dddd8176 160000 --- a/testing/resources/specifications +++ b/testing/resources/specifications @@ -1 +1 @@ -Subproject commit de684cf1ef9feede71d358cbb7d253840f1a8647 +Subproject commit bb9dddd8176eddbb9424f9bebedfe8c6bbf28c3a