From 23ecd65d696d7890b2e04030b04e10ac1431fc49 Mon Sep 17 00:00:00 2001 From: davidfrigolet Date: Tue, 5 May 2026 18:38:05 +0100 Subject: [PATCH 1/3] feat: add couchbase, dynamodb and mongodbspringdata marker --- .../couchbase/CouchbaseTargetSystem.java | 16 ++- .../couchbase/CouchbaseTargetSystemTest.java | 12 +- .../dynamodb/DynamoDBTargetSystem.java | 10 +- .../DynamoDBCloudTargetSystemTest.java | 10 +- .../dynamodb/DynamoDBTestHelper.java | 9 ++ .../MongoDBSpringDataAuditMarker.java | 129 ++++++++++++++++++ .../MongoDBSpringDataTargetSystem.java | 8 +- .../MongoDBSpringDataTargetSystemTest.java | 10 +- .../sync/MongoDBSyncTargetSystemTest.java | 10 +- 9 files changed, 182 insertions(+), 32 deletions(-) create mode 100644 core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarker.java diff --git a/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java b/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java index 29dd4bebe..e7d1fca7c 100644 --- a/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java +++ b/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java @@ -25,16 +25,19 @@ import io.flamingock.internal.common.core.context.ContextResolver; import io.flamingock.internal.common.core.error.FlamingockException; import io.flamingock.internal.common.couchbase.CouchbaseUtils; +import io.flamingock.internal.core.builder.FlamingockEdition; import io.flamingock.internal.core.external.targets.TransactionalTargetSystem; import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker; import io.flamingock.internal.core.transaction.TransactionManager; import io.flamingock.internal.core.transaction.TransactionWrapper; +import java.util.function.Supplier; import java.util.Objects; import java.util.Optional; import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK; import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY; +import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY; public class CouchbaseTargetSystem extends TransactionalTargetSystem implements CouchbaseExternalSystem { @@ -79,11 +82,18 @@ public void initialize(ContextResolver baseContext) { targetSystemContext.addDependency(bucket); - TransactionManager txManager = new TransactionManager<>(null); //TODO: update as needed + Supplier couchbaseTxSupplier = () -> { + throw new FlamingockException( + "Couchbase TransactionAttemptContext can only be obtained inside cluster.transactions().run(); " + + "the wrapper must register the session via TransactionManager.startSession(sessionId, ctx)."); + }; + TransactionManager txManager = new TransactionManager<>(couchbaseTxSupplier); txWrapper = new CouchbaseTxWrapper(cluster, txManager); - //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); + FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY); + auditMarker = edition == COMMUNITY + ? new NoOpTargetSystemAuditMarker(this.getId()) + : CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager).build(); } private void validate() { diff --git a/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemTest.java b/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemTest.java index ceff45f12..125f8a336 100644 --- a/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemTest.java +++ b/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemTest.java @@ -136,6 +136,7 @@ void afterEach() throws Exception { //tear down mockRunnerServer.stop(); CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CLIENTS_COLLECTION); + CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME); } @Test @@ -182,14 +183,12 @@ void happyPath() { // check clients changes couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 1); - //TODO add when cloud added // check ongoing status -// couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0); + couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0); } } @Test - @Disabled("adapt when adding cloud support") @DisplayName("Should rollback the ongoing deletion when a change fails") void failedChanges() { String executionId = "execution-1"; @@ -230,16 +229,15 @@ void failedChanges() { .build(); //THEN - mockRunnerServer.verifyAllCalls(); - OperationException ex = Assertions.assertThrows(OperationException.class, runner::run); + mockRunnerServer.verifyAllCalls(); + // check clients changes couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 0); - //TODO when cloud enabled // check ongoing status -// couchbaseTestHelper.checkEmptyTargetSystemAuditMarker(); + couchbaseTestHelper.checkEmptyTargetSystemAuditMarker(); } } diff --git a/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java b/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java index c8ad4c9e7..45474a3a0 100644 --- a/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java +++ b/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java @@ -35,6 +35,7 @@ import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK; import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN; import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY; +import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY; public class DynamoDBTargetSystem extends TransactionalTargetSystem implements DynamoDBExternalSystem { @@ -63,14 +64,13 @@ public void initialize(ContextResolver baseContext) { this.validate(); targetSystemContext.addDependency(client); - FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class) - .orElse(FlamingockEdition.CLOUD); - TransactionManager txManager = new TransactionManager<>(TransactWriteItemsEnhancedRequest::builder); txWrapper = new DynamoDBTxWrapper(client, txManager); - //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); + FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY); + auditMarker = edition == COMMUNITY + ? new NoOpTargetSystemAuditMarker(this.getId()) + : DynamoDBTargetSystemAuditMarker.builder(client, txManager).build(); } private void validate() { diff --git a/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBCloudTargetSystemTest.java b/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBCloudTargetSystemTest.java index 222af1a24..1bbd67fc7 100644 --- a/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBCloudTargetSystemTest.java +++ b/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBCloudTargetSystemTest.java @@ -115,6 +115,8 @@ void afterEach() throws Exception { //tear down logger.info("Stopping Mock Server..."); mockRunnerServer.stop(); + dynamoDBTestHelper.dropTable(UserEntity.tableName); + dynamoDBTestHelper.dropTable(dynamoDBTestHelper.tableName); } @Test @@ -168,14 +170,12 @@ void happyPath() { .table(UserEntity.tableName, TableSchema.fromBean(UserEntity.class)), 1); - //TODO when cloud enabled // check ongoing status -// dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0); + dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0); } } @Test - @Disabled("adapt when adding cloud support") @DisplayName("Should rollback the ongoing deletion when a change fails") void failedChanges() { String executionId = "execution-1"; @@ -216,10 +216,10 @@ void failedChanges() { .build(); //THEN - mockRunnerServer.verifyAllCalls(); - OperationException ex = Assertions.assertThrows(OperationException.class, runner::run); + mockRunnerServer.verifyAllCalls(); + // check clients changes dynamoDBTestHelper.checkCount( DynamoDbEnhancedClient.builder() diff --git a/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTestHelper.java b/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTestHelper.java index 390f4c0b7..def87bc50 100644 --- a/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTestHelper.java +++ b/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTestHelper.java @@ -23,6 +23,8 @@ import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest; import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import java.util.function.Predicate; @@ -42,6 +44,13 @@ public boolean tableExists(String tableName) { return dynamoDBUtil.getDynamoDBClient().listTables().tableNames().contains(tableName); } + public void dropTable(String tableName) { + try { + dynamoDBUtil.getDynamoDBClient().deleteTable(DeleteTableRequest.builder().tableName(tableName).build()); + } catch (ResourceNotFoundException ignored) { + } + } + public DynamoDbClient getDynamoDBClient() { return dynamoDBUtil.getDynamoDBClient(); } diff --git a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarker.java b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarker.java new file mode 100644 index 000000000..7f460d310 --- /dev/null +++ b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarker.java @@ -0,0 +1,129 @@ +/* + * Copyright 2025 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.targetsystem.mongodb.springdata; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.UpdateOptions; +import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; +import io.flamingock.internal.common.mongodb.CollectionInitializator; +import io.flamingock.internal.common.mongodb.MongoDBSyncCollectionHelper; +import io.flamingock.internal.common.mongodb.MongoDBSyncDocumentHelper; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker; +import io.flamingock.internal.util.constants.CommunityPersistenceConstants; +import org.bson.Document; +import org.springframework.data.mongodb.core.MongoTemplate; + +import java.util.HashSet; +import java.util.Set; + +/** + * Audit marker for the MongoDB Spring Data target system. + *

+ * Writes participate in the active Spring-managed MongoDB transaction by going through + * {@link MongoTemplate#getDb()}, which returns a session-aware {@link MongoDatabase} when + * called inside an active {@code MongoTransactionManager} transaction. This avoids exposing + * the underlying {@code ClientSession} through the Flamingock {@code TransactionManager}, + * keeping the Spring abstraction intact. + */ +public class MongoDBSpringDataAuditMarker implements TargetSystemAuditMarker { + + public static final String OPERATION = "operation"; + private static final String CHANGE_ID = "changeId"; + + private final MongoTemplate mongoTemplate; + private final String collectionName; + + public MongoDBSpringDataAuditMarker(MongoTemplate mongoTemplate, String collectionName) { + this.mongoTemplate = mongoTemplate; + this.collectionName = collectionName; + } + + public static Builder builder(MongoTemplate mongoTemplate) { + return new Builder(mongoTemplate); + } + + public static TargetSystemAuditMark mapToOnGoingStatus(Document document) { + TargetSystemAuditMarkType operation = TargetSystemAuditMarkType.valueOf(document.getString(OPERATION)); + return new TargetSystemAuditMark(document.getString(CHANGE_ID), operation); + } + + @Override + public Set listAll() { + return collection().find() + .map(MongoDBSpringDataAuditMarker::mapToOnGoingStatus) + .into(new HashSet<>()); + } + + @Override + public void clearMark(String changeId) { + collection().deleteMany(Filters.eq(CHANGE_ID, changeId)); + } + + @Override + public void mark(TargetSystemAuditMark auditMark) { + Document filter = new Document(CHANGE_ID, auditMark.getChangeId()); + Document newDocument = new Document(CHANGE_ID, auditMark.getChangeId()) + .append(OPERATION, auditMark.getOperation().name()); + + collection().updateOne( + filter, + new Document("$set", newDocument), + new UpdateOptions().upsert(true)); + } + + private MongoCollection collection() { + return mongoTemplate.getDb().getCollection(collectionName); + } + + public static class Builder { + private final MongoTemplate mongoTemplate; + private boolean autoCreate = true; + private String collectionName = CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME; + + public Builder(MongoTemplate mongoTemplate) { + this.mongoTemplate = mongoTemplate; + } + + public Builder setCollectionName(String collectionName) { + this.collectionName = collectionName; + return this; + } + + public Builder withAutoCreate(boolean autoCreate) { + this.autoCreate = autoCreate; + return this; + } + + public MongoDBSpringDataAuditMarker build() { + MongoDatabase db = mongoTemplate.getDb(); + MongoCollection collection = db.getCollection(collectionName); + CollectionInitializator initializer = new CollectionInitializator<>( + new MongoDBSyncCollectionHelper(collection), + () -> new MongoDBSyncDocumentHelper(new Document()), + new String[]{CHANGE_ID} + ); + if (autoCreate) { + initializer.initialize(); + } else { + initializer.justValidateCollection(); + } + return new MongoDBSpringDataAuditMarker(mongoTemplate, collectionName); + } + } +} diff --git a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java index 6ff74508a..ba034b5e9 100644 --- a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java +++ b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java @@ -24,6 +24,7 @@ import io.flamingock.internal.common.core.audit.AuditReaderType; import io.flamingock.internal.common.core.context.ContextResolver; import io.flamingock.internal.common.core.error.FlamingockException; +import io.flamingock.internal.core.builder.FlamingockEdition; import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker; import io.flamingock.internal.core.external.targets.TransactionalTargetSystem; import io.flamingock.internal.core.transaction.TransactionWrapper; @@ -36,6 +37,7 @@ import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK; import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN; import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY; +import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY; public class MongoDBSpringDataTargetSystem extends TransactionalTargetSystem @@ -106,8 +108,10 @@ public void initialize(ContextResolver baseContext) { .writeConcern(writeConcern) .build(); - //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); + FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY); + auditMarker = edition == COMMUNITY + ? new NoOpTargetSystemAuditMarker(this.getId()) + : MongoDBSpringDataAuditMarker.builder(mongoTemplate).build(); } private void validate() { diff --git a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystemTest.java b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystemTest.java index 841859f57..3f32942c2 100644 --- a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystemTest.java +++ b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystemTest.java @@ -33,6 +33,7 @@ import io.flamingock.common.test.cloud.prototype.PrototypeClientSubmission; import io.flamingock.common.test.cloud.prototype.PrototypeStage; import io.flamingock.internal.util.Trio; +import io.flamingock.internal.util.constants.CommunityPersistenceConstants; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; import io.flamingock.internal.core.builder.FlamingockFactory; import io.flamingock.internal.core.builder.CloudChangeRunnerBuilder; @@ -127,6 +128,7 @@ void afterEach() throws Exception { mockRunnerServer.stop(); testDatabase.getCollection(CLIENTS_COLLECTION).drop(); + testDatabase.getCollection(CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME).drop(); } @Test @@ -180,7 +182,6 @@ void happyPath() { } @Test - @Disabled("adapt when adding cloud support") @DisplayName("Should rollback the ongoing deletion when a change fails") void failedChanges() { String executionId = "execution-1"; @@ -221,16 +222,15 @@ void failedChanges() { .build(); //THEN - mockRunnerServer.verifyAllCalls(); - OperationException ex = Assertions.assertThrows(OperationException.class, runner::run); + mockRunnerServer.verifyAllCalls(); + // check clients changes mongoDBTestHelper.checkCount(testDatabase.getCollection(CLIENTS_COLLECTION), 0); - //TODO when cloud enabled // check ongoing status -// mongoDBTestHelper.checkEmptyTargetSystemAudiMarker(); + mongoDBTestHelper.checkEmptyTargetSystemAudiMarker(); } } diff --git a/core/target-systems/flamingock-mongodb-sync-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystemTest.java b/core/target-systems/flamingock-mongodb-sync-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystemTest.java index a6ed84f66..9eafda830 100644 --- a/core/target-systems/flamingock-mongodb-sync-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystemTest.java +++ b/core/target-systems/flamingock-mongodb-sync-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystemTest.java @@ -33,6 +33,7 @@ import io.flamingock.common.test.cloud.prototype.PrototypeClientSubmission; import io.flamingock.common.test.cloud.prototype.PrototypeStage; import io.flamingock.internal.util.Trio; +import io.flamingock.internal.util.constants.CommunityPersistenceConstants; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; import io.flamingock.internal.core.builder.FlamingockFactory; import io.flamingock.internal.core.builder.CloudChangeRunnerBuilder; @@ -125,6 +126,7 @@ void afterEach() throws Exception { mockRunnerServer.stop(); testDatabase.getCollection(CLIENTS_COLLECTION).drop(); + testDatabase.getCollection(CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME).drop(); } @Test @@ -178,7 +180,6 @@ void happyPath() { } @Test - @Disabled("adapt when adding cloud support") @DisplayName("Should rollback the ongoing deletion when a change fails") void failedChanges() { String executionId = "execution-1"; @@ -223,16 +224,15 @@ void failedChanges() { .build(); //THEN - mockRunnerServer.verifyAllCalls(); - OperationException ex = Assertions.assertThrows(OperationException.class, runner::run); + mockRunnerServer.verifyAllCalls(); + // check clients changes mongoDBTestHelper.checkCount(testDatabase.getCollection(CLIENTS_COLLECTION), 0); - //TODO when cloud enabled // check ongoing status -// mongoDBTestHelper.checkEmptyTargetSystemAudiMarker(); + mongoDBTestHelper.checkEmptyTargetSystemAudiMarker(); } } From 16270c9c58c7edbd5131fddc475073dd2dd9fa50 Mon Sep 17 00:00:00 2001 From: Antonio Perez Dieppa Date: Mon, 18 May 2026 13:10:15 +0100 Subject: [PATCH 2/3] fix: bug in DynamoDB marker --- .../targetsystem/dynamodb/DynamoDBTargetSystemAuditMarker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarker.java b/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarker.java index 0bfe3198e..49fcecb75 100644 --- a/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarker.java +++ b/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarker.java @@ -94,7 +94,7 @@ public void mark(TargetSystemAuditMark auditMark) { public static class Builder { - protected static DynamoDBUtil dynamoDBUtil; + private final DynamoDBUtil dynamoDBUtil; private final TransactionManager txManager; private String tableName = CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME; private boolean autoCreate = true; From 9f2e0f1f1c5523faf9144291cb434b78d1a01d02 Mon Sep 17 00:00:00 2001 From: Antonio Perez Dieppa Date: Mon, 18 May 2026 17:44:18 +0100 Subject: [PATCH 3/3] test: add tests for markers --- .../CouchbaseTargetSystemAuditMarkerTest.java | 151 ++++++++++++++++++ .../DynamoDBTargetSystemAuditMarkerTest.java | 134 ++++++++++++++++ .../MongoDBSpringDataAuditMarkerTest.java | 127 +++++++++++++++ 3 files changed, 412 insertions(+) create mode 100644 core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemAuditMarkerTest.java create mode 100644 core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarkerTest.java create mode 100644 core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarkerTest.java diff --git a/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemAuditMarkerTest.java b/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemAuditMarkerTest.java new file mode 100644 index 000000000..b199f3185 --- /dev/null +++ b/core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemAuditMarkerTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.targetsystem.couchbase; + +import com.couchbase.client.core.io.CollectionIdentifier; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.transactions.TransactionAttemptContext; +import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; +import io.flamingock.internal.core.transaction.TransactionManager; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.couchbase.BucketDefinition; +import org.testcontainers.couchbase.CouchbaseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Direct write-verification for {@link CouchbaseTargetSystemAuditMarker}. The existing + * {@code CouchbaseTargetSystemTest} only asserts the end-state marker count is zero, which is + * also satisfied when {@code mark()} is a no-op. This test exercises {@code mark()}, + * {@code listAll()} and {@code clearMark()} through their production code paths, proving the + * write actually persists. + * + *

Couchbase's marker {@code mark()} requires a live {@link TransactionAttemptContext} from + * the {@link TransactionManager}. Each call is therefore wrapped in a real Couchbase transaction + * (`cluster.transactions().run(...)`), registering the attempt context under the changeId before + * invoking the marker and unregistering after. + */ +@Testcontainers +public class CouchbaseTargetSystemAuditMarkerTest { + + private static final String BUCKET_NAME = "test"; + private static final String SCOPE_NAME = CollectionIdentifier.DEFAULT_SCOPE; + private static final String MARKER_COLLECTION = "flamingockAuditMarkerTest"; + + @Container + public static final CouchbaseContainer couchbaseContainer = new CouchbaseContainer("couchbase/server:7.2.4") + .withBucket(new BucketDefinition(BUCKET_NAME)); + + private static Cluster cluster; + private static Bucket bucket; + + private TransactionManager txManager; + private CouchbaseTargetSystemAuditMarker marker; + + @BeforeAll + static void beforeAll() { + couchbaseContainer.start(); + cluster = Cluster.connect( + couchbaseContainer.getConnectionString(), + couchbaseContainer.getUsername(), + couchbaseContainer.getPassword()); + + bucket = cluster.bucket(BUCKET_NAME); + bucket.waitUntilReady(Duration.ofSeconds(10)); + } + + @BeforeEach + void beforeEach() { + txManager = new TransactionManager<>(() -> { + throw new UnsupportedOperationException( + "Supplier is unused: Couchbase tests register the TransactionAttemptContext explicitly via startSession(id, ctx)"); + }); + + marker = CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager) + .withScopeName(SCOPE_NAME) + .withCollectionName(MARKER_COLLECTION) + .withAutoCreate(true) + .build(); + + // Start each test from an empty marker collection. + clearAll(); + } + + @AfterEach + void afterEach() { + clearAll(); + } + + @Test + @DisplayName("mark() persists each mark and listAll() returns them with the right contents") + void markPersistsAndIsReadableViaListAll() { + String changeId1 = "change-1"; + String changeId2 = "change-2"; + + markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED); + markInTransaction(changeId2, TargetSystemAuditMarkType.ROLLED_BACK); + + Set marks = marker.listAll(); + Assertions.assertEquals(2, marks.size()); + + Map byId = marks.stream() + .collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, + TargetSystemAuditMark::getOperation)); + Assertions.assertEquals(TargetSystemAuditMarkType.APPLIED, byId.get(changeId1)); + Assertions.assertEquals(TargetSystemAuditMarkType.ROLLED_BACK, byId.get(changeId2)); + } + + @Test + @DisplayName("clearMark() removes only the targeted mark") + void clearMarkRemovesOnlyTheTargetedMark() { + String changeId1 = "change-1"; + String changeId2 = "change-2"; + markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED); + markInTransaction(changeId2, TargetSystemAuditMarkType.APPLIED); + + marker.clearMark(changeId1); + + Set marks = marker.listAll(); + Assertions.assertEquals(1, marks.size()); + Assertions.assertEquals(changeId2, marks.iterator().next().getChangeId()); + } + + private void markInTransaction(String changeId, TargetSystemAuditMarkType operation) { + cluster.transactions().run(ctx -> { + txManager.startSession(changeId, ctx); + marker.mark(new TargetSystemAuditMark(changeId, operation)); + txManager.closeSession(changeId); + }); + } + + private void clearAll() { + for (TargetSystemAuditMark mark : marker.listAll()) { + marker.clearMark(mark.getChangeId()); + } + } +} diff --git a/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarkerTest.java b/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarkerTest.java new file mode 100644 index 000000000..91f800277 --- /dev/null +++ b/core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarkerTest.java @@ -0,0 +1,134 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.targetsystem.dynamodb; + +import io.flamingock.dynamodb.kit.DynamoDBTestContainer; +import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; +import io.flamingock.internal.core.transaction.TransactionManager; +import io.flamingock.internal.util.dynamodb.DynamoDBUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient; +import software.amazon.awssdk.enhanced.dynamodb.model.TransactWriteItemsEnhancedRequest; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Direct write-verification for {@link DynamoDBTargetSystemAuditMarker}. The existing + * {@code DynamoDBCloudTargetSystemTest} only asserts the end-state marker count is zero, which is + * also satisfied when {@code mark()} is a no-op. This test exercises {@code mark()}, + * {@code listAll()} and {@code clearMark()} through their production code paths, proving the + * write actually persists. + * + *

DynamoDB's marker {@code mark()} adds a {@code PutItem} to a + * {@link TransactWriteItemsEnhancedRequest.Builder} obtained from the {@link TransactionManager}. + * The test orchestrates the surrounding transaction itself: register a fresh builder under the + * changeId, invoke the marker, then execute the transaction via the enhanced client so the + * {@code PutItem} actually lands. + */ +@Testcontainers +public class DynamoDBTargetSystemAuditMarkerTest { + + private static final String MARKER_TABLE = "flamingockAuditMarkerTest"; + + @Container + static GenericContainer dynamoContainer = DynamoDBTestContainer.createContainer(); + + private DynamoDbClient client; + private DynamoDbEnhancedClient enhancedClient; + private TransactionManager txManager; + private DynamoDBTargetSystemAuditMarker marker; + + @BeforeEach + void beforeEach() { + client = DynamoDBTestContainer.createClient(dynamoContainer); + enhancedClient = new DynamoDBUtil(client).getEnhancedClient(); + + txManager = new TransactionManager<>(TransactWriteItemsEnhancedRequest::builder); + + marker = DynamoDBTargetSystemAuditMarker.builder(client, txManager) + .setTableName(MARKER_TABLE) + .withAutoCreate(true) + .build(); + } + + @AfterEach + void afterEach() { + try { + client.deleteTable(b -> b.tableName(MARKER_TABLE)); + } catch (Exception ignored) { + // table may not exist if build failed + } + client.close(); + } + + @Test + @DisplayName("mark() persists each mark and listAll() returns them with the right contents") + void markPersistsAndIsReadableViaListAll() { + String changeId1 = "change-1"; + String changeId2 = "change-2"; + + markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED); + markInTransaction(changeId2, TargetSystemAuditMarkType.ROLLED_BACK); + + Set marks = marker.listAll(); + Assertions.assertEquals(2, marks.size()); + + Map byId = marks.stream() + .collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, + TargetSystemAuditMark::getOperation)); + Assertions.assertEquals(TargetSystemAuditMarkType.APPLIED, byId.get(changeId1)); + Assertions.assertEquals(TargetSystemAuditMarkType.ROLLED_BACK, byId.get(changeId2)); + } + + @Test + @DisplayName("clearMark() removes only the targeted mark") + void clearMarkRemovesOnlyTheTargetedMark() { + String changeId1 = "change-1"; + String changeId2 = "change-2"; + markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED); + markInTransaction(changeId2, TargetSystemAuditMarkType.APPLIED); + + marker.clearMark(changeId1); + + Set marks = marker.listAll(); + Assertions.assertEquals(1, marks.size()); + Assertions.assertEquals(changeId2, marks.iterator().next().getChangeId()); + } + + /** + * Wraps a single mark() call in a real DynamoDB transaction: open the session with a fresh + * request builder, let the marker queue its PutItem onto it, then execute the transaction so + * the write actually persists. + */ + private void markInTransaction(String changeId, TargetSystemAuditMarkType operation) { + TransactWriteItemsEnhancedRequest.Builder requestBuilder = + txManager.startSession(changeId); + marker.mark(new TargetSystemAuditMark(changeId, operation)); + enhancedClient.transactWriteItems(requestBuilder.build()); + txManager.closeSession(changeId); + } +} diff --git a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarkerTest.java b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarkerTest.java new file mode 100644 index 000000000..8d1287c16 --- /dev/null +++ b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/test/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataAuditMarkerTest.java @@ -0,0 +1,127 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.targetsystem.mongodb.springdata; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Direct write-verification for {@link MongoDBSpringDataAuditMarker}. The existing + * {@code MongoDBSpringDataTargetSystemTest} only asserts the end-state marker count is zero, + * which is also satisfied when {@code mark()} is a no-op. This test exercises {@code mark()}, + * {@code listAll()} and {@code clearMark()} through their production code paths, proving the + * write actually persists. + * + *

The Spring Data marker relies on Spring's {@code MongoTransactionManager} for session + * binding. Outside a Spring transaction, {@code mongoTemplate.getDb()} returns the standard + * (non-session-bound) {@code MongoDatabase}; the upsert from {@code mark()} still commits + * immediately. The test exploits that to drive {@code mark()} without orchestrating a Spring + * transaction. + */ +@Testcontainers +public class MongoDBSpringDataAuditMarkerTest { + + private static final String DB_NAME = "test"; + private static final String MARKER_COLLECTION = "flamingockAuditMarkerTest"; + + @Container + public static final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:6")); + + private static MongoTemplate mongoTemplate; + + private MongoDBSpringDataAuditMarker marker; + + @BeforeAll + static void beforeAll() { + MongoClient mongoClient = MongoClients.create(MongoClientSettings.builder() + .applyConnectionString(new ConnectionString(mongoDBContainer.getConnectionString())) + .build()); + mongoTemplate = new MongoTemplate(mongoClient, DB_NAME); + } + + @BeforeEach + void beforeEach() { + marker = MongoDBSpringDataAuditMarker.builder(mongoTemplate) + .setCollectionName(MARKER_COLLECTION) + .withAutoCreate(true) + .build(); + clearAll(); + } + + @AfterEach + void afterEach() { + clearAll(); + } + + @Test + @DisplayName("mark() persists each mark and listAll() returns them with the right contents") + void markPersistsAndIsReadableViaListAll() { + String changeId1 = "change-1"; + String changeId2 = "change-2"; + + marker.mark(new TargetSystemAuditMark(changeId1, TargetSystemAuditMarkType.APPLIED)); + marker.mark(new TargetSystemAuditMark(changeId2, TargetSystemAuditMarkType.ROLLED_BACK)); + + Set marks = marker.listAll(); + Assertions.assertEquals(2, marks.size()); + + Map byId = marks.stream() + .collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, + TargetSystemAuditMark::getOperation)); + Assertions.assertEquals(TargetSystemAuditMarkType.APPLIED, byId.get(changeId1)); + Assertions.assertEquals(TargetSystemAuditMarkType.ROLLED_BACK, byId.get(changeId2)); + } + + @Test + @DisplayName("clearMark() removes only the targeted mark") + void clearMarkRemovesOnlyTheTargetedMark() { + String changeId1 = "change-1"; + String changeId2 = "change-2"; + marker.mark(new TargetSystemAuditMark(changeId1, TargetSystemAuditMarkType.APPLIED)); + marker.mark(new TargetSystemAuditMark(changeId2, TargetSystemAuditMarkType.APPLIED)); + + marker.clearMark(changeId1); + + Set marks = marker.listAll(); + Assertions.assertEquals(1, marks.size()); + Assertions.assertEquals(changeId2, marks.iterator().next().getChangeId()); + } + + private void clearAll() { + for (TargetSystemAuditMark mark : marker.listAll()) { + marker.clearMark(mark.getChangeId()); + } + } +}