From b7446b53818bf24d3b3260619fc8edf5bcf12990 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 14 Jan 2026 14:52:01 -0800 Subject: [PATCH 1/2] Make asynchronous ClientSession.close() idempotent. JAVA-6062 --- .../internal/ClientSessionPublisherImpl.java | 15 ++- .../MongoClientSessionFunctionalTest.java | 127 ++++++++++++++++++ .../MongoClientSessionSpecification.groovy | 5 + 3 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 5cf0ea103bd..9350f6a7493 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -37,6 +37,8 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import java.util.concurrent.atomic.AtomicBoolean; + import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.assertions.Assertions.assertNotNull; @@ -52,10 +54,11 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements private boolean messageSentInCurrentTransaction; private boolean commitInProgress; private TransactionOptions transactionOptions; + private final AtomicBoolean closeInvoked = new AtomicBoolean(false); ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient, - final ClientSessionOptions options, final OperationExecutor executor) { + final ClientSessionOptions options, final OperationExecutor executor) { super(serverSessionPool, mongoClient, options); this.executor = executor; this.mongoClient = mongoClient; @@ -221,10 +224,12 @@ private void clearTransactionContextOnError(final MongoException e) { @Override public void close() { - if (transactionState == TransactionState.IN) { - Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); - } else { - super.close(); + if (closeInvoked.compareAndSet(false, true)) { + if (transactionState == TransactionState.IN) { + Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); + } else { + super.close(); + } } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java new file mode 100644 index 00000000000..8d0114326e5 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java @@ -0,0 +1,127 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.ClusterFixture; +import com.mongodb.MongoNamespace; +import com.mongodb.client.test.CollectionHelper; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.internal.connection.ServerHelper; +import com.mongodb.internal.connection.TestCommandListener; +import org.bson.BsonDocument; +import org.bson.Document; +import org.bson.codecs.BsonDocumentCodec; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.stream.Collectors; + +import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; +import static com.mongodb.ClusterFixture.getDefaultDatabaseName; +import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; +import static com.mongodb.ClusterFixture.sleep; +import static com.mongodb.client.Fixture.getPrimary; +import static com.mongodb.reactivestreams.client.Fixture.getMongoClientBuilderFromConnectionString; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * This is the Java alternative to {@link MongoClientSessionSpecification}. + * New tests should be added here instead of the Groovy specification. + * Tests from the Groovy specification should be gradually migrated to this class. + *

+ */ +public class MongoClientSessionFunctionalTest { + private static final MongoNamespace NAMESPACE = + new MongoNamespace(getDefaultDatabaseName(), + MongoClientSessionFunctionalTest.class.getSimpleName()); + private static final String FAIL_COMMAND_NAME = "failCommand"; + private CollectionHelper collectionHelper; + + @BeforeEach + public void setUp() { + collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), NAMESPACE); + // in some test collection might not have been created yet, thus dropping it in afterEach will throw an error + collectionHelper.create(); + } + + @Test + @DisplayName("should issue only one abortTransaction when close is called multiple times") + void shouldIssueOnlyOneAbortTransactionWhenCloseCalledMultipleTimes() { + assumeTrue(isDiscoverableReplicaSet()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: '" + FAIL_COMMAND_NAME + "'," + + " mode: { times: 1 }," + + " data: {" + + " failCommands: ['abortTransaction']," + + " blockConnection: true," + + " blockTimeMS: 50" + + " }" + + "}"); + + TestCommandListener commandListener = new TestCommandListener(); + try (MongoClient client = MongoClients.create(getMongoClientBuilderFromConnectionString() + .addCommandListener(commandListener) + .build())) { + + MongoCollection collection = client.getDatabase(NAMESPACE.getDatabaseName()) + .getCollection(NAMESPACE.getCollectionName()); + + ClientSession session = Mono.from(client.startSession()).block(TIMEOUT_DURATION); + session.startTransaction(); + Mono.from(collection.insertOne(session, new Document("x", 1))).block(TIMEOUT_DURATION); + + // when + commandListener.reset(); + session.close(); + session.close(); + session.close(); + + // wait for async abort operations to complete + sleep(1000); + + // then + List abortCommands = commandListener.getCommandStartedEvents().stream() + .filter(event -> event.getCommandName().equals("abortTransaction")) + .collect(Collectors.toList()); + + assertEquals(1, abortCommands.size(), "Expected exactly one abortTransaction command but was : " + abortCommands.size()); + } + } + + @AfterEach + public void tearDown() throws InterruptedException { + ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME); + if (collectionHelper != null) { + // Due to testing abortTransaction via failpoint, there may be open transactions + // after the test finishes, thus drop() command hangs for 60 seconds until transaction + // is automatically rolled back. + collectionHelper.runAdminCommand("{killAllSessions: []}"); + collectionHelper.drop(); + try { + ServerHelper.checkPool(getPrimary()); + } catch (InterruptedException e) { + // ignore + } + } + } +} diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionSpecification.groovy b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionSpecification.groovy index ae35b20cb3b..735ba2c81a7 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionSpecification.groovy +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionSpecification.groovy @@ -42,6 +42,11 @@ import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet import static com.mongodb.reactivestreams.client.Fixture.getDefaultDatabase import static com.mongodb.reactivestreams.client.Fixture.getMongoClientBuilderFromConnectionString +/** + * @deprecated Do not add new tests to this Groovy specification. + * New tests should be added to {@link MongoClientSessionFunctionalTest} instead. + * Tests from this specification should be gradually migrated to the Java class. + */ class MongoClientSessionSpecification extends FunctionalSpecification { def 'should throw IllegalArgumentException if options are null'() { From 0971bb5209420ef67f25359caa1b30a79293b639 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 14 Jan 2026 14:53:41 -0800 Subject: [PATCH 2/2] Remove comment. --- .../reactivestreams/client/MongoClientSessionFunctionalTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java index 8d0114326e5..59ba6b984bd 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java @@ -59,7 +59,6 @@ public class MongoClientSessionFunctionalTest { @BeforeEach public void setUp() { collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), NAMESPACE); - // in some test collection might not have been created yet, thus dropping it in afterEach will throw an error collectionHelper.create(); }