Skip to content
Open
Original file line number Diff line number Diff line change
@@ -1,51 +1,64 @@
package com.softwaremill.okapi.core

import org.slf4j.LoggerFactory
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

/**
* Standalone scheduler that periodically calls [OutboxProcessor.processNext].
*
* Each tick is optionally wrapped in a transaction via [transactionRunner].
* Runs on a single daemon thread and provides explicit [start]/[stop] lifecycle.
* Runs on a single daemon thread with explicit [start]/[stop] lifecycle.
* [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.
* [AtomicBoolean] guards against accidental double-start, not restart.
*
* Framework-specific modules hook into their own lifecycle events:
* - `okapi-spring`: `SmartInitializingSingleton` / `DisposableBean`
* - `okapi-spring-boot`: `SmartLifecycle`
* - `okapi-ktor`: `ApplicationStarted` / `ApplicationStopped`
*/
class OutboxScheduler(
private val outboxProcessor: OutboxProcessor,
private val transactionRunner: TransactionRunner? = null,
private val intervalMs: Long = 1_000L,
private val batchSize: Int = 10,
private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(),
) {
private val running = AtomicBoolean(false)

private val scheduler: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor { r ->
Thread(r, "outbox-processor").apply { isDaemon = true }
}

fun start() {
scheduler.scheduleWithFixedDelay(
::tick,
0L,
intervalMs,
TimeUnit.MILLISECONDS,
)
check(!scheduler.isShutdown) { "OutboxScheduler cannot be restarted after stop()" }
if (!running.compareAndSet(false, true)) return
logger.info("Outbox processor started [interval={}ms, batchSize={}]", config.intervalMs, config.batchSize)
scheduler.scheduleWithFixedDelay(::tick, 0L, config.intervalMs, TimeUnit.MILLISECONDS)
}

fun stop() {
if (!running.compareAndSet(true, false)) return
scheduler.shutdown()
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow()
}
logger.info("Outbox processor stopped")
}

fun isRunning(): Boolean = running.get()

private fun tick() {
if (transactionRunner != null) {
transactionRunner.runInTransaction { outboxProcessor.processNext(batchSize) }
} else {
outboxProcessor.processNext(batchSize)
try {
transactionRunner?.runInTransaction { outboxProcessor.processNext(config.batchSize) }
?: outboxProcessor.processNext(config.batchSize)
logger.debug("Outbox processor tick completed [batchSize={}]", config.batchSize)
} catch (e: Exception) {
logger.error("Outbox processor tick failed, will retry at next scheduled interval", e)
}
}

companion object {
private val logger = LoggerFactory.getLogger(OutboxScheduler::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.softwaremill.okapi.core

data class OutboxSchedulerConfig(
val intervalMs: Long = 1_000L,
val batchSize: Int = 10,
) {
init {
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.softwaremill.okapi.core

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe

class OutboxSchedulerConfigTest : FunSpec({

test("default config has valid values") {
val config = OutboxSchedulerConfig()
config.intervalMs shouldBe 1_000L
config.batchSize shouldBe 10
}

test("accepts custom valid values") {
val config = OutboxSchedulerConfig(intervalMs = 500, batchSize = 50)
config.intervalMs shouldBe 500L
config.batchSize shouldBe 50
}

test("rejects zero intervalMs") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(intervalMs = 0)
}
}

test("rejects negative intervalMs") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(intervalMs = -1)
}
}

test("rejects zero batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(batchSize = 0)
}
}

test("rejects negative batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(batchSize = -5)
}
}
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package com.softwaremill.okapi.core

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

class OutboxSchedulerTest : FunSpec({

test("tick calls processNext with configured batchSize") {
var capturedLimit: Int? = null
val latch = CountDownLatch(1)
val processor = stubProcessor { limit ->
capturedLimit = limit
latch.countDown()
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 50, batchSize = 25),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

capturedLimit shouldBe 25
}

test("exception in tick does not kill scheduler") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(2)
val processor = stubProcessor { _ ->
val count = callCount.incrementAndGet()
latch.countDown()
if (count == 1) throw RuntimeException("db connection lost")
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

callCount.get() shouldBe 2
}

test("double start is ignored") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(1)
val processor = stubProcessor { _ ->
callCount.incrementAndGet()
latch.countDown()
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

scheduler.isRunning() shouldBe false
}

test("isRunning transitions") {
val processor = stubProcessor { _ -> }
val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 60_000),
)

scheduler.isRunning() shouldBe false
scheduler.start()
scheduler.isRunning() shouldBe true
scheduler.stop()
scheduler.isRunning() shouldBe false
}

test("start after stop throws") {
val processor = stubProcessor { _ -> }
val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 60_000),
)

