From 75b216c3c2e24a9793270f863eeac903145acf95 Mon Sep 17 00:00:00 2001 From: Hyun Jong Park Date: Mon, 8 Dec 2025 01:37:48 +0900 Subject: [PATCH 1/2] Fix sequence with TSID implementation in MongoSequenceIncrementer Signed-off-by: Hyun Jong Park --- .../MongoDefaultBatchConfiguration.java | 6 +- .../dao/mongodb/MongoJobExecutionDao.java | 4 +- .../dao/mongodb/MongoJobInstanceDao.java | 4 +- .../dao/mongodb/MongoSequenceIncrementer.java | 68 +++++++++--- .../dao/mongodb/MongoStepExecutionDao.java | 4 +- .../MongoJobRepositoryFactoryBean.java | 8 +- .../MongoSequenceIncrementerTests.java | 101 ++++++++++++++++++ .../MongoDBJobRepositoryIntegrationTests.java | 45 ++++++++ 8 files changed, 206 insertions(+), 34 deletions(-) create mode 100644 spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java index 1f28cd73eb..e8f4da4a82 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java @@ -159,7 +159,7 @@ protected JobKeyGenerator getJobKeyGenerator() { * @since 6.0 */ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() { - return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_INSTANCE_SEQ"); + return new MongoSequenceIncrementer(); } /** @@ -168,7 +168,7 @@ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() { * @since 6.0 */ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() { - return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_EXECUTION_SEQ"); + return new MongoSequenceIncrementer(); } /** @@ -177,7 +177,7 @@ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() { * @since 6.0 */ protected DataFieldMaxValueIncrementer getStepExecutionIncrementer() { - return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_STEP_EXECUTION_SEQ"); + return new MongoSequenceIncrementer(); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java index e0630659ab..3bfbf7d8b2 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java @@ -50,8 +50,6 @@ public class MongoJobExecutionDao implements JobExecutionDao { private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; - private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ"; - private final MongoOperations mongoOperations; private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter(); @@ -62,7 +60,7 @@ public class MongoJobExecutionDao implements JobExecutionDao { public MongoJobExecutionDao(MongoOperations mongoOperations) { this.mongoOperations = mongoOperations; - this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME); + this.jobExecutionIncrementer = new MongoSequenceIncrementer(); } public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java index 760bdbba55..c0d1ff2c8b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java @@ -44,8 +44,6 @@ public class MongoJobInstanceDao implements JobInstanceDao { private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE"; - private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ"; - private final MongoOperations mongoOperations; private DataFieldMaxValueIncrementer jobInstanceIncrementer; @@ -57,7 +55,7 @@ public class MongoJobInstanceDao implements JobInstanceDao { public MongoJobInstanceDao(MongoOperations mongoOperations) { Assert.notNull(mongoOperations, "mongoOperations must not be null."); this.mongoOperations = mongoOperations; - this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME); + this.jobInstanceIncrementer = new MongoSequenceIncrementer(); } public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java index 9722db637f..b7a9086f6f 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java @@ -15,16 +15,10 @@ */ package org.springframework.batch.core.repository.dao.mongodb; -import com.mongodb.client.model.FindOneAndUpdateOptions; -import com.mongodb.client.model.ReturnDocument; -import org.bson.Document; - import org.springframework.dao.DataAccessException; -import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; -// Based on https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb -// Section: Use a single counter document to generate unique identifiers one at a time +import java.util.concurrent.atomic.AtomicInteger; /** * @author Mahmoud Ben Hassine @@ -33,22 +27,33 @@ */ public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer { - private final MongoOperations mongoTemplate; + private static final int NODE_BITS = 10; + private static final int SEQUENCE_BITS = 12; + private static final int NODE_SHIFT = SEQUENCE_BITS; + private static final int TIMESTAMP_SHIFT = NODE_BITS + SEQUENCE_BITS; + private static final int SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1; + private static final int NODE_MASK = (1 << NODE_BITS) - 1; + + private static final long TSID_EPOCH = 1577836800000L; + + private final int nodeId; + private final AtomicInteger sequence = new AtomicInteger(0); + private volatile long lastTimestamp = -1L; - private final String sequenceName; + public MongoSequenceIncrementer() { + this.nodeId = (int) (System.nanoTime() & NODE_MASK); + } - public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName) { - this.mongoTemplate = mongoTemplate; - this.sequenceName = sequenceName; + public MongoSequenceIncrementer(int nodeId) { + if (nodeId < 0 || nodeId > NODE_MASK) { + throw new IllegalArgumentException("Node ID must be between 0 and " + NODE_MASK); + } + this.nodeId = nodeId; } @Override public long nextLongValue() throws DataAccessException { - return mongoTemplate.execute("BATCH_SEQUENCES", - collection -> collection - .findOneAndUpdate(new Document("_id", sequenceName), new Document("$inc", new Document("count", 1)), - new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)) - .getLong("count")); + return generateTsid(); } @Override @@ -61,4 +66,33 @@ public String nextStringValue() throws DataAccessException { throw new UnsupportedOperationException(); } + private synchronized long generateTsid() { + long timestamp = System.currentTimeMillis() - TSID_EPOCH; + + if (timestamp < lastTimestamp) { + timestamp = lastTimestamp; + } + + if (timestamp == lastTimestamp) { + int seq = sequence.incrementAndGet() & SEQUENCE_MASK; + if (seq == 0) { + timestamp = waitNextMillis(lastTimestamp); + lastTimestamp = timestamp; + } + return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT) | seq; + } else { + sequence.set(0); + lastTimestamp = timestamp; + return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT); + } + } + + private long waitNextMillis(long lastTimestamp) { + long timestamp = System.currentTimeMillis() - TSID_EPOCH; + while (timestamp <= lastTimestamp) { + timestamp = System.currentTimeMillis() - TSID_EPOCH; + } + return timestamp; + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java index f203f3184a..450809d241 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java @@ -43,8 +43,6 @@ public class MongoStepExecutionDao implements StepExecutionDao { private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; - private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ"; - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter(); @@ -59,7 +57,7 @@ public class MongoStepExecutionDao implements StepExecutionDao { public MongoStepExecutionDao(MongoOperations mongoOperations) { this.mongoOperations = mongoOperations; - this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME); + this.stepExecutionIncrementer = new MongoSequenceIncrementer(); } public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java index 75e309f70b..b54038ab2a 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java @@ -108,15 +108,13 @@ public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); Assert.notNull(this.mongoOperations, "MongoOperations must not be null."); if (this.jobInstanceIncrementer == null) { - this.jobInstanceIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "BATCH_JOB_INSTANCE_SEQ"); + this.jobInstanceIncrementer = new MongoSequenceIncrementer(); } if (this.jobExecutionIncrementer == null) { - this.jobExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations, - "BATCH_JOB_EXECUTION_SEQ"); + this.jobExecutionIncrementer = new MongoSequenceIncrementer(); } if (this.stepExecutionIncrementer == null) { - this.stepExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations, - "BATCH_STEP_EXECUTION_SEQ"); + this.stepExecutionIncrementer = new MongoSequenceIncrementer(); } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java new file mode 100644 index 0000000000..02e501bf56 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2024-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.dao.mongodb; + +import org.junit.jupiter.api.Test; +import org.springframework.dao.DataAccessException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for {@link MongoSequenceIncrementer}. + */ +public class MongoSequenceIncrementerTests { + + @Test + void testTimeOrdering() throws DataAccessException { + MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer(); + List ids = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + ids.add(incrementer.nextLongValue()); + } + + List sorted = new ArrayList<>(ids); + Collections.sort(sorted); + assertEquals(sorted, ids, "IDs should be in time order"); + } + + @Test + void testConcurrency() throws InterruptedException { + MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer(); + Set ids = Collections.synchronizedSet(new HashSet<>()); + int threadCount = 10; + int idsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + for (int j = 0; j < idsPerThread; j++) { + ids.add(incrementer.nextLongValue()); + } + } + catch (DataAccessException e) { + fail("Should not throw DataAccessException: " + e.getMessage()); + } + finally { + latch.countDown(); + } + }); + } + + latch.await(10, TimeUnit.SECONDS); + executor.shutdown(); + + assertEquals(threadCount * idsPerThread, ids.size(), + "All IDs generated from multiple threads should be unique"); + } + + @Test + void testNodeIdSeparation() throws DataAccessException { + MongoSequenceIncrementer incrementer1 = new MongoSequenceIncrementer(1); + MongoSequenceIncrementer incrementer2 = new MongoSequenceIncrementer(2); + + long id1 = incrementer1.nextLongValue(); + long id2 = incrementer2.nextLongValue(); + + assertNotEquals(id1, id2, "IDs from different nodes should be different"); + + long nodeId1 = (id1 >> 12) & 0x3FF; + long nodeId2 = (id2 >> 12) & 0x3FF; + + assertEquals(1, nodeId1, "First ID should have node ID 1"); + assertEquals(2, nodeId2, "Second ID should have node ID 2"); + } + +} diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java index 5d3fe053da..abd0a7eaf6 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java @@ -86,6 +86,51 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th dump(stepExecutionsCollection, "step execution = "); } + @Test + void testParallelJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) throws Exception { + int parallelJobs = 10; + Thread[] threads = new Thread[parallelJobs]; + JobExecution[] executions = new JobExecution[parallelJobs]; + + for (int i = 0; i < parallelJobs; i++) { + final int idx = i; + threads[i] = new Thread(() -> { + JobParameters jobParameters = new JobParametersBuilder() + .addString("name", "foo" + idx) + .addLocalDateTime("runtime", LocalDateTime.now()) + .toJobParameters(); + try { + executions[idx] = jobOperator.start(job, jobParameters); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + for (JobExecution exec : executions) { + Assertions.assertNotNull(exec); + Assertions.assertEquals(ExitStatus.COMPLETED, exec.getExitStatus()); + } + + MongoCollection jobInstancesCollection = mongoTemplate.getCollection("BATCH_JOB_INSTANCE"); + MongoCollection jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION"); + MongoCollection stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION"); + + Assertions.assertEquals(parallelJobs, jobInstancesCollection.countDocuments()); + Assertions.assertEquals(parallelJobs, jobExecutionsCollection.countDocuments()); + Assertions.assertEquals(parallelJobs * 2, stepExecutionsCollection.countDocuments()); + + // dump results for inspection + dump(jobInstancesCollection, "job instance = "); + dump(jobExecutionsCollection, "job execution = "); + dump(stepExecutionsCollection, "step execution = "); + } + private static void dump(MongoCollection collection, String prefix) { for (Document document : collection.find()) { System.out.println(prefix + document.toJson()); From 190767ec32d09f316d2527ce93fca5efdd4979ff Mon Sep 17 00:00:00 2001 From: Hyun Jong Park Date: Mon, 8 Dec 2025 17:58:19 +0900 Subject: [PATCH 2/2] Improve nodeId generation to prevent conflicts in distributed environments Signed-off-by: Hyun Jong Park --- .../dao/mongodb/MongoSequenceIncrementer.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java index b7a9086f6f..5bac301256 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java @@ -18,6 +18,8 @@ import org.springframework.dao.DataAccessException; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; +import java.net.InetAddress; +import java.security.SecureRandom; import java.util.concurrent.atomic.AtomicInteger; /** @@ -40,8 +42,10 @@ public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer { private final AtomicInteger sequence = new AtomicInteger(0); private volatile long lastTimestamp = -1L; + private static final SecureRandom random = new SecureRandom(); + public MongoSequenceIncrementer() { - this.nodeId = (int) (System.nanoTime() & NODE_MASK); + this.nodeId = calculateNodeId(); } public MongoSequenceIncrementer(int nodeId) { @@ -95,4 +99,16 @@ private long waitNextMillis(long lastTimestamp) { return timestamp; } + private int calculateNodeId() { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + int hostHash = hostname.hashCode(); + long processId = ProcessHandle.current().pid(); + long randomValue = random.nextInt(); + return (int) ((hostHash ^ processId ^ randomValue) & NODE_MASK); + } catch (Exception e) { + return (int) ((System.nanoTime() ^ Thread.currentThread().getId()) & NODE_MASK); + } + } + }