From c85d18e934543a2030f5427c572eb478173e2b11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 17:43:21 +0100 Subject: [PATCH 1/8] feat(core): add OutboxSchedulerConfig value object with validation --- .../okapi/core/OutboxSchedulerConfig.kt | 11 +++++ .../okapi/core/OutboxSchedulerConfigTest.kt | 44 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfig.kt create mode 100644 okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfigTest.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfig.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfig.kt new file mode 100644 index 0000000..5d3ea60 --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfig.kt @@ -0,0 +1,11 @@ +package com.softwaremill.okapi.core + +data class OutboxSchedulerConfig( + val intervalMs: Long = 1_000L, + val batchSize: Int = 10, +) { + init { + require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" } + require(batchSize > 0) { "batchSize must be positive, got: $batchSize" } + } +} diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfigTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfigTest.kt new file mode 100644 index 0000000..c92d437 --- /dev/null +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerConfigTest.kt @@ -0,0 +1,44 @@ +package com.softwaremill.okapi.core + +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe + +class OutboxSchedulerConfigTest : FunSpec({ + + test("default config has valid values") { + val config = OutboxSchedulerConfig() + config.intervalMs shouldBe 1_000L + config.batchSize shouldBe 10 + } + + test("accepts custom valid values") { + val config = OutboxSchedulerConfig(intervalMs = 500, batchSize = 50) + config.intervalMs shouldBe 500L + config.batchSize shouldBe 50 + } + + test("rejects zero intervalMs") { + shouldThrow { + OutboxSchedulerConfig(intervalMs = 0) + } + } + + test("rejects negative intervalMs") { + shouldThrow { + OutboxSchedulerConfig(intervalMs = -1) + } + } + + test("rejects zero batchSize") { + shouldThrow { + OutboxSchedulerConfig(batchSize = 0) + } + } + + test("rejects negative batchSize") { + shouldThrow { + OutboxSchedulerConfig(batchSize = -5) + } + } +}) From 9d8e21b332ab2257cd434d08d516af351381f023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 17:44:59 +0100 Subject: [PATCH 2/8] feat(core): rewrite OutboxScheduler with error handling, guards, logging Add try/catch in tick() to prevent silent scheduler death on exception. Add AtomicBoolean running guard and isShutdown restart check. Accept OutboxSchedulerConfig instead of raw parameters. Add SLF4J logging: INFO start/stop, DEBUG per tick, ERROR on failure. Add isRunning() method. --- .../okapi/core/OutboxScheduler.kt | 41 +++-- .../okapi/core/OutboxSchedulerTest.kt | 165 ++++++++++++++++++ .../springboot/OutboxProcessorScheduler.kt | 4 +- 3 files changed, 194 insertions(+), 16 deletions(-) create mode 100644 okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt index 798d809..2c76d52 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt @@ -1,51 +1,64 @@ package com.softwaremill.okapi.core +import org.slf4j.LoggerFactory import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean /** * Standalone scheduler that periodically calls [OutboxProcessor.processNext]. * * Each tick is optionally wrapped in a transaction via [transactionRunner]. - * Runs on a single daemon thread and provides explicit [start]/[stop] lifecycle. + * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. + * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. + * [AtomicBoolean] guards against accidental double-start, not restart. * * Framework-specific modules hook into their own lifecycle events: - * - `okapi-spring`: `SmartInitializingSingleton` / `DisposableBean` + * - `okapi-spring-boot`: `SmartLifecycle` * - `okapi-ktor`: `ApplicationStarted` / `ApplicationStopped` */ class OutboxScheduler( private val outboxProcessor: OutboxProcessor, private val transactionRunner: TransactionRunner? = null, - private val intervalMs: Long = 1_000L, - private val batchSize: Int = 10, + private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(), ) { + private val running = AtomicBoolean(false) + private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor { r -> Thread(r, "outbox-processor").apply { isDaemon = true } } fun start() { - scheduler.scheduleWithFixedDelay( - ::tick, - 0L, - intervalMs, - TimeUnit.MILLISECONDS, - ) + check(!scheduler.isShutdown) { "OutboxScheduler cannot be restarted after stop()" } + if (!running.compareAndSet(false, true)) return + logger.info("Outbox processor started [interval={}ms, batchSize={}]", config.intervalMs, config.batchSize) + scheduler.scheduleWithFixedDelay(::tick, 0L, config.intervalMs, TimeUnit.MILLISECONDS) } fun stop() { + if (!running.compareAndSet(true, false)) return scheduler.shutdown() if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { scheduler.shutdownNow() } + logger.info("Outbox processor stopped") } + fun isRunning(): Boolean = running.get() + private fun tick() { - if (transactionRunner != null) { - transactionRunner.runInTransaction { outboxProcessor.processNext(batchSize) } - } else { - outboxProcessor.processNext(batchSize) + try { + transactionRunner?.runInTransaction { outboxProcessor.processNext(config.batchSize) } + ?: outboxProcessor.processNext(config.batchSize) + logger.debug("Outbox processor tick completed [batchSize={}]", config.batchSize) + } catch (e: Exception) { + logger.error("Outbox processor tick failed, will retry at next scheduled interval", e) } } + + companion object { + private val logger = LoggerFactory.getLogger(OutboxScheduler::class.java) + } } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt new file mode 100644 index 0000000..6e9da7d --- /dev/null +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt @@ -0,0 +1,165 @@ +package com.softwaremill.okapi.core + +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger + +class OutboxSchedulerTest : FunSpec({ + + test("tick calls processNext with configured batchSize") { + var capturedLimit: Int? = null + val latch = CountDownLatch(1) + val processor = stubProcessor { limit -> + capturedLimit = limit + latch.countDown() + } + + val scheduler = OutboxScheduler( + outboxProcessor = processor, + config = OutboxSchedulerConfig(intervalMs = 50, batchSize = 25), + ) + + scheduler.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + scheduler.stop() + + capturedLimit shouldBe 25 + } + + test("exception in tick does not kill scheduler") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(2) + val processor = stubProcessor { _ -> + val count = callCount.incrementAndGet() + latch.countDown() + if (count == 1) throw RuntimeException("db connection lost") + } + + val scheduler = OutboxScheduler( + outboxProcessor = processor, + config = OutboxSchedulerConfig(intervalMs = 50), + ) + + scheduler.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + scheduler.stop() + + callCount.get() shouldBe 2 + } + + test("double start is ignored") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val processor = stubProcessor { _ -> + callCount.incrementAndGet() + latch.countDown() + } + + val scheduler = OutboxScheduler( + outboxProcessor = processor, + config = OutboxSchedulerConfig(intervalMs = 50), + ) + + scheduler.start() + scheduler.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + scheduler.stop() + + scheduler.isRunning() shouldBe false + } + + test("isRunning transitions") { + val processor = stubProcessor { _ -> } + val scheduler = OutboxScheduler( + outboxProcessor = processor, + config = OutboxSchedulerConfig(intervalMs = 60_000), + ) + + scheduler.isRunning() shouldBe false + scheduler.start() + scheduler.isRunning() shouldBe true + scheduler.stop() + scheduler.isRunning() shouldBe false + } + + test("start after stop throws") { + val processor = stubProcessor { _ -> } + val scheduler = OutboxScheduler( + outboxProcessor = processor, + config = OutboxSchedulerConfig(intervalMs = 60_000), + ) + + scheduler.start() + scheduler.stop() + + shouldThrow { + scheduler.start() + }.message shouldBe "OutboxScheduler cannot be restarted after stop()" + } + + test("transactionRunner wraps tick when provided") { + val txInvoked = AtomicBoolean(false) + val latch = CountDownLatch(1) + val processor = stubProcessor { _ -> latch.countDown() } + val txRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T { + txInvoked.set(true) + return block() + } + } + + val scheduler = OutboxScheduler( + outboxProcessor = processor, + transactionRunner = txRunner, + config = OutboxSchedulerConfig(intervalMs = 50), + ) + + scheduler.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + scheduler.stop() + + txInvoked.get() shouldBe true + } + + test("tick runs without transactionRunner") { + val latch = CountDownLatch(1) + val processor = stubProcessor { _ -> latch.countDown() } + + val scheduler = OutboxScheduler( + outboxProcessor = processor, + transactionRunner = null, + config = OutboxSchedulerConfig(intervalMs = 50), + ) + + scheduler.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + scheduler.stop() + } +}) + +private fun stubProcessor(onProcessNext: (Int) -> Unit): OutboxProcessor { + val store = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int): List { + onProcessNext(limit) + return emptyList() + } + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: java.time.Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() + } + val entryProcessor = OutboxEntryProcessor( + deliverer = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success + }, + retryPolicy = RetryPolicy(maxRetries = 3), + clock = java.time.Clock.systemUTC(), + ) + return OutboxProcessor(store = store, entryProcessor = entryProcessor) +} diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt index 94a686b..5c02fee 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt @@ -2,6 +2,7 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxScheduler +import com.softwaremill.okapi.core.OutboxSchedulerConfig import org.springframework.beans.factory.DisposableBean import org.springframework.beans.factory.SmartInitializingSingleton import org.springframework.transaction.support.TransactionTemplate @@ -26,8 +27,7 @@ class OutboxProcessorScheduler( private val scheduler = OutboxScheduler( outboxProcessor = outboxProcessor, transactionRunner = transactionTemplate?.let { SpringTransactionRunner(it) }, - intervalMs = intervalMs, - batchSize = batchSize, + config = OutboxSchedulerConfig(intervalMs = intervalMs, batchSize = batchSize), ) override fun afterSingletonsInstantiated() { From 1713ff32729ebe27b6f970e6dd23573138c7f795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 18:07:39 +0100 Subject: [PATCH 3/8] feat(spring): add OutboxProcessorProperties with @ConfigurationProperties binding --- .../okapi/springboot/OutboxProcessorProperties.kt | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorProperties.kt diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorProperties.kt new file mode 100644 index 0000000..2e01b31 --- /dev/null +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorProperties.kt @@ -0,0 +1,12 @@ +package com.softwaremill.okapi.springboot + +import jakarta.validation.constraints.Min +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.validation.annotation.Validated + +@ConfigurationProperties(prefix = "okapi.processor") +@Validated +data class OutboxProcessorProperties( + @field:Min(1) val intervalMs: Long = 1_000, + @field:Min(1) val batchSize: Int = 10, +) From b7f2045fa912d76d6a3379da3f414e07220826bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 18:08:00 +0100 Subject: [PATCH 4/8] feat(spring): migrate OutboxProcessorScheduler to SmartLifecycle Replace SmartInitializingSingleton + DisposableBean with SmartLifecycle. Add stop(callback) with try-finally to prevent Spring shutdown hang. Add phase ordering: processor (MAX_VALUE-2048) starts before purger (MAX_VALUE-1024) and stops after it. Accept OutboxSchedulerConfig instead of raw parameters. --- .../springboot/OutboxProcessorScheduler.kt | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt index 5c02fee..d4fc485 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt @@ -3,38 +3,51 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxScheduler import com.softwaremill.okapi.core.OutboxSchedulerConfig -import org.springframework.beans.factory.DisposableBean -import org.springframework.beans.factory.SmartInitializingSingleton +import org.springframework.context.SmartLifecycle import org.springframework.transaction.support.TransactionTemplate /** - * Schedules periodic calls to [OutboxProcessor.processNext] on a dedicated daemon thread. + * Spring lifecycle wrapper for [OutboxScheduler]. * - * Each tick is wrapped in a Spring transaction when a [TransactionTemplate] is provided. - * Starts automatically after all Spring beans are initialized ([SmartInitializingSingleton]) - * and shuts down gracefully on context close ([DisposableBean]). + * Uses [SmartLifecycle] for phase-ordered startup/shutdown. + * Starts before [OutboxPurgerScheduler] and stops after it, + * ensuring entries are processed before they can be purged. * - * Delegates scheduling logic to [OutboxScheduler] from okapi-core. + * Enabled by default; disable with `okapi.processor.enabled=false`. */ class OutboxProcessorScheduler( outboxProcessor: OutboxProcessor, transactionTemplate: TransactionTemplate?, - intervalMs: Long = 1_000L, - batchSize: Int = 10, -) : SmartInitializingSingleton, - DisposableBean { + config: OutboxSchedulerConfig = OutboxSchedulerConfig(), +) : SmartLifecycle { private val scheduler = OutboxScheduler( outboxProcessor = outboxProcessor, transactionRunner = transactionTemplate?.let { SpringTransactionRunner(it) }, - config = OutboxSchedulerConfig(intervalMs = intervalMs, batchSize = batchSize), + config = config, ) - override fun afterSingletonsInstantiated() { + override fun start() { scheduler.start() } - override fun destroy() { + override fun stop() { scheduler.stop() } + + override fun stop(callback: Runnable) { + try { + scheduler.stop() + } finally { + callback.run() + } + } + + override fun isRunning(): Boolean = scheduler.isRunning() + + override fun getPhase(): Int = PROCESSOR_PHASE + + companion object { + const val PROCESSOR_PHASE = Integer.MAX_VALUE - 2048 + } } From 6161168897a36b51e1b416d78b5ce8cc8dd738b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 18:08:38 +0100 Subject: [PATCH 5/8] feat(spring): bind OutboxProcessorProperties in OutboxAutoConfiguration Add @EnableConfigurationProperties for OutboxProcessorProperties. Add @ConditionalOnProperty for okapi.processor.enabled toggle. Map properties to OutboxSchedulerConfig in bean factory. --- .../okapi/springboot/OutboxAutoConfiguration.kt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 3b8f5e1..b5be1d5 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntryProcessor import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.OutboxSchedulerConfig import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy import com.softwaremill.okapi.mysql.MysqlOutboxStore @@ -42,7 +43,7 @@ import javax.sql.DataSource * - [PlatformTransactionManager] — if absent, each store call runs in its own transaction */ @AutoConfiguration -@EnableConfigurationProperties(OutboxPurgerProperties::class) +@EnableConfigurationProperties(OutboxPurgerProperties::class, OutboxProcessorProperties::class) class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean @@ -82,13 +83,19 @@ class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean + @ConditionalOnProperty(prefix = "okapi.processor", name = ["enabled"], havingValue = "true", matchIfMissing = true) fun outboxProcessorScheduler( + props: OutboxProcessorProperties, outboxProcessor: OutboxProcessor, transactionManager: ObjectProvider, ): OutboxProcessorScheduler { return OutboxProcessorScheduler( outboxProcessor = outboxProcessor, transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) }, + config = OutboxSchedulerConfig( + intervalMs = props.intervalMs, + batchSize = props.batchSize, + ), ) } From dd9dcbe50d0bd7c4e65333f1fb7eee35d004c6db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 18:11:52 +0100 Subject: [PATCH 6/8] test(spring): add OutboxProcessorAutoConfigurationTest Test: bean creation, disabled toggle, properties binding, defaults, SmartLifecycle isRunning, validation, stop callback safety. --- .../OutboxProcessorAutoConfigurationTest.kt | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt new file mode 100644 index 0000000..d453ed5 --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt @@ -0,0 +1,94 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import java.time.Instant + +class OutboxProcessorAutoConfigurationTest : FunSpec({ + + val contextRunner = ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + + test("processor bean is created by default") { + contextRunner.run { ctx -> + ctx.getBean(OutboxProcessorScheduler::class.java).shouldNotBeNull() + } + } + + test("processor bean is not created when disabled") { + contextRunner + .withPropertyValues("okapi.processor.enabled=false") + .run { ctx -> + ctx.containsBean("outboxProcessorScheduler") shouldBe false + } + } + + test("properties are bound from application config") { + contextRunner + .withPropertyValues( + "okapi.processor.interval-ms=500", + "okapi.processor.batch-size=20", + ) + .run { ctx -> + val props = ctx.getBean(OutboxProcessorProperties::class.java) + props.intervalMs shouldBe 500 + props.batchSize shouldBe 20 + } + } + + test("default properties when nothing is configured") { + contextRunner.run { ctx -> + val props = ctx.getBean(OutboxProcessorProperties::class.java) + props.intervalMs shouldBe 1_000 + props.batchSize shouldBe 10 + } + } + + test("SmartLifecycle is running after context start") { + contextRunner.run { ctx -> + val scheduler = ctx.getBean(OutboxProcessorScheduler::class.java) + scheduler.isRunning shouldBe true + } + } + + test("invalid batch-size triggers validation") { + contextRunner + .withPropertyValues("okapi.processor.batch-size=0") + .run { ctx -> + ctx.startupFailure.shouldNotBeNull() + } + } + + test("stop callback is always invoked") { + contextRunner.run { ctx -> + val scheduler = ctx.getBean(OutboxProcessorScheduler::class.java) + var callbackInvoked = false + scheduler.stop { callbackInvoked = true } + callbackInvoked shouldBe true + } + } +}) + +private fun stubStore() = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} + +private fun stubDeliverer() = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +} From 1b7c44c35791260c407da6a29f2140e4067b7900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 18:12:13 +0100 Subject: [PATCH 7/8] feat(db): add index (status, created_at) for processor claimPending query Without this index, claimPending() does a full table scan on SELECT ... WHERE status='PENDING' ORDER BY created_at. --- .../softwaremill/okapi/db/mysql/003__add_processor_index.sql | 4 ++++ .../resources/com/softwaremill/okapi/db/mysql/changelog.xml | 1 + .../com/softwaremill/okapi/db/004__add_processor_index.sql | 4 ++++ .../main/resources/com/softwaremill/okapi/db/changelog.xml | 1 + 4 files changed, 10 insertions(+) create mode 100644 okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_processor_index.sql create mode 100644 okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_processor_index.sql diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_processor_index.sql b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_processor_index.sql new file mode 100644 index 0000000..70ca899 --- /dev/null +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/003__add_processor_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:003 + +CREATE INDEX idx_outbox_status_created_at ON outbox (status, created_at); diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml index fe5d7b0..1eb0831 100644 --- a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml @@ -5,4 +5,5 @@ http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd"> + diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_processor_index.sql b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_processor_index.sql new file mode 100644 index 0000000..2e4bafc --- /dev/null +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/004__add_processor_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:004 + +CREATE INDEX IF NOT EXISTS idx_outbox_status_created_at ON outbox (status, created_at); diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml index d0497e3..ff6d05a 100644 --- a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml @@ -6,4 +6,5 @@ + From dc624d3c93aec18faf099bbbf4e452475253c5da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 18:12:27 +0100 Subject: [PATCH 8/8] feat(spring): add processor properties to spring-configuration-metadata.json --- .../spring-configuration-metadata.json | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json index 73586cc..f081378 100644 --- a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json @@ -4,6 +4,11 @@ "name": "okapi.purger", "type": "com.softwaremill.okapi.springboot.OutboxPurgerProperties", "description": "Outbox purger configuration." + }, + { + "name": "okapi.processor", + "type": "com.softwaremill.okapi.springboot.OutboxProcessorProperties", + "description": "Outbox processor configuration." } ], "properties": [ @@ -30,6 +35,24 @@ "type": "java.lang.Integer", "defaultValue": 100, "description": "Number of entries deleted per batch. Each purge cycle may execute multiple batches." + }, + { + "name": "okapi.processor.enabled", + "type": "java.lang.Boolean", + "defaultValue": true, + "description": "Whether the outbox processor is enabled." + }, + { + "name": "okapi.processor.interval-ms", + "type": "java.lang.Long", + "defaultValue": 1000, + "description": "How often the processor runs, in milliseconds." + }, + { + "name": "okapi.processor.batch-size", + "type": "java.lang.Integer", + "defaultValue": 10, + "description": "Maximum number of pending entries to process per tick." } ] }