From 5615ecf6f4999bf96d18085970f9b81669a98a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:22:17 +0100 Subject: [PATCH 01/14] feat(core): add SLF4J API dependency for OutboxPurger logging --- gradle/libs.versions.toml | 2 ++ okapi-core/build.gradle.kts | 1 + 2 files changed, 3 insertions(+) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f222d06..4b52394 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,6 +12,7 @@ kafkaClients = "4.2.0" spring = "7.0.6" springBoot = "4.0.4" wiremock = "3.13.2" +slf4j = "2.0.17" [libraries] kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } @@ -34,6 +35,7 @@ springContext = { module = "org.springframework:spring-context", version.ref = " springTx = { module = "org.springframework:spring-tx", version.ref = "spring" } springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } +slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } [plugins] ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" } diff --git a/okapi-core/build.gradle.kts b/okapi-core/build.gradle.kts index 0bdb31f..373cee9 100644 --- a/okapi-core/build.gradle.kts +++ b/okapi-core/build.gradle.kts @@ -3,6 +3,7 @@ plugins { } dependencies { + implementation(libs.slf4jApi) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) } From 70e75a5b5c68f66b8c927d7e4c1f32c7254a5a02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:24:36 +0100 Subject: [PATCH 02/14] feat(core): change OutboxStore.removeDeliveredBefore to accept limit and return count Breaking change: removeDeliveredBefore(Instant) -> removeDeliveredBefore(Instant, Int): Int Store implementations temporarily ignore limit parameter. --- .../main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt | 2 +- .../main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt | 4 ++-- .../kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt | 2 +- .../kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt | 2 +- .../kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt | 3 ++- .../kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt | 4 ++-- .../com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt | 2 +- .../com/softwaremill/okapi/postgres/PostgresOutboxStore.kt | 4 ++-- .../okapi/springboot/SpringOutboxPublisherTest.kt | 2 +- 9 files changed, 13 insertions(+), 12 deletions(-) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 803f94a..9df346d 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -40,6 +40,6 @@ class OutboxPurger( } private fun tick() { - outboxStore.removeDeliveredBefore(clock.instant().minus(retentionDuration)) + outboxStore.removeDeliveredBefore(clock.instant().minus(retentionDuration), Int.MAX_VALUE) } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt index 13631ca..24098ad 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt @@ -12,8 +12,8 @@ interface OutboxStore { /** Updates an entry after a delivery attempt (status change, retries, lastError). */ fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry - /** Removes DELIVERED entries older than [time]. */ - fun removeDeliveredBefore(time: Instant) + /** Removes up to [limit] DELIVERED entries with lastAttempt before [time]. Returns the number of entries deleted. */ + fun removeDeliveredBefore(time: Instant, limit: Int): Int /** Returns the oldest createdAt per status (useful for lag metrics). */ fun findOldestCreatedAt(statuses: Set): Map diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index a104acf..9d5bb1b 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -36,7 +36,7 @@ class OutboxProcessorTest : override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = entry.also { processedEntries += it } - override fun removeDeliveredBefore(time: Instant) = Unit + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0 override fun findOldestCreatedAt(statuses: Set) = emptyMap() diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt index 7428dad..9f324dc 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt @@ -27,7 +27,7 @@ class OutboxPublisherTest : override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant) = Unit + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0 override fun findOldestCreatedAt(statuses: Set) = emptyMap() diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 02d5e90..59d8d6c 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -17,8 +17,9 @@ class OutboxPurgerTest : FunSpec({ override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant) { + override fun removeDeliveredBefore(time: Instant, limit: Int): Int { capturedCutoff = time + return 0 } override fun findOldestCreatedAt(statuses: Set) = emptyMap() override fun countByStatuses() = emptyMap() diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index fb9940b..5743426 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -56,8 +56,8 @@ class MysqlOutboxStore( override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) - override fun removeDeliveredBefore(time: Instant) { - OutboxTable.deleteWhere { + override fun removeDeliveredBefore(time: Instant, limit: Int): Int { + return OutboxTable.deleteWhere { (OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time) } } diff --git a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt index f5c9882..9d73964 100644 --- a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt +++ b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt @@ -74,7 +74,7 @@ class MysqlOutboxStoreTest : BehaviorSpec({ lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), ) transaction(db) { store.persist(delivered) } - transaction(db) { store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z")) } + transaction(db) { store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), Int.MAX_VALUE) } then("old delivered entries are removed") { val counts = transaction(db) { store.countByStatuses() } diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index f7ca77f..98f5b31 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -56,8 +56,8 @@ class PostgresOutboxStore( override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) - override fun removeDeliveredBefore(time: Instant) { - OutboxTable.deleteWhere { + override fun removeDeliveredBefore(time: Instant, limit: Int): Int { + return OutboxTable.deleteWhere { (OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time) } } diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt index 77555e8..b482d3e 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt @@ -36,7 +36,7 @@ class SpringOutboxPublisherTest : override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant) = Unit + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0 override fun findOldestCreatedAt(statuses: Set) = emptyMap() From 9df3848caadaf667aaac7ce41679512a6e3b4582 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:28:25 +0100 Subject: [PATCH 03/14] feat(postgres): implement batched delete with FOR UPDATE SKIP LOCKED --- .../okapi/mysql/MysqlOutboxStore.kt | 24 +++++++++++++----- .../okapi/mysql/MysqlOutboxStoreTest.kt | 25 +++++++++++++++++++ .../okapi/postgres/PostgresOutboxStore.kt | 21 ++++++++++++---- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index 5743426..210aa44 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -5,13 +5,9 @@ import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.and import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.less import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.jdbc.deleteWhere import org.jetbrains.exposed.v1.jdbc.select import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.upsert @@ -57,8 +53,24 @@ class MysqlOutboxStore( override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) override fun removeDeliveredBefore(time: Instant, limit: Int): Int { - return OutboxTable.deleteWhere { - (OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time) + val sql = """ + DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( + SELECT ${OutboxTable.id.name} FROM ( + SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} + WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' + AND ${OutboxTable.lastAttempt.name} < ? + ORDER BY ${OutboxTable.id.name} + LIMIT ? + FOR UPDATE SKIP LOCKED + ) AS batch + ) + """.trimIndent() + + val conn = TransactionManager.current().connection.connection as java.sql.Connection + return conn.prepareStatement(sql).use { stmt -> + stmt.setTimestamp(1, java.sql.Timestamp.from(time)) + stmt.setInt(2, limit) + stmt.executeUpdate() } } diff --git a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt index 9d73964..cbaa5c3 100644 --- a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt +++ b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt @@ -83,6 +83,31 @@ class MysqlOutboxStoreTest : BehaviorSpec({ } } + given("removeDeliveredBefore with limit") { + `when`("limit is smaller than matching entries") { + transaction(db) { exec("DELETE FROM outbox") } + repeat(5) { + val entry = newEntry().copy( + status = OutboxStatus.DELIVERED, + lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), + ) + transaction(db) { store.persist(entry) } + } + + val deleted = transaction(db) { + store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), 3) + } + + then("only deletes up to limit") { + deleted shouldBe 3 + } + then("remaining entries still exist") { + val counts = transaction(db) { store.countByStatuses() } + counts[OutboxStatus.DELIVERED] shouldBe 2L + } + } + } + given("countByStatuses") { `when`("entries exist with different statuses") { transaction(db) { diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 98f5b31..3d9e62b 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -5,13 +5,10 @@ import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.and import org.jetbrains.exposed.v1.core.count import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.less import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.jdbc.deleteWhere import org.jetbrains.exposed.v1.jdbc.select import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.upsert @@ -57,8 +54,22 @@ class PostgresOutboxStore( override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) override fun removeDeliveredBefore(time: Instant, limit: Int): Int { - return OutboxTable.deleteWhere { - (OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time) + val sql = """ + DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( + SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} + WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' + AND ${OutboxTable.lastAttempt.name} < ? + ORDER BY ${OutboxTable.id.name} + LIMIT ? + FOR UPDATE SKIP LOCKED + ) + """.trimIndent() + + val conn = TransactionManager.current().connection.connection as java.sql.Connection + return conn.prepareStatement(sql).use { stmt -> + stmt.setTimestamp(1, java.sql.Timestamp.from(time)) + stmt.setInt(2, limit) + stmt.executeUpdate() } } From 564af2043b3605e58b453f236b732960cd0a4db3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:30:31 +0100 Subject: [PATCH 04/14] feat(db): add index on (status, last_attempt) for efficient purge queries --- .../com/softwaremill/okapi/db/mysql/002__add_purger_index.sql | 4 ++++ .../resources/com/softwaremill/okapi/db/mysql/changelog.xml | 1 + .../com/softwaremill/okapi/db/003__add_purger_index.sql | 4 ++++ .../main/resources/com/softwaremill/okapi/db/changelog.xml | 1 + 4 files changed, 10 insertions(+) create mode 100644 okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql create mode 100644 okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql new file mode 100644 index 0000000..b299cb0 --- /dev/null +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:002 + +CREATE INDEX idx_outbox_status_last_attempt ON outbox(status, last_attempt); diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml index d249d7f..fe5d7b0 100644 --- a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml @@ -4,4 +4,5 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd"> + diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql new file mode 100644 index 0000000..ef3e169 --- /dev/null +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:003 + +CREATE INDEX idx_outbox_status_last_attempt ON outbox(status, last_attempt); diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml index a9e4e66..d0497e3 100644 --- a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml @@ -5,4 +5,5 @@ http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd"> + From 24865e736e890896b29c7a7cc56051fa87109aa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:32:57 +0100 Subject: [PATCH 05/14] feat(core): rewrite OutboxPurger with batched delete, error handling, logging, validation --- gradle/libs.versions.toml | 1 + okapi-core/build.gradle.kts | 1 + .../softwaremill/okapi/core/OutboxPurger.kt | 52 ++++++- .../okapi/core/OutboxPurgerTest.kt | 139 ++++++++++++++++-- 4 files changed, 174 insertions(+), 19 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4b52394..909d3e7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -36,6 +36,7 @@ springTx = { module = "org.springframework:spring-tx", version.ref = "spring" } springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } +slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } [plugins] ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" } diff --git a/okapi-core/build.gradle.kts b/okapi-core/build.gradle.kts index 373cee9..34f5112 100644 --- a/okapi-core/build.gradle.kts +++ b/okapi-core/build.gradle.kts @@ -6,4 +6,5 @@ dependencies { implementation(libs.slf4jApi) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) + testRuntimeOnly(libs.slf4jSimple) } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 9df346d..7805d5e 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -1,45 +1,83 @@ package com.softwaremill.okapi.core +import org.slf4j.LoggerFactory import java.time.Clock import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean /** * Periodically removes DELIVERED outbox entries older than [retentionDuration]. * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. - * Delegates to [OutboxStore.removeDeliveredBefore] — works with any storage adapter. + * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. + * [AtomicBoolean] guards against accidental double-start, not restart. + * + * Delegates to [OutboxStore.removeDeliveredBefore] -- works with any storage adapter. */ class OutboxPurger( private val outboxStore: OutboxStore, private val retentionDuration: Duration = Duration.ofDays(7), private val intervalMs: Long = 3_600_000L, + private val batchSize: Int = 100, private val clock: Clock = Clock.systemUTC(), ) { + init { + require(retentionDuration > Duration.ZERO) { "retentionDuration must be positive, got: $retentionDuration" } + require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" } + require(batchSize > 0) { "batchSize must be positive, got: $batchSize" } + } + + private val running = AtomicBoolean(false) + private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor { r -> Thread(r, "outbox-purger").apply { isDaemon = true } } fun start() { - scheduler.scheduleWithFixedDelay( - ::tick, - intervalMs, - intervalMs, - TimeUnit.MILLISECONDS, + if (!running.compareAndSet(false, true)) return + logger.info( + "Outbox purger started [retention={}, interval={}ms, batchSize={}]", + retentionDuration, intervalMs, batchSize, ) + scheduler.scheduleWithFixedDelay(::tick, intervalMs, intervalMs, TimeUnit.MILLISECONDS) } fun stop() { + if (!running.compareAndSet(true, false)) return scheduler.shutdown() if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { scheduler.shutdownNow() } + logger.info("Outbox purger stopped") } + fun isRunning(): Boolean = running.get() + private fun tick() { - outboxStore.removeDeliveredBefore(clock.instant().minus(retentionDuration), Int.MAX_VALUE) + try { + val cutoff = clock.instant().minus(retentionDuration) + var totalDeleted = 0 + var batches = 0 + do { + val deleted = outboxStore.removeDeliveredBefore(cutoff, batchSize) + totalDeleted += deleted + batches++ + } while (deleted == batchSize && batches < MAX_BATCHES_PER_TICK) + + if (totalDeleted > 0) { + logger.debug("Purged {} delivered entries in {} batches", totalDeleted, batches) + } + } catch (e: Exception) { + logger.error("Outbox purge failed, will retry at next scheduled interval", e) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(OutboxPurger::class.java) + private const val MAX_BATCHES_PER_TICK = 10 } } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 59d8d6c..ba66cb5 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -1,41 +1,156 @@ package com.softwaremill.okapi.core +import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.shouldBe import java.time.Clock import java.time.Duration import java.time.Instant import java.time.ZoneOffset +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger private val fixedNow = Instant.parse("2025-03-20T12:00:00Z") private val fixedClock = Clock.fixed(fixedNow, ZoneOffset.UTC) class OutboxPurgerTest : FunSpec({ + test("tick removes entries older than retention duration") { var capturedCutoff: Instant? = null - val store = object : OutboxStore { - override fun persist(entry: OutboxEntry) = entry - override fun claimPending(limit: Int) = emptyList() - override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant, limit: Int): Int { - capturedCutoff = time - return 0 - } - override fun findOldestCreatedAt(statuses: Set) = emptyMap() - override fun countByStatuses() = emptyMap() - } + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { time, _ -> + capturedCutoff = time + latch.countDown() + 0 + }) val purger = OutboxPurger( outboxStore = store, retentionDuration = Duration.ofDays(7), intervalMs = 50, + batchSize = 100, clock = fixedClock, ) purger.start() - Thread.sleep(150) + latch.await(2, TimeUnit.SECONDS) purger.stop() capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7)) } + + test("batch loop stops when deleted < batchSize") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + val count = callCount.incrementAndGet() + if (count == 1) { + 100 // first batch: full + } else { + latch.countDown() + 42 // second batch: partial, loop stops + } + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + latch.await(2, TimeUnit.SECONDS) + purger.stop() + + callCount.get() shouldBe 2 + } + + test("batch loop respects MAX_BATCHES_PER_TICK") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + val count = callCount.incrementAndGet() + if (count >= 10) latch.countDown() + 100 // always full, would loop forever without guard + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + latch.await(2, TimeUnit.SECONDS) + purger.stop() + + callCount.get() shouldBe 10 + } + + test("exception in tick does not kill scheduler") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(2) + val store = stubStore(onRemove = { _, _ -> + val count = callCount.incrementAndGet() + latch.countDown() + if (count == 1) throw RuntimeException("db connection lost") + 0 + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + latch.await(2, TimeUnit.SECONDS) + purger.stop() + + callCount.get() shouldBe 2 + } + + test("double start is ignored") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + callCount.incrementAndGet() + latch.countDown() + 0 + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + purger.start() // second start should be ignored + latch.await(2, TimeUnit.SECONDS) + purger.stop() + + purger.isRunning() shouldBe false + } + + test("isRunning transitions") { + val store = stubStore(onRemove = { _, _ -> 0 }) + val purger = OutboxPurger(store, intervalMs = 60_000, batchSize = 100, clock = fixedClock) + + purger.isRunning() shouldBe false + purger.start() + purger.isRunning() shouldBe true + purger.stop() + purger.isRunning() shouldBe false + } + + test("constructor rejects invalid batchSize") { + shouldThrow { + OutboxPurger(stubStore(), batchSize = 0, clock = fixedClock) + } + } + + test("constructor rejects zero retentionDuration") { + shouldThrow { + OutboxPurger(stubStore(), retentionDuration = Duration.ZERO, clock = fixedClock) + } + } + + test("constructor rejects negative intervalMs") { + shouldThrow { + OutboxPurger(stubStore(), intervalMs = -1, clock = fixedClock) + } + } }) + +private fun stubStore( + onRemove: (Instant, Int) -> Int = { _, _ -> 0 }, +) = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = onRemove(time, limit) + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} From 58f4b5141895694e07602f5831cd551590e84d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:35:01 +0100 Subject: [PATCH 06/14] feat(spring): add OkapiPurgerProperties with @ConfigurationProperties binding --- gradle/libs.versions.toml | 2 ++ okapi-spring-boot/build.gradle.kts | 5 +++ .../okapi/springboot/OkapiPurgerProperties.kt | 14 ++++++++ .../spring-configuration-metadata.json | 35 +++++++++++++++++++ 4 files changed, 56 insertions(+) create mode 100644 okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt create mode 100644 okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 909d3e7..79fad14 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -34,6 +34,8 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka springContext = { module = "org.springframework:spring-context", version.ref = "spring" } springTx = { module = "org.springframework:spring-tx", version.ref = "spring" } springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" } +springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" } +springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 596a9ef..6ea028a 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -8,6 +8,11 @@ dependencies { compileOnly(libs.springContext) compileOnly(libs.springTx) compileOnly(libs.springBootAutoconfigure) + + // Validation annotations for @ConfigurationProperties classes + compileOnly(libs.springBootStarterValidation) + + // Optional store autoconfiguration — compileOnly + @ConditionalOnClass guards runtime absence compileOnly(project(":okapi-postgres")) compileOnly(project(":okapi-mysql")) compileOnly(libs.liquibaseCore) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt new file mode 100644 index 0000000..4ccae47 --- /dev/null +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt @@ -0,0 +1,14 @@ +package com.softwaremill.okapi.springboot + +import jakarta.validation.constraints.Min +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.validation.annotation.Validated + +@ConfigurationProperties(prefix = "okapi.purger") +@Validated +data class OkapiPurgerProperties( + val enabled: Boolean = true, + @field:Min(1) val retentionDays: Long = 7, + @field:Min(1) val intervalMinutes: Long = 60, + @field:Min(1) val batchSize: Int = 100, +) diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json new file mode 100644 index 0000000..6ba41df --- /dev/null +++ b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json @@ -0,0 +1,35 @@ +{ + "groups": [ + { + "name": "okapi.purger", + "type": "com.softwaremill.okapi.springboot.OkapiPurgerProperties", + "description": "Outbox purger configuration." + } + ], + "properties": [ + { + "name": "okapi.purger.enabled", + "type": "java.lang.Boolean", + "defaultValue": true, + "description": "Whether the outbox purger is enabled." + }, + { + "name": "okapi.purger.retention-days", + "type": "java.lang.Long", + "defaultValue": 7, + "description": "How many days to keep delivered entries before purging." + }, + { + "name": "okapi.purger.interval-minutes", + "type": "java.lang.Long", + "defaultValue": 60, + "description": "How often the purger runs, in minutes." + }, + { + "name": "okapi.purger.batch-size", + "type": "java.lang.Integer", + "defaultValue": 100, + "description": "Max entries deleted per batch in a single purge cycle." + } + ] +} From 62150a55d93b02ef965c84f9709f5b1edcfb350d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:36:20 +0100 Subject: [PATCH 07/14] feat(spring): migrate OutboxPurgerScheduler to SmartLifecycle, bind from OkapiPurgerProperties --- .../springboot/OutboxAutoConfiguration.kt | 11 +++++++- .../okapi/springboot/OutboxPurgerScheduler.kt | 28 ++++++++++++++----- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 013e0af..3fb1c4a 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -16,6 +16,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.boot.autoconfigure.condition.ConditionalOnClass import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.transaction.PlatformTransactionManager @@ -41,6 +42,7 @@ import javax.sql.DataSource * - [PlatformTransactionManager] — if absent, each store call runs in its own transaction */ @AutoConfiguration +@EnableConfigurationProperties(OkapiPurgerProperties::class) class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean @@ -93,9 +95,16 @@ class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "okapi.purger", name = ["enabled"], havingValue = "true", matchIfMissing = true) - fun outboxPurgerScheduler(outboxStore: OutboxStore, clock: ObjectProvider): OutboxPurgerScheduler { + fun outboxPurgerScheduler( + props: OkapiPurgerProperties, + outboxStore: OutboxStore, + clock: ObjectProvider, + ): OutboxPurgerScheduler { return OutboxPurgerScheduler( outboxStore = outboxStore, + retentionDays = props.retentionDays, + intervalMinutes = props.intervalMinutes, + batchSize = props.batchSize, clock = clock.getIfAvailable { Clock.systemUTC() }, ) } diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index 7858a59..a2ac23b 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -2,37 +2,51 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.OutboxPurger import com.softwaremill.okapi.core.OutboxStore -import org.springframework.beans.factory.DisposableBean -import org.springframework.beans.factory.SmartInitializingSingleton +import org.springframework.context.SmartLifecycle import java.time.Clock import java.time.Duration /** * Spring lifecycle wrapper for [OutboxPurger]. * - * Starts purging after all beans are initialized, stops on context close. + * Uses [SmartLifecycle] for phase-ordered startup/shutdown and async stop support. * Enabled by default; disable with `okapi.purger.enabled=false`. */ class OutboxPurgerScheduler( outboxStore: OutboxStore, retentionDays: Long = 7, intervalMinutes: Long = 60, + batchSize: Int = 100, clock: Clock = Clock.systemUTC(), -) : SmartInitializingSingleton, - DisposableBean { +) : SmartLifecycle { private val purger = OutboxPurger( outboxStore = outboxStore, retentionDuration = Duration.ofDays(retentionDays), intervalMs = intervalMinutes * 60 * 1_000, + batchSize = batchSize, clock = clock, ) - override fun afterSingletonsInstantiated() { + override fun start() { purger.start() } - override fun destroy() { + override fun stop() { purger.stop() } + + override fun stop(callback: Runnable) { + purger.stop() + callback.run() + } + + override fun isRunning(): Boolean = purger.isRunning() + + override fun getPhase(): Int = PURGER_PHASE + + companion object { + /** Start late (after app beans), stop early (before app beans). */ + const val PURGER_PHASE = Integer.MAX_VALUE - 1024 + } } From 66b46967667864b18933b9517c508ab4b830b187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:42:01 +0100 Subject: [PATCH 08/14] test(spring): add OutboxPurgerAutoConfigurationTest for properties binding and conditional beans Also fix nested PostgresStoreConfiguration to use proxyBeanMethods=false (Kotlin classes are final; CGLIB proxying requires non-final classes) and add assertj-core to version catalog (required by spring-boot-test's ApplicationContextRunner). --- gradle/libs.versions.toml | 2 + okapi-spring-boot/build.gradle.kts | 2 + .../springboot/OutboxAutoConfiguration.kt | 5 ++ .../OutboxPurgerAutoConfigurationTest.kt | 88 +++++++++++++++++++ 4 files changed, 97 insertions(+) create mode 100644 okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 79fad14..b4b5e5d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -13,6 +13,7 @@ spring = "7.0.6" springBoot = "4.0.4" wiremock = "3.13.2" slf4j = "2.0.17" +assertj = "3.27.3" [libraries] kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } @@ -36,6 +37,7 @@ springTx = { module = "org.springframework:spring-tx", version.ref = "spring" } springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" } springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" } springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" } +assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 6ea028a..584286e 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -23,6 +23,8 @@ dependencies { testImplementation(libs.springTx) testImplementation(libs.exposedCore) testImplementation(libs.springBootAutoconfigure) + testImplementation(libs.springBootTest) + testImplementation(libs.assertjCore) testImplementation(project(":okapi-postgres")) testImplementation(project(":okapi-mysql")) testImplementation(project(":okapi-http")) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 3fb1c4a..5ca0890 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -109,6 +109,11 @@ class OutboxAutoConfiguration { ) } + /** + * Auto-configures [PostgresOutboxStore] and Liquibase schema migration + * when `outbox-postgres` is on the classpath. + * Skipped if the application provides its own [OutboxStore] bean. + */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(PostgresOutboxStore::class) class PostgresStoreConfiguration { diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt new file mode 100644 index 0000000..e4df661 --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt @@ -0,0 +1,88 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import java.time.Instant + +class OutboxPurgerAutoConfigurationTest : FunSpec({ + + val contextRunner = ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + + test("purger bean is created by default") { + contextRunner.run { ctx -> + ctx.getBean(OutboxPurgerScheduler::class.java).shouldNotBeNull() + } + } + + test("purger bean is not created when disabled") { + contextRunner + .withPropertyValues("okapi.purger.enabled=false") + .run { ctx -> + ctx.containsBean("outboxPurgerScheduler") shouldBe false + } + } + + test("properties are bound from application config") { + contextRunner + .withPropertyValues( + "okapi.purger.retention-days=14", + "okapi.purger.interval-minutes=30", + "okapi.purger.batch-size=200", + ) + .run { ctx -> + val props = ctx.getBean(OkapiPurgerProperties::class.java) + props.retentionDays shouldBe 14 + props.intervalMinutes shouldBe 30 + props.batchSize shouldBe 200 + } + } + + test("default properties when nothing is configured") { + contextRunner.run { ctx -> + val props = ctx.getBean(OkapiPurgerProperties::class.java) + props.retentionDays shouldBe 7 + props.intervalMinutes shouldBe 60 + props.batchSize shouldBe 100 + } + } + + test("SmartLifecycle is running after context start") { + contextRunner.run { ctx -> + val scheduler = ctx.getBean(OutboxPurgerScheduler::class.java) + scheduler.isRunning shouldBe true + } + } + + test("invalid retention-days triggers constructor validation") { + contextRunner + .withPropertyValues("okapi.purger.retention-days=0") + .run { ctx -> + ctx.startupFailure.shouldNotBeNull() + } + } +}) + +private fun stubStore() = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} + +private fun stubDeliverer() = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +} From 34404e2a989940047b5d1ff259b549c0af55a0e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 14:44:13 +0100 Subject: [PATCH 09/14] style: apply ktlint formatting --- .../main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt | 4 +++- .../kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt | 4 +--- .../com/softwaremill/okapi/postgres/PostgresOutboxStore.kt | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 7805d5e..00252bb 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -41,7 +41,9 @@ class OutboxPurger( if (!running.compareAndSet(false, true)) return logger.info( "Outbox purger started [retention={}, interval={}ms, batchSize={}]", - retentionDuration, intervalMs, batchSize, + retentionDuration, + intervalMs, + batchSize, ) scheduler.scheduleWithFixedDelay(::tick, intervalMs, intervalMs, TimeUnit.MILLISECONDS) } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index ba66cb5..56f4598 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -144,9 +144,7 @@ class OutboxPurgerTest : FunSpec({ } }) -private fun stubStore( - onRemove: (Instant, Int) -> Int = { _, _ -> 0 }, -) = object : OutboxStore { +private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() override fun updateAfterProcessing(entry: OutboxEntry) = entry diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 3d9e62b..71db5a6 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -6,7 +6,6 @@ import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore import org.jetbrains.exposed.v1.core.alias import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.inList import org.jetbrains.exposed.v1.core.min import org.jetbrains.exposed.v1.jdbc.select From 126d2fc1eb22b869dcffb29f2cb6bc349d67c546 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 15:41:07 +0100 Subject: [PATCH 10/14] refactor(stores): use Exposed exec API instead of raw JDBC connection cast --- .../softwaremill/okapi/mysql/MysqlOutboxStore.kt | 15 +++++++++------ .../okapi/postgres/PostgresOutboxStore.kt | 15 +++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index 210aa44..6e61ccf 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -4,6 +4,7 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import org.jetbrains.exposed.v1.core.IntegerColumnType import org.jetbrains.exposed.v1.core.alias import org.jetbrains.exposed.v1.core.count import org.jetbrains.exposed.v1.core.inList @@ -66,12 +67,14 @@ class MysqlOutboxStore( ) """.trimIndent() - val conn = TransactionManager.current().connection.connection as java.sql.Connection - return conn.prepareStatement(sql).use { stmt -> - stmt.setTimestamp(1, java.sql.Timestamp.from(time)) - stmt.setInt(2, limit) - stmt.executeUpdate() - } + val statement = TransactionManager.current().connection.prepareStatement(sql, false) + statement.fillParameters( + listOf( + OutboxTable.lastAttempt.columnType to time, + IntegerColumnType() to limit, + ), + ) + return statement.executeUpdate() } override fun findOldestCreatedAt(statuses: Set): Map { diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 71db5a6..9dd3eba 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -4,6 +4,7 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import org.jetbrains.exposed.v1.core.IntegerColumnType import org.jetbrains.exposed.v1.core.alias import org.jetbrains.exposed.v1.core.count import org.jetbrains.exposed.v1.core.inList @@ -64,12 +65,14 @@ class PostgresOutboxStore( ) """.trimIndent() - val conn = TransactionManager.current().connection.connection as java.sql.Connection - return conn.prepareStatement(sql).use { stmt -> - stmt.setTimestamp(1, java.sql.Timestamp.from(time)) - stmt.setInt(2, limit) - stmt.executeUpdate() - } + val statement = TransactionManager.current().connection.prepareStatement(sql, false) + statement.fillParameters( + listOf( + OutboxTable.lastAttempt.columnType to time, + IntegerColumnType() to limit, + ), + ) + return statement.executeUpdate() } override fun findOldestCreatedAt(statuses: Set): Map { From dcbd7c141a4b7d8b879480510932f77a6122730e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 25 Mar 2026 16:08:03 +0100 Subject: [PATCH 11/14] refactor(spring): remove unused enabled field from OkapiPurgerProperties MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The enabled flag is handled by @ConditionalOnProperty on the bean factory. Having it also in the properties class was redundant — no code read it. --- .../com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt index 4ccae47..a0e067f 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt @@ -7,7 +7,6 @@ import org.springframework.validation.annotation.Validated @ConfigurationProperties(prefix = "okapi.purger") @Validated data class OkapiPurgerProperties( - val enabled: Boolean = true, @field:Min(1) val retentionDays: Long = 7, @field:Min(1) val intervalMinutes: Long = 60, @field:Min(1) val batchSize: Int = 100, From 522c91d2cefb252b8ecc1cc01c9c1aa96e3b6905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 11:45:53 +0100 Subject: [PATCH 12/14] =?UTF-8?q?fix:=20address=20PR=20review=20findings?= =?UTF-8?q?=20=E2=80=94=20restart=20guard,=20stop=20callback=20safety,=20K?= =?UTF-8?q?Doc,=20test=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - OutboxPurger.start(): check(!scheduler.isShutdown) prevents restart after stop - OutboxPurgerScheduler.stop(callback): try-finally guarantees callback invocation - OutboxStore.removeDeliveredBefore: verbose KDoc with @param/@return contract - OutboxAutoConfiguration.outboxStore(): restore concrete PostgresOutboxStore return type - OutboxPurgerTest: assert batchSize is forwarded as limit, test start-after-stop --- .../softwaremill/okapi/core/OutboxPurger.kt | 1 + .../com/softwaremill/okapi/core/OutboxStore.kt | 6 +++++- .../okapi/core/OutboxPurgerTest.kt | 18 ++++++++++++++++-- .../okapi/springboot/OutboxPurgerScheduler.kt | 7 +++++-- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 00252bb..0e48f9f 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -38,6 +38,7 @@ class OutboxPurger( } fun start() { + check(!scheduler.isShutdown) { "OutboxPurger cannot be restarted after stop()" } if (!running.compareAndSet(false, true)) return logger.info( "Outbox purger started [retention={}, interval={}ms, batchSize={}]", diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt index 24098ad..b9252fb 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt @@ -12,7 +12,11 @@ interface OutboxStore { /** Updates an entry after a delivery attempt (status change, retries, lastError). */ fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry - /** Removes up to [limit] DELIVERED entries with lastAttempt before [time]. Returns the number of entries deleted. */ + /** + * Removes up to [limit] DELIVERED entries older than [time]. + * @param limit maximum number of entries to delete; must be positive + * @return the number of entries actually deleted, always in `[0, limit]` + */ fun removeDeliveredBefore(time: Instant, limit: Int): Int /** Returns the oldest createdAt per status (useful for lag metrics). */ diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 56f4598..43f6c17 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -16,11 +16,13 @@ private val fixedClock = Clock.fixed(fixedNow, ZoneOffset.UTC) class OutboxPurgerTest : FunSpec({ - test("tick removes entries older than retention duration") { + test("tick removes entries older than retention duration with correct batch size") { var capturedCutoff: Instant? = null + var capturedLimit: Int? = null val latch = CountDownLatch(1) - val store = stubStore(onRemove = { time, _ -> + val store = stubStore(onRemove = { time, limit -> capturedCutoff = time + capturedLimit = limit latch.countDown() 0 }) @@ -38,6 +40,7 @@ class OutboxPurgerTest : FunSpec({ purger.stop() capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7)) + capturedLimit shouldBe 100 } test("batch loop stops when deleted < batchSize") { @@ -142,6 +145,17 @@ class OutboxPurgerTest : FunSpec({ OutboxPurger(stubStore(), intervalMs = -1, clock = fixedClock) } } + + test("start after stop throws") { + val purger = OutboxPurger(stubStore(), intervalMs = 60_000, batchSize = 100, clock = fixedClock) + + purger.start() + purger.stop() + + shouldThrow { + purger.start() + }.message shouldBe "OutboxPurger cannot be restarted after stop()" + } }) private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore { diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index a2ac23b..5db1405 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -37,8 +37,11 @@ class OutboxPurgerScheduler( } override fun stop(callback: Runnable) { - purger.stop() - callback.run() + try { + purger.stop() + } finally { + callback.run() + } } override fun isRunning(): Boolean = purger.isRunning() From 41cfa2579a0ba63effb5a8e803653fd6b69c8fe0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 12:58:34 +0100 Subject: [PATCH 13/14] refactor(spring): rename OkapiPurgerProperties to OutboxPurgerProperties, improve test assertions - Rename OkapiPurgerProperties -> OutboxPurgerProperties (consistent with Outbox* naming convention) - Add latch.await() shouldBe true assertions for better timeout diagnostics - Add stop callback invocation test for SmartLifecycle contract --- .../softwaremill/okapi/core/OutboxPurgerTest.kt | 10 +++++----- .../okapi/springboot/OutboxAutoConfiguration.kt | 4 ++-- ...rProperties.kt => OutboxPurgerProperties.kt} | 2 +- .../META-INF/spring-configuration-metadata.json | 2 +- .../OutboxPurgerAutoConfigurationTest.kt | 17 +++++++++++++++-- 5 files changed, 24 insertions(+), 11 deletions(-) rename okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/{OkapiPurgerProperties.kt => OutboxPurgerProperties.kt} (92%) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 43f6c17..7d7199b 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -36,7 +36,7 @@ class OutboxPurgerTest : FunSpec({ ) purger.start() - latch.await(2, TimeUnit.SECONDS) + latch.await(2, TimeUnit.SECONDS) shouldBe true purger.stop() capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7)) @@ -58,7 +58,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) purger.start() - latch.await(2, TimeUnit.SECONDS) + latch.await(2, TimeUnit.SECONDS) shouldBe true purger.stop() callCount.get() shouldBe 2 @@ -75,7 +75,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) purger.start() - latch.await(2, TimeUnit.SECONDS) + latch.await(2, TimeUnit.SECONDS) shouldBe true purger.stop() callCount.get() shouldBe 10 @@ -93,7 +93,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) purger.start() - latch.await(2, TimeUnit.SECONDS) + latch.await(2, TimeUnit.SECONDS) shouldBe true purger.stop() callCount.get() shouldBe 2 @@ -111,7 +111,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) purger.start() purger.start() // second start should be ignored - latch.await(2, TimeUnit.SECONDS) + latch.await(2, TimeUnit.SECONDS) shouldBe true purger.stop() purger.isRunning() shouldBe false diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 5ca0890..18695e3 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -42,7 +42,7 @@ import javax.sql.DataSource * - [PlatformTransactionManager] — if absent, each store call runs in its own transaction */ @AutoConfiguration -@EnableConfigurationProperties(OkapiPurgerProperties::class) +@EnableConfigurationProperties(OutboxPurgerProperties::class) class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean @@ -96,7 +96,7 @@ class OutboxAutoConfiguration { @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "okapi.purger", name = ["enabled"], havingValue = "true", matchIfMissing = true) fun outboxPurgerScheduler( - props: OkapiPurgerProperties, + props: OutboxPurgerProperties, outboxStore: OutboxStore, clock: ObjectProvider, ): OutboxPurgerScheduler { diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerProperties.kt similarity index 92% rename from okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt rename to okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerProperties.kt index a0e067f..f1ec3e8 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiPurgerProperties.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerProperties.kt @@ -6,7 +6,7 @@ import org.springframework.validation.annotation.Validated @ConfigurationProperties(prefix = "okapi.purger") @Validated -data class OkapiPurgerProperties( +data class OutboxPurgerProperties( @field:Min(1) val retentionDays: Long = 7, @field:Min(1) val intervalMinutes: Long = 60, @field:Min(1) val batchSize: Int = 100, diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json index 6ba41df..29b458e 100644 --- a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json @@ -2,7 +2,7 @@ "groups": [ { "name": "okapi.purger", - "type": "com.softwaremill.okapi.springboot.OkapiPurgerProperties", + "type": "com.softwaremill.okapi.springboot.OutboxPurgerProperties", "description": "Outbox purger configuration." } ], diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt index e4df661..8764cf5 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt @@ -41,7 +41,7 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ "okapi.purger.batch-size=200", ) .run { ctx -> - val props = ctx.getBean(OkapiPurgerProperties::class.java) + val props = ctx.getBean(OutboxPurgerProperties::class.java) props.retentionDays shouldBe 14 props.intervalMinutes shouldBe 30 props.batchSize shouldBe 200 @@ -50,7 +50,7 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ test("default properties when nothing is configured") { contextRunner.run { ctx -> - val props = ctx.getBean(OkapiPurgerProperties::class.java) + val props = ctx.getBean(OutboxPurgerProperties::class.java) props.retentionDays shouldBe 7 props.intervalMinutes shouldBe 60 props.batchSize shouldBe 100 @@ -71,6 +71,19 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ ctx.startupFailure.shouldNotBeNull() } } + + test("stop callback is always invoked") { + val scheduler = OutboxPurgerScheduler( + outboxStore = stubStore(), + intervalMinutes = 60, + ) + scheduler.start() + + var callbackInvoked = false + scheduler.stop { callbackInvoked = true } + + callbackInvoked shouldBe true + } }) private fun stubStore() = object : OutboxStore { From 46811643b79eeb71f255db2772f3e9b0afcb645b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 26 Mar 2026 13:03:20 +0100 Subject: [PATCH 14/14] =?UTF-8?q?docs:=20fix=20KDoc=20accuracy=20=E2=80=94?= =?UTF-8?q?=20deliveryType=20routing,=20sync=20stop,=20batch-size=20descri?= =?UTF-8?q?ption?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../softwaremill/okapi/springboot/OutboxAutoConfiguration.kt | 2 +- .../com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt | 2 +- .../main/resources/META-INF/spring-configuration-metadata.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 18695e3..3b8f5e1 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -31,7 +31,7 @@ import javax.sql.DataSource * - One or more [MessageDeliverer] beans — transport implementations * (e.g. HttpMessageDeliverer, KafkaMessageDeliverer). * Multiple deliverers are automatically wrapped in [CompositeMessageDeliverer] - * and routed by the `type` field in each entry's deliveryMetadata. + * and routed by [OutboxEntry.deliveryType]. * * Optional beans with defaults: * - [OutboxStore] — auto-configured to [PostgresOutboxStore] or [MysqlOutboxStore] diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index 5db1405..9dec6cd 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -9,7 +9,7 @@ import java.time.Duration /** * Spring lifecycle wrapper for [OutboxPurger]. * - * Uses [SmartLifecycle] for phase-ordered startup/shutdown and async stop support. + * Uses [SmartLifecycle] for phase-ordered startup/shutdown. * Enabled by default; disable with `okapi.purger.enabled=false`. */ class OutboxPurgerScheduler( diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json index 29b458e..73586cc 100644 --- a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json @@ -29,7 +29,7 @@ "name": "okapi.purger.batch-size", "type": "java.lang.Integer", "defaultValue": 100, - "description": "Max entries deleted per batch in a single purge cycle." + "description": "Number of entries deleted per batch. Each purge cycle may execute multiple batches." } ] }