diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java
index 1f28cd73eb..e8f4da4a82 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java
@@ -159,7 +159,7 @@ protected JobKeyGenerator getJobKeyGenerator() {
 	 * @since 6.0
 	 */
 	protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
-		return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_INSTANCE_SEQ");
+		return new MongoSequenceIncrementer();
 	}
 
 	/**
@@ -168,7 +168,7 @@ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
 	 * @since 6.0
 	 */
 	protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
-		return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_EXECUTION_SEQ");
+		return new MongoSequenceIncrementer();
 	}
 
 	/**
@@ -177,7 +177,7 @@ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
 	 * @since 6.0
 	 */
	protected DataFieldMaxValueIncrementer getStepExecutionIncrementer() {
-		return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_STEP_EXECUTION_SEQ");
+		return new MongoSequenceIncrementer();
 	}
 
 }
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java
index e0630659ab..3bfbf7d8b2 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java
@@ -50,8 +50,6 @@ public class MongoJobExecutionDao implements JobExecutionDao {
 
 	private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
 
-	private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";
-
 	private final MongoOperations mongoOperations;
 
 	private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();
@@ -62,7 +60,7 @@ public class MongoJobExecutionDao implements JobExecutionDao {
 
 	public MongoJobExecutionDao(MongoOperations mongoOperations) {
 		this.mongoOperations = mongoOperations;
-		this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
+		this.jobExecutionIncrementer = new MongoSequenceIncrementer();
 	}
 
 	public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java
index 760bdbba55..c0d1ff2c8b 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java
@@ -44,8 +44,6 @@ public class MongoJobInstanceDao implements JobInstanceDao {
 
 	private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE";
 
-	private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ";
-
 	private final MongoOperations mongoOperations;
 
 	private DataFieldMaxValueIncrementer jobInstanceIncrementer;
@@ -57,7 +55,7 @@ public class MongoJobInstanceDao implements JobInstanceDao {
 	public MongoJobInstanceDao(MongoOperations mongoOperations) {
 		Assert.notNull(mongoOperations, "mongoOperations must not be null.");
 		this.mongoOperations = mongoOperations;
-		this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME);
+		this.jobInstanceIncrementer = new MongoSequenceIncrementer();
 	}
 
 	public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java
index 9722db637f..5bac301256 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java
@@ -15,16 +15,12 @@
  */
 package org.springframework.batch.core.repository.dao.mongodb;
 
-import com.mongodb.client.model.FindOneAndUpdateOptions;
-import com.mongodb.client.model.ReturnDocument;
-import org.bson.Document;
-
 import org.springframework.dao.DataAccessException;
-import org.springframework.data.mongodb.core.MongoOperations;
 import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
 
-// Based on https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb
-// Section: Use a single counter document to generate unique identifiers one at a time
+import java.net.InetAddress;
+import java.security.SecureRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author Mahmoud Ben Hassine
@@ -33,22 +29,35 @@
  */
 public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer {
 
-	private final MongoOperations mongoTemplate;
+	private static final int NODE_BITS = 10;
+	private static final int SEQUENCE_BITS = 12;
+	private static final int NODE_SHIFT = SEQUENCE_BITS;
+	private static final int TIMESTAMP_SHIFT = NODE_BITS + SEQUENCE_BITS;
+	private static final int SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
+	private static final int NODE_MASK = (1 << NODE_BITS) - 1;
+
+	private static final long TSID_EPOCH = 1577836800000L;
+
+	private final int nodeId;
+	private final AtomicInteger sequence = new AtomicInteger(0);
+	private volatile long lastTimestamp = -1L;
+
+	private static final SecureRandom random = new SecureRandom();
 
-	private final String sequenceName;
+	public MongoSequenceIncrementer() {
+		this.nodeId = calculateNodeId();
+	}
 
-	public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName) {
-		this.mongoTemplate = mongoTemplate;
-		this.sequenceName = sequenceName;
+	public MongoSequenceIncrementer(int nodeId) {
+		if (nodeId < 0 || nodeId > NODE_MASK) {
+			throw new IllegalArgumentException("Node ID must be between 0 and " + NODE_MASK);
+		}
+		this.nodeId = nodeId;
 	}
 
 	@Override
 	public long nextLongValue() throws DataAccessException {
-		return mongoTemplate.execute("BATCH_SEQUENCES",
-				collection -> collection
-					.findOneAndUpdate(new Document("_id", sequenceName), new Document("$inc", new Document("count", 1)),
-							new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER))
-					.getLong("count"));
+		return generateTsid();
 	}
 
 	@Override
@@ -61,4 +70,45 @@ public String nextStringValue() throws DataAccessException {
 		throw new UnsupportedOperationException();
 	}
 
+	private synchronized long generateTsid() {
+		long timestamp = System.currentTimeMillis() - TSID_EPOCH;
+
+		if (timestamp < lastTimestamp) {
+			timestamp = lastTimestamp;
+		}
+
+		if (timestamp == lastTimestamp) {
+			int seq = sequence.incrementAndGet() & SEQUENCE_MASK;
+			if (seq == 0) {
+				timestamp = waitNextMillis(lastTimestamp);
+				lastTimestamp = timestamp;
+			}
+			return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT) | seq;
+		} else {
+			sequence.set(0);
+			lastTimestamp = timestamp;
+			return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT);
+		}
+	}
+
+	private long waitNextMillis(long lastTimestamp) {
+		long timestamp = System.currentTimeMillis() - TSID_EPOCH;
+		while (timestamp <= lastTimestamp) {
+			timestamp = System.currentTimeMillis() - TSID_EPOCH;
+		}
+		return timestamp;
+	}
+
+	private int calculateNodeId() {
+		try {
+			String hostname = InetAddress.getLocalHost().getHostName();
+			int hostHash = hostname.hashCode();
+			long processId = ProcessHandle.current().pid();
+			long randomValue = random.nextInt();
+			return (int) ((hostHash ^ processId ^ randomValue) & NODE_MASK);
+		} catch (Exception e) {
+			return (int) ((System.nanoTime() ^ Thread.currentThread().getId()) & NODE_MASK);
+		}
+	}
+
 }
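Note (illustration, not part of the patch): the rewritten MongoSequenceIncrementer above emits Snowflake/TSID-style identifiers: a millisecond timestamp counted from TSID_EPOCH (2020-01-01T00:00:00Z) in the high bits, a 10-bit node id, and a 12-bit per-millisecond sequence in the low bits. A minimal sketch of how such an identifier can be decomposed, reusing the patch's shift and mask values; the class name TsidLayoutExample is made up for the example.

import java.time.Instant;

import org.springframework.batch.core.repository.dao.mongodb.MongoSequenceIncrementer;

public class TsidLayoutExample {

	public static void main(String[] args) {
		// Same layout constants as the patched incrementer: 12 sequence bits, 10 node bits.
		final int sequenceBits = 12;
		final int nodeBits = 10;
		final long tsidEpoch = 1577836800000L; // 2020-01-01T00:00:00Z

		// Explicit node id (0..1023) via the new single-argument constructor.
		MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer(42);
		long id = incrementer.nextLongValue();

		// Decompose the id back into [timestamp | node | sequence].
		long sequence = id & ((1L << sequenceBits) - 1);
		long nodeId = (id >>> sequenceBits) & ((1L << nodeBits) - 1);
		long millis = (id >>> (sequenceBits + nodeBits)) + tsidEpoch;

		System.out.println("node=" + nodeId + " seq=" + sequence + " time=" + Instant.ofEpochMilli(millis));
	}

}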
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java
index f203f3184a..450809d241 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java
@@ -43,8 +43,6 @@ public class MongoStepExecutionDao implements StepExecutionDao {
 
 	private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";
 
-	private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ";
-
 	private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
 
 	private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter();
@@ -59,7 +57,7 @@ public class MongoStepExecutionDao implements StepExecutionDao {
 
 	public MongoStepExecutionDao(MongoOperations mongoOperations) {
 		this.mongoOperations = mongoOperations;
-		this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME);
+		this.stepExecutionIncrementer = new MongoSequenceIncrementer();
 	}
 
 	public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java
index 75e309f70b..b54038ab2a 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java
@@ -108,15 +108,13 @@ public void afterPropertiesSet() throws Exception {
 		super.afterPropertiesSet();
 		Assert.notNull(this.mongoOperations, "MongoOperations must not be null.");
 		if (this.jobInstanceIncrementer == null) {
-			this.jobInstanceIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "BATCH_JOB_INSTANCE_SEQ");
+			this.jobInstanceIncrementer = new MongoSequenceIncrementer();
 		}
 		if (this.jobExecutionIncrementer == null) {
-			this.jobExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations,
-					"BATCH_JOB_EXECUTION_SEQ");
+			this.jobExecutionIncrementer = new MongoSequenceIncrementer();
 		}
 		if (this.stepExecutionIncrementer == null) {
-			this.stepExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations,
-					"BATCH_STEP_EXECUTION_SEQ");
+			this.stepExecutionIncrementer = new MongoSequenceIncrementer();
 		}
 	}
 
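Note (sketch, not part of the patch): because the default constructor derives the node id from hostname, PID and a random value, deployments that prefer a stable, explicitly assigned node id per JVM can still inject their own incrementer through the setters kept by this change, for example on MongoJobExecutionDao. The helper method below is hypothetical and assumes a MongoOperations instance is already available.

import org.springframework.batch.core.repository.dao.mongodb.MongoJobExecutionDao;
import org.springframework.batch.core.repository.dao.mongodb.MongoSequenceIncrementer;
import org.springframework.data.mongodb.core.MongoOperations;

public class JobExecutionDaoConfigExample {

	// Hypothetical factory method: pins the node id (0..1023) instead of the randomly derived default.
	static MongoJobExecutionDao jobExecutionDao(MongoOperations mongoOperations, int nodeId) {
		MongoJobExecutionDao dao = new MongoJobExecutionDao(mongoOperations);
		dao.setJobExecutionIncrementer(new MongoSequenceIncrementer(nodeId));
		return dao;
	}

}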
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java
new file mode 100644
index 0000000000..02e501bf56
--- /dev/null
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementerTests.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.repository.dao.mongodb;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.dao.DataAccessException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for {@link MongoSequenceIncrementer}.
+ */
+public class MongoSequenceIncrementerTests {
+
+	@Test
+	void testTimeOrdering() throws DataAccessException {
+		MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer();
+		List<Long> ids = new ArrayList<>();
+
+		for (int i = 0; i < 10; i++) {
+			ids.add(incrementer.nextLongValue());
+		}
+
+		List<Long> sorted = new ArrayList<>(ids);
+		Collections.sort(sorted);
+		assertEquals(sorted, ids, "IDs should be in time order");
+	}
+
+	@Test
+	void testConcurrency() throws InterruptedException {
+		MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer();
+		Set<Long> ids = Collections.synchronizedSet(new HashSet<>());
+		int threadCount = 10;
+		int idsPerThread = 100;
+		ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+		CountDownLatch latch = new CountDownLatch(threadCount);
+
+		for (int i = 0; i < threadCount; i++) {
+			executor.submit(() -> {
+				try {
+					for (int j = 0; j < idsPerThread; j++) {
+						ids.add(incrementer.nextLongValue());
+					}
+				}
+				catch (DataAccessException e) {
+					fail("Should not throw DataAccessException: " + e.getMessage());
+				}
+				finally {
+					latch.countDown();
+				}
+			});
+		}
+
+		latch.await(10, TimeUnit.SECONDS);
+		executor.shutdown();
+
+		assertEquals(threadCount * idsPerThread, ids.size(),
+				"All IDs generated from multiple threads should be unique");
+	}
+
+	@Test
+	void testNodeIdSeparation() throws DataAccessException {
+		MongoSequenceIncrementer incrementer1 = new MongoSequenceIncrementer(1);
+		MongoSequenceIncrementer incrementer2 = new MongoSequenceIncrementer(2);
+
+		long id1 = incrementer1.nextLongValue();
+		long id2 = incrementer2.nextLongValue();
+
+		assertNotEquals(id1, id2, "IDs from different nodes should be different");
+
+		long nodeId1 = (id1 >> 12) & 0x3FF;
+		long nodeId2 = (id2 >> 12) & 0x3FF;
+
+		assertEquals(1, nodeId1, "First ID should have node ID 1");
+		assertEquals(2, nodeId2, "Second ID should have node ID 2");
+	}
+
+}
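Note (back-of-the-envelope, not part of the patch): the bit budget behind the unit test above works out as follows — 12 sequence bits allow 4096 ids per millisecond per node, 10 node bits allow 1024 concurrently running nodes, and the remaining 41 bits of a positive long cover roughly 69 years of millisecond timestamps from the 2020 epoch. The class below is only an illustration of that arithmetic.

public class TsidCapacityExample {

	public static void main(String[] args) {
		int sequenceBits = 12;
		int nodeBits = 10;
		int timestampBits = 63 - nodeBits - sequenceBits; // 41 bits, sign bit left unused

		long idsPerMillisPerNode = 1L << sequenceBits; // 4096
		long maxNodes = 1L << nodeBits;                // 1024
		double years = (double) (1L << timestampBits) / (1000.0 * 60 * 60 * 24 * 365.25); // ~69.7

		System.out.println(idsPerMillisPerNode + " ids/ms/node, " + maxNodes + " nodes, ~" + years + " years");
	}

}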
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java
index 5d3fe053da..abd0a7eaf6 100644
--- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java
@@ -86,6 +86,51 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th
 		dump(stepExecutionsCollection, "step execution = ");
 	}
 
+	@Test
+	void testParallelJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) throws Exception {
+		int parallelJobs = 10;
+		Thread[] threads = new Thread[parallelJobs];
+		JobExecution[] executions = new JobExecution[parallelJobs];
+
+		for (int i = 0; i < parallelJobs; i++) {
+			final int idx = i;
+			threads[i] = new Thread(() -> {
+				JobParameters jobParameters = new JobParametersBuilder()
+					.addString("name", "foo" + idx)
+					.addLocalDateTime("runtime", LocalDateTime.now())
+					.toJobParameters();
+				try {
+					executions[idx] = jobOperator.start(job, jobParameters);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			});
+			threads[i].start();
+		}
+
+		for (Thread t : threads) {
+			t.join();
+		}
+
+		for (JobExecution exec : executions) {
+			Assertions.assertNotNull(exec);
+			Assertions.assertEquals(ExitStatus.COMPLETED, exec.getExitStatus());
+		}
+
mongoTemplate.getCollection("BATCH_JOB_INSTANCE"); + MongoCollection jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION"); + MongoCollection stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION"); + + Assertions.assertEquals(parallelJobs, jobInstancesCollection.countDocuments()); + Assertions.assertEquals(parallelJobs, jobExecutionsCollection.countDocuments()); + Assertions.assertEquals(parallelJobs * 2, stepExecutionsCollection.countDocuments()); + + // dump results for inspection + dump(jobInstancesCollection, "job instance = "); + dump(jobExecutionsCollection, "job execution = "); + dump(stepExecutionsCollection, "step execution = "); + } + private static void dump(MongoCollection collection, String prefix) { for (Document document : collection.find()) { System.out.println(prefix + document.toJson());