diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f222d06..b4b5e5d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,6 +12,8 @@ kafkaClients = "4.2.0" spring = "7.0.6" springBoot = "4.0.4" wiremock = "3.13.2" +slf4j = "2.0.17" +assertj = "3.27.3" [libraries] kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } @@ -33,7 +35,12 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka springContext = { module = "org.springframework:spring-context", version.ref = "spring" } springTx = { module = "org.springframework:spring-tx", version.ref = "spring" } springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" } +springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" } +springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" } +assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } +slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } +slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } [plugins] ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" } diff --git a/okapi-core/build.gradle.kts b/okapi-core/build.gradle.kts index 0bdb31f..34f5112 100644 --- a/okapi-core/build.gradle.kts +++ b/okapi-core/build.gradle.kts @@ -3,6 +3,8 @@ plugins { } dependencies { + implementation(libs.slf4jApi) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) + testRuntimeOnly(libs.slf4jSimple) } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 803f94a..0e48f9f 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -1,45 +1,86 @@ package com.softwaremill.okapi.core +import org.slf4j.LoggerFactory import java.time.Clock import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean /** * Periodically removes DELIVERED outbox entries older than [retentionDuration]. * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. - * Delegates to [OutboxStore.removeDeliveredBefore] — works with any storage adapter. + * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. + * [AtomicBoolean] guards against accidental double-start, not restart. + * + * Delegates to [OutboxStore.removeDeliveredBefore] -- works with any storage adapter. */ class OutboxPurger( private val outboxStore: OutboxStore, private val retentionDuration: Duration = Duration.ofDays(7), private val intervalMs: Long = 3_600_000L, + private val batchSize: Int = 100, private val clock: Clock = Clock.systemUTC(), ) { + init { + require(retentionDuration > Duration.ZERO) { "retentionDuration must be positive, got: $retentionDuration" } + require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" } + require(batchSize > 0) { "batchSize must be positive, got: $batchSize" } + } + + private val running = AtomicBoolean(false) + private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor { r -> Thread(r, "outbox-purger").apply { isDaemon = true } } fun start() { - scheduler.scheduleWithFixedDelay( - ::tick, - intervalMs, + check(!scheduler.isShutdown) { "OutboxPurger cannot be restarted after stop()" } + if (!running.compareAndSet(false, true)) return + logger.info( + "Outbox purger started [retention={}, interval={}ms, batchSize={}]", + retentionDuration, intervalMs, - TimeUnit.MILLISECONDS, + batchSize, ) + scheduler.scheduleWithFixedDelay(::tick, intervalMs, intervalMs, TimeUnit.MILLISECONDS) } fun stop() { + if (!running.compareAndSet(true, false)) return scheduler.shutdown() if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { scheduler.shutdownNow() } + logger.info("Outbox purger stopped") } + fun isRunning(): Boolean = running.get() + private fun tick() { - outboxStore.removeDeliveredBefore(clock.instant().minus(retentionDuration)) + try { + val cutoff = clock.instant().minus(retentionDuration) + var totalDeleted = 0 + var batches = 0 + do { + val deleted = outboxStore.removeDeliveredBefore(cutoff, batchSize) + totalDeleted += deleted + batches++ + } while (deleted == batchSize && batches < MAX_BATCHES_PER_TICK) + + if (totalDeleted > 0) { + logger.debug("Purged {} delivered entries in {} batches", totalDeleted, batches) + } + } catch (e: Exception) { + logger.error("Outbox purge failed, will retry at next scheduled interval", e) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(OutboxPurger::class.java) + private const val MAX_BATCHES_PER_TICK = 10 } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt index 13631ca..b9252fb 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt @@ -12,8 +12,12 @@ interface OutboxStore { /** Updates an entry after a delivery attempt (status change, retries, lastError). */ fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry - /** Removes DELIVERED entries older than [time]. */ - fun removeDeliveredBefore(time: Instant) + /** + * Removes up to [limit] DELIVERED entries older than [time]. + * @param limit maximum number of entries to delete; must be positive + * @return the number of entries actually deleted, always in `[0, limit]` + */ + fun removeDeliveredBefore(time: Instant, limit: Int): Int /** Returns the oldest createdAt per status (useful for lag metrics). */ fun findOldestCreatedAt(statuses: Set): Map diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index a104acf..9d5bb1b 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -36,7 +36,7 @@ class OutboxProcessorTest : override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = entry.also { processedEntries += it } - override fun removeDeliveredBefore(time: Instant) = Unit + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0 override fun findOldestCreatedAt(statuses: Set) = emptyMap() diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt index 7428dad..9f324dc 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt @@ -27,7 +27,7 @@ class OutboxPublisherTest : override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant) = Unit + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0 override fun findOldestCreatedAt(statuses: Set) = emptyMap() diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 02d5e90..7d7199b 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -1,40 +1,168 @@ package com.softwaremill.okapi.core +import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.shouldBe import java.time.Clock import java.time.Duration import java.time.Instant import java.time.ZoneOffset +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger private val fixedNow = Instant.parse("2025-03-20T12:00:00Z") private val fixedClock = Clock.fixed(fixedNow, ZoneOffset.UTC) class OutboxPurgerTest : FunSpec({ - test("tick removes entries older than retention duration") { + + test("tick removes entries older than retention duration with correct batch size") { var capturedCutoff: Instant? = null - val store = object : OutboxStore { - override fun persist(entry: OutboxEntry) = entry - override fun claimPending(limit: Int) = emptyList() - override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant) { - capturedCutoff = time - } - override fun findOldestCreatedAt(statuses: Set) = emptyMap() - override fun countByStatuses() = emptyMap() - } + var capturedLimit: Int? = null + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { time, limit -> + capturedCutoff = time + capturedLimit = limit + latch.countDown() + 0 + }) val purger = OutboxPurger( outboxStore = store, retentionDuration = Duration.ofDays(7), intervalMs = 50, + batchSize = 100, clock = fixedClock, ) purger.start() - Thread.sleep(150) + latch.await(2, TimeUnit.SECONDS) shouldBe true purger.stop() capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7)) + capturedLimit shouldBe 100 + } + + test("batch loop stops when deleted < batchSize") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + val count = callCount.incrementAndGet() + if (count == 1) { + 100 // first batch: full + } else { + latch.countDown() + 42 // second batch: partial, loop stops + } + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + purger.stop() + + callCount.get() shouldBe 2 + } + + test("batch loop respects MAX_BATCHES_PER_TICK") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + val count = callCount.incrementAndGet() + if (count >= 10) latch.countDown() + 100 // always full, would loop forever without guard + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + purger.stop() + + callCount.get() shouldBe 10 + } + + test("exception in tick does not kill scheduler") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(2) + val store = stubStore(onRemove = { _, _ -> + val count = callCount.incrementAndGet() + latch.countDown() + if (count == 1) throw RuntimeException("db connection lost") + 0 + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + purger.stop() + + callCount.get() shouldBe 2 + } + + test("double start is ignored") { + val callCount = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + callCount.incrementAndGet() + latch.countDown() + 0 + }) + + val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock) + purger.start() + purger.start() // second start should be ignored + latch.await(2, TimeUnit.SECONDS) shouldBe true + purger.stop() + + purger.isRunning() shouldBe false + } + + test("isRunning transitions") { + val store = stubStore(onRemove = { _, _ -> 0 }) + val purger = OutboxPurger(store, intervalMs = 60_000, batchSize = 100, clock = fixedClock) + + purger.isRunning() shouldBe false + purger.start() + purger.isRunning() shouldBe true + purger.stop() + purger.isRunning() shouldBe false + } + + test("constructor rejects invalid batchSize") { + shouldThrow { + OutboxPurger(stubStore(), batchSize = 0, clock = fixedClock) + } + } + + test("constructor rejects zero retentionDuration") { + shouldThrow { + OutboxPurger(stubStore(), retentionDuration = Duration.ZERO, clock = fixedClock) + } + } + + test("constructor rejects negative intervalMs") { + shouldThrow { + OutboxPurger(stubStore(), intervalMs = -1, clock = fixedClock) + } + } + + test("start after stop throws") { + val purger = OutboxPurger(stubStore(), intervalMs = 60_000, batchSize = 100, clock = fixedClock) + + purger.start() + purger.stop() + + shouldThrow { + purger.start() + }.message shouldBe "OutboxPurger cannot be restarted after stop()" } }) + +private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = onRemove(time, limit) + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index fb9940b..6e61ccf 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -4,14 +4,11 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import org.jetbrains.exposed.v1.core.IntegerColumnType import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.and import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.less import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.jdbc.deleteWhere import org.jetbrains.exposed.v1.jdbc.select import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.upsert @@ -56,10 +53,28 @@ class MysqlOutboxStore( override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) - override fun removeDeliveredBefore(time: Instant) { - OutboxTable.deleteWhere { - (OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time) - } + override fun removeDeliveredBefore(time: Instant, limit: Int): Int { + val sql = """ + DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( + SELECT ${OutboxTable.id.name} FROM ( + SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} + WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' + AND ${OutboxTable.lastAttempt.name} < ? + ORDER BY ${OutboxTable.id.name} + LIMIT ? + FOR UPDATE SKIP LOCKED + ) AS batch + ) + """.trimIndent() + + val statement = TransactionManager.current().connection.prepareStatement(sql, false) + statement.fillParameters( + listOf( + OutboxTable.lastAttempt.columnType to time, + IntegerColumnType() to limit, + ), + ) + return statement.executeUpdate() } override fun findOldestCreatedAt(statuses: Set): Map { diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql new file mode 100644 index 0000000..b299cb0 --- /dev/null +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/002__add_purger_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:002 + +CREATE INDEX idx_outbox_status_last_attempt ON outbox(status, last_attempt); diff --git a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml index d249d7f..fe5d7b0 100644 --- a/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml +++ b/okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml @@ -4,4 +4,5 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd"> + diff --git a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt index f5c9882..cbaa5c3 100644 --- a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt +++ b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt @@ -74,7 +74,7 @@ class MysqlOutboxStoreTest : BehaviorSpec({ lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), ) transaction(db) { store.persist(delivered) } - transaction(db) { store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z")) } + transaction(db) { store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), Int.MAX_VALUE) } then("old delivered entries are removed") { val counts = transaction(db) { store.countByStatuses() } @@ -83,6 +83,31 @@ class MysqlOutboxStoreTest : BehaviorSpec({ } } + given("removeDeliveredBefore with limit") { + `when`("limit is smaller than matching entries") { + transaction(db) { exec("DELETE FROM outbox") } + repeat(5) { + val entry = newEntry().copy( + status = OutboxStatus.DELIVERED, + lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), + ) + transaction(db) { store.persist(entry) } + } + + val deleted = transaction(db) { + store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), 3) + } + + then("only deletes up to limit") { + deleted shouldBe 3 + } + then("remaining entries still exist") { + val counts = transaction(db) { store.countByStatuses() } + counts[OutboxStatus.DELIVERED] shouldBe 2L + } + } + } + given("countByStatuses") { `when`("entries exist with different statuses") { transaction(db) { diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index f7ca77f..9dd3eba 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -4,14 +4,11 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import org.jetbrains.exposed.v1.core.IntegerColumnType import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.and import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.less import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.jdbc.deleteWhere import org.jetbrains.exposed.v1.jdbc.select import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.upsert @@ -56,10 +53,26 @@ class PostgresOutboxStore( override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) - override fun removeDeliveredBefore(time: Instant) { - OutboxTable.deleteWhere { - (OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time) - } + override fun removeDeliveredBefore(time: Instant, limit: Int): Int { + val sql = """ + DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( + SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} + WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' + AND ${OutboxTable.lastAttempt.name} < ? + ORDER BY ${OutboxTable.id.name} + LIMIT ? + FOR UPDATE SKIP LOCKED + ) + """.trimIndent() + + val statement = TransactionManager.current().connection.prepareStatement(sql, false) + statement.fillParameters( + listOf( + OutboxTable.lastAttempt.columnType to time, + IntegerColumnType() to limit, + ), + ) + return statement.executeUpdate() } override fun findOldestCreatedAt(statuses: Set): Map { diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql new file mode 100644 index 0000000..ef3e169 --- /dev/null +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/003__add_purger_index.sql @@ -0,0 +1,4 @@ +--liquibase formatted sql +--changeset outbox:003 + +CREATE INDEX idx_outbox_status_last_attempt ON outbox(status, last_attempt); diff --git a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml index a9e4e66..d0497e3 100644 --- a/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml +++ b/okapi-postgres/src/main/resources/com/softwaremill/okapi/db/changelog.xml @@ -5,4 +5,5 @@ http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd"> + diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 596a9ef..584286e 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -8,6 +8,11 @@ dependencies { compileOnly(libs.springContext) compileOnly(libs.springTx) compileOnly(libs.springBootAutoconfigure) + + // Validation annotations for @ConfigurationProperties classes + compileOnly(libs.springBootStarterValidation) + + // Optional store autoconfiguration — compileOnly + @ConditionalOnClass guards runtime absence compileOnly(project(":okapi-postgres")) compileOnly(project(":okapi-mysql")) compileOnly(libs.liquibaseCore) @@ -18,6 +23,8 @@ dependencies { testImplementation(libs.springTx) testImplementation(libs.exposedCore) testImplementation(libs.springBootAutoconfigure) + testImplementation(libs.springBootTest) + testImplementation(libs.assertjCore) testImplementation(project(":okapi-postgres")) testImplementation(project(":okapi-mysql")) testImplementation(project(":okapi-http")) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 013e0af..3b8f5e1 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -16,6 +16,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.boot.autoconfigure.condition.ConditionalOnClass import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.transaction.PlatformTransactionManager @@ -30,7 +31,7 @@ import javax.sql.DataSource * - One or more [MessageDeliverer] beans — transport implementations * (e.g. HttpMessageDeliverer, KafkaMessageDeliverer). * Multiple deliverers are automatically wrapped in [CompositeMessageDeliverer] - * and routed by the `type` field in each entry's deliveryMetadata. + * and routed by [OutboxEntry.deliveryType]. * * Optional beans with defaults: * - [OutboxStore] — auto-configured to [PostgresOutboxStore] or [MysqlOutboxStore] @@ -41,6 +42,7 @@ import javax.sql.DataSource * - [PlatformTransactionManager] — if absent, each store call runs in its own transaction */ @AutoConfiguration +@EnableConfigurationProperties(OutboxPurgerProperties::class) class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean @@ -93,13 +95,25 @@ class OutboxAutoConfiguration { @Bean @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "okapi.purger", name = ["enabled"], havingValue = "true", matchIfMissing = true) - fun outboxPurgerScheduler(outboxStore: OutboxStore, clock: ObjectProvider): OutboxPurgerScheduler { + fun outboxPurgerScheduler( + props: OutboxPurgerProperties, + outboxStore: OutboxStore, + clock: ObjectProvider, + ): OutboxPurgerScheduler { return OutboxPurgerScheduler( outboxStore = outboxStore, + retentionDays = props.retentionDays, + intervalMinutes = props.intervalMinutes, + batchSize = props.batchSize, clock = clock.getIfAvailable { Clock.systemUTC() }, ) } + /** + * Auto-configures [PostgresOutboxStore] and Liquibase schema migration + * when `outbox-postgres` is on the classpath. + * Skipped if the application provides its own [OutboxStore] bean. + */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(PostgresOutboxStore::class) class PostgresStoreConfiguration { diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerProperties.kt new file mode 100644 index 0000000..f1ec3e8 --- /dev/null +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerProperties.kt @@ -0,0 +1,13 @@ +package com.softwaremill.okapi.springboot + +import jakarta.validation.constraints.Min +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.validation.annotation.Validated + +@ConfigurationProperties(prefix = "okapi.purger") +@Validated +data class OutboxPurgerProperties( + @field:Min(1) val retentionDays: Long = 7, + @field:Min(1) val intervalMinutes: Long = 60, + @field:Min(1) val batchSize: Int = 100, +) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index 7858a59..9dec6cd 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -2,37 +2,54 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.OutboxPurger import com.softwaremill.okapi.core.OutboxStore -import org.springframework.beans.factory.DisposableBean -import org.springframework.beans.factory.SmartInitializingSingleton +import org.springframework.context.SmartLifecycle import java.time.Clock import java.time.Duration /** * Spring lifecycle wrapper for [OutboxPurger]. * - * Starts purging after all beans are initialized, stops on context close. + * Uses [SmartLifecycle] for phase-ordered startup/shutdown. * Enabled by default; disable with `okapi.purger.enabled=false`. */ class OutboxPurgerScheduler( outboxStore: OutboxStore, retentionDays: Long = 7, intervalMinutes: Long = 60, + batchSize: Int = 100, clock: Clock = Clock.systemUTC(), -) : SmartInitializingSingleton, - DisposableBean { +) : SmartLifecycle { private val purger = OutboxPurger( outboxStore = outboxStore, retentionDuration = Duration.ofDays(retentionDays), intervalMs = intervalMinutes * 60 * 1_000, + batchSize = batchSize, clock = clock, ) - override fun afterSingletonsInstantiated() { + override fun start() { purger.start() } - override fun destroy() { + override fun stop() { purger.stop() } + + override fun stop(callback: Runnable) { + try { + purger.stop() + } finally { + callback.run() + } + } + + override fun isRunning(): Boolean = purger.isRunning() + + override fun getPhase(): Int = PURGER_PHASE + + companion object { + /** Start late (after app beans), stop early (before app beans). */ + const val PURGER_PHASE = Integer.MAX_VALUE - 1024 + } } diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json new file mode 100644 index 0000000..73586cc --- /dev/null +++ b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json @@ -0,0 +1,35 @@ +{ + "groups": [ + { + "name": "okapi.purger", + "type": "com.softwaremill.okapi.springboot.OutboxPurgerProperties", + "description": "Outbox purger configuration." + } + ], + "properties": [ + { + "name": "okapi.purger.enabled", + "type": "java.lang.Boolean", + "defaultValue": true, + "description": "Whether the outbox purger is enabled." + }, + { + "name": "okapi.purger.retention-days", + "type": "java.lang.Long", + "defaultValue": 7, + "description": "How many days to keep delivered entries before purging." + }, + { + "name": "okapi.purger.interval-minutes", + "type": "java.lang.Long", + "defaultValue": 60, + "description": "How often the purger runs, in minutes." + }, + { + "name": "okapi.purger.batch-size", + "type": "java.lang.Integer", + "defaultValue": 100, + "description": "Number of entries deleted per batch. Each purge cycle may execute multiple batches." + } + ] +} diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt new file mode 100644 index 0000000..8764cf5 --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt @@ -0,0 +1,101 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import java.time.Instant + +class OutboxPurgerAutoConfigurationTest : FunSpec({ + + val contextRunner = ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + + test("purger bean is created by default") { + contextRunner.run { ctx -> + ctx.getBean(OutboxPurgerScheduler::class.java).shouldNotBeNull() + } + } + + test("purger bean is not created when disabled") { + contextRunner + .withPropertyValues("okapi.purger.enabled=false") + .run { ctx -> + ctx.containsBean("outboxPurgerScheduler") shouldBe false + } + } + + test("properties are bound from application config") { + contextRunner + .withPropertyValues( + "okapi.purger.retention-days=14", + "okapi.purger.interval-minutes=30", + "okapi.purger.batch-size=200", + ) + .run { ctx -> + val props = ctx.getBean(OutboxPurgerProperties::class.java) + props.retentionDays shouldBe 14 + props.intervalMinutes shouldBe 30 + props.batchSize shouldBe 200 + } + } + + test("default properties when nothing is configured") { + contextRunner.run { ctx -> + val props = ctx.getBean(OutboxPurgerProperties::class.java) + props.retentionDays shouldBe 7 + props.intervalMinutes shouldBe 60 + props.batchSize shouldBe 100 + } + } + + test("SmartLifecycle is running after context start") { + contextRunner.run { ctx -> + val scheduler = ctx.getBean(OutboxPurgerScheduler::class.java) + scheduler.isRunning shouldBe true + } + } + + test("invalid retention-days triggers constructor validation") { + contextRunner + .withPropertyValues("okapi.purger.retention-days=0") + .run { ctx -> + ctx.startupFailure.shouldNotBeNull() + } + } + + test("stop callback is always invoked") { + val scheduler = OutboxPurgerScheduler( + outboxStore = stubStore(), + intervalMinutes = 60, + ) + scheduler.start() + + var callbackInvoked = false + scheduler.stop { callbackInvoked = true } + + callbackInvoked shouldBe true + } +}) + +private fun stubStore() = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} + +private fun stubDeliverer() = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +} diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt index 77555e8..b482d3e 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringOutboxPublisherTest.kt @@ -36,7 +36,7 @@ class SpringOutboxPublisherTest : override fun updateAfterProcessing(entry: OutboxEntry) = entry - override fun removeDeliveredBefore(time: Instant) = Unit + override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0 override fun findOldestCreatedAt(statuses: Set) = emptyMap()