scheduler.start()
scheduler.stop()

shouldThrow<IllegalStateException> {
scheduler.start()
}.message shouldBe "OutboxScheduler cannot be restarted after stop()"
}

test("transactionRunner wraps tick when provided") {
val txInvoked = AtomicBoolean(false)
val latch = CountDownLatch(1)
val processor = stubProcessor { _ -> latch.countDown() }
val txRunner = object : TransactionRunner {
override fun <T> runInTransaction(block: () -> T): T {
txInvoked.set(true)
return block()
}
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
transactionRunner = txRunner,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

txInvoked.get() shouldBe true
}

test("tick runs without transactionRunner") {
val latch = CountDownLatch(1)
val processor = stubProcessor { _ -> latch.countDown() }

val scheduler = OutboxScheduler(
outboxProcessor = processor,
transactionRunner = null,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()
}
})

private fun stubProcessor(onProcessNext: (Int) -> Unit): OutboxProcessor {
val store = object : OutboxStore {
override fun persist(entry: OutboxEntry) = entry
override fun claimPending(limit: Int): List<OutboxEntry> {
onProcessNext(limit)
return emptyList()
}
override fun updateAfterProcessing(entry: OutboxEntry) = entry
override fun removeDeliveredBefore(time: java.time.Instant, limit: Int) = 0
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, java.time.Instant>()
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
}
val entryProcessor = OutboxEntryProcessor(
deliverer = object : MessageDeliverer {
override val type = "stub"
override fun deliver(entry: OutboxEntry) = DeliveryResult.Success
},
retryPolicy = RetryPolicy(maxRetries = 3),
clock = java.time.Clock.systemUTC(),
)
return OutboxProcessor(store = store, entryProcessor = entryProcessor)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
--liquibase formatted sql
--changeset outbox:003

CREATE INDEX idx_outbox_status_created_at ON outbox (status, created_at);
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">
<include file="001__create_outbox_table.sql" relativeToChangelogFile="true"/>
<include file="002__add_purger_index.sql" relativeToChangelogFile="true"/>
<include file="003__add_processor_index.sql" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
--liquibase formatted sql
--changeset outbox:004

CREATE INDEX IF NOT EXISTS idx_outbox_status_created_at ON outbox (status, created_at);
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
<include file="001__create_outbox_table.sql" relativeToChangelogFile="true"/>
<include file="002__add_delivery_type_column.sql" relativeToChangelogFile="true"/>
<include file="003__add_purger_index.sql" relativeToChangelogFile="true"/>
<include file="004__add_processor_index.sql" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.MessageDeliverer
import com.softwaremill.okapi.core.OutboxEntryProcessor
import com.softwaremill.okapi.core.OutboxProcessor
import com.softwaremill.okapi.core.OutboxPublisher
import com.softwaremill.okapi.core.OutboxSchedulerConfig
import com.softwaremill.okapi.core.OutboxStore
import com.softwaremill.okapi.core.RetryPolicy
import com.softwaremill.okapi.mysql.MysqlOutboxStore
Expand Down Expand Up @@ -42,7 +43,7 @@ import javax.sql.DataSource
* - [PlatformTransactionManager] — if absent, each store call runs in its own transaction
*/
@AutoConfiguration
@EnableConfigurationProperties(OutboxPurgerProperties::class)
@EnableConfigurationProperties(OutboxPurgerProperties::class, OutboxProcessorProperties::class)
class OutboxAutoConfiguration {
@Bean
@ConditionalOnMissingBean
Expand Down Expand Up @@ -82,13 +83,19 @@ class OutboxAutoConfiguration {

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "okapi.processor", name = ["enabled"], havingValue = "true", matchIfMissing = true)
fun outboxProcessorScheduler(
props: OutboxProcessorProperties,
outboxProcessor: OutboxProcessor,
transactionManager: ObjectProvider<PlatformTransactionManager>,
): OutboxProcessorScheduler {
return OutboxProcessorScheduler(
outboxProcessor = outboxProcessor,
transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) },
config = OutboxSchedulerConfig(
intervalMs = props.intervalMs,
batchSize = props.batchSize,
),
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.softwaremill.okapi.springboot

import jakarta.validation.constraints.Min
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.validation.annotation.Validated

@ConfigurationProperties(prefix = "okapi.processor")
@Validated
data class OutboxProcessorProperties(
@field:Min(1) val intervalMs: Long = 1_000,
@field:Min(1) val batchSize: Int = 10,
)
Loading