diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 5cf0ea103b..9350f6a749 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -37,6 +37,8 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import java.util.concurrent.atomic.AtomicBoolean; + import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.assertions.Assertions.assertNotNull; @@ -52,10 +54,11 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements private boolean messageSentInCurrentTransaction; private boolean commitInProgress; private TransactionOptions transactionOptions; + private final AtomicBoolean closeInvoked = new AtomicBoolean(false); ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient, - final ClientSessionOptions options, final OperationExecutor executor) { + final ClientSessionOptions options, final OperationExecutor executor) { super(serverSessionPool, mongoClient, options); this.executor = executor; this.mongoClient = mongoClient; @@ -221,10 +224,12 @@ private void clearTransactionContextOnError(final MongoException e) { @Override public void close() { - if (transactionState == TransactionState.IN) { - Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); - } else { - super.close(); + if (closeInvoked.compareAndSet(false, true)) { + if (transactionState == TransactionState.IN) { + Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); + } else { + super.close(); + } } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java new file mode 100644 index 0000000000..59ba6b984b --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoClientSessionFunctionalTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.ClusterFixture; +import com.mongodb.MongoNamespace; +import com.mongodb.client.test.CollectionHelper; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.internal.connection.ServerHelper; +import com.mongodb.internal.connection.TestCommandListener; +import org.bson.BsonDocument; +import org.bson.Document; +import org.bson.codecs.BsonDocumentCodec; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.stream.Collectors; + +import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; +import static com.mongodb.ClusterFixture.getDefaultDatabaseName; +import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; +import static com.mongodb.ClusterFixture.sleep; +import static com.mongodb.client.Fixture.getPrimary; +import static com.mongodb.reactivestreams.client.Fixture.getMongoClientBuilderFromConnectionString; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * This is the Java alternative to {@link MongoClientSessionSpecification}. + * New tests should be added here instead of the Groovy specification. + * Tests from the Groovy specification should be gradually migrated to this class. + *
+ */ +public class MongoClientSessionFunctionalTest { + private static final MongoNamespace NAMESPACE = + new MongoNamespace(getDefaultDatabaseName(), + MongoClientSessionFunctionalTest.class.getSimpleName()); + private static final String FAIL_COMMAND_NAME = "failCommand"; + private CollectionHelper