Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ kafkaClients = "4.2.0"
spring = "7.0.6"
springBoot = "4.0.4"
wiremock = "3.13.2"
slf4j = "2.0.17"
assertj = "3.27.3"

[libraries]
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
Expand All @@ -33,7 +35,12 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka
springContext = { module = "org.springframework:spring-context", version.ref = "spring" }
springTx = { module = "org.springframework:spring-tx", version.ref = "spring" }
springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" }
springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" }
springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" }
assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" }
wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" }
slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }

[plugins]
ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" }
2 changes: 2 additions & 0 deletions okapi-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ plugins {
}

dependencies {
implementation(libs.slf4jApi)
testImplementation(libs.kotestRunnerJunit5)
testImplementation(libs.kotestAssertionsCore)
testRuntimeOnly(libs.slf4jSimple)
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,86 @@
package com.softwaremill.okapi.core

import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

/**
* Periodically removes DELIVERED outbox entries older than [retentionDuration].
*
* Runs on a single daemon thread with explicit [start]/[stop] lifecycle.
* Delegates to [OutboxStore.removeDeliveredBefore] — works with any storage adapter.
* [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.
* [AtomicBoolean] guards against accidental double-start, not restart.
*
* Delegates to [OutboxStore.removeDeliveredBefore] -- works with any storage adapter.
*/
class OutboxPurger(
private val outboxStore: OutboxStore,
private val retentionDuration: Duration = Duration.ofDays(7),
private val intervalMs: Long = 3_600_000L,
private val batchSize: Int = 100,
private val clock: Clock = Clock.systemUTC(),
) {
init {
require(retentionDuration > Duration.ZERO) { "retentionDuration must be positive, got: $retentionDuration" }
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
}

private val running = AtomicBoolean(false)

private val scheduler: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor { r ->
Thread(r, "outbox-purger").apply { isDaemon = true }
}

fun start() {
scheduler.scheduleWithFixedDelay(
::tick,
intervalMs,
check(!scheduler.isShutdown) { "OutboxPurger cannot be restarted after stop()" }
if (!running.compareAndSet(false, true)) return
logger.info(
"Outbox purger started [retention={}, interval={}ms, batchSize={}]",
retentionDuration,
intervalMs,
TimeUnit.MILLISECONDS,
batchSize,
)
scheduler.scheduleWithFixedDelay(::tick, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
}

fun stop() {
if (!running.compareAndSet(true, false)) return
scheduler.shutdown()
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow()
}
logger.info("Outbox purger stopped")
}

fun isRunning(): Boolean = running.get()

private fun tick() {
outboxStore.removeDeliveredBefore(clock.instant().minus(retentionDuration))
try {
val cutoff = clock.instant().minus(retentionDuration)
var totalDeleted = 0
var batches = 0
do {
val deleted = outboxStore.removeDeliveredBefore(cutoff, batchSize)
totalDeleted += deleted
batches++
} while (deleted == batchSize && batches < MAX_BATCHES_PER_TICK)

if (totalDeleted > 0) {
logger.debug("Purged {} delivered entries in {} batches", totalDeleted, batches)
}
} catch (e: Exception) {
logger.error("Outbox purge failed, will retry at next scheduled interval", e)
}
}

companion object {
private val logger = LoggerFactory.getLogger(OutboxPurger::class.java)
private const val MAX_BATCHES_PER_TICK = 10
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ interface OutboxStore {
/** Updates an entry after a delivery attempt (status change, retries, lastError). */
fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry

/** Removes DELIVERED entries older than [time]. */
fun removeDeliveredBefore(time: Instant)
/**
* Removes up to [limit] DELIVERED entries older than [time].
* @param limit maximum number of entries to delete; must be positive
* @return the number of entries actually deleted, always in `[0, limit]`
*/
fun removeDeliveredBefore(time: Instant, limit: Int): Int

/** Returns the oldest createdAt per status (useful for lag metrics). */
fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class OutboxProcessorTest :

override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = entry.also { processedEntries += it }

override fun removeDeliveredBefore(time: Instant) = Unit
override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0

override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class OutboxPublisherTest :

override fun updateAfterProcessing(entry: OutboxEntry) = entry

override fun removeDeliveredBefore(time: Instant) = Unit
override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0

override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,168 @@
package com.softwaremill.okapi.core

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

private val fixedNow = Instant.parse("2025-03-20T12:00:00Z")
private val fixedClock = Clock.fixed(fixedNow, ZoneOffset.UTC)

class OutboxPurgerTest : FunSpec({
test("tick removes entries older than retention duration") {

test("tick removes entries older than retention duration with correct batch size") {
var capturedCutoff: Instant? = null
val store = object : OutboxStore {
override fun persist(entry: OutboxEntry) = entry
override fun claimPending(limit: Int) = emptyList<OutboxEntry>()
override fun updateAfterProcessing(entry: OutboxEntry) = entry
override fun removeDeliveredBefore(time: Instant) {
capturedCutoff = time
}
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
}
var capturedLimit: Int? = null
val latch = CountDownLatch(1)
val store = stubStore(onRemove = { time, limit ->
capturedCutoff = time
capturedLimit = limit
latch.countDown()
0
})

val purger = OutboxPurger(
outboxStore = store,
retentionDuration = Duration.ofDays(7),
intervalMs = 50,
batchSize = 100,
clock = fixedClock,
)

purger.start()
Thread.sleep(150)
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7))
capturedLimit shouldBe 100
}

test("batch loop stops when deleted < batchSize") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(1)
val store = stubStore(onRemove = { _, _ ->
val count = callCount.incrementAndGet()
if (count == 1) {
100 // first batch: full
} else {
latch.countDown()
42 // second batch: partial, loop stops
}
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

callCount.get() shouldBe 2
}

test("batch loop respects MAX_BATCHES_PER_TICK") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(1)
val store = stubStore(onRemove = { _, _ ->
val count = callCount.incrementAndGet()
if (count >= 10) latch.countDown()
100 // always full, would loop forever without guard
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

callCount.get() shouldBe 10
}

test("exception in tick does not kill scheduler") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(2)
val store = stubStore(onRemove = { _, _ ->
val count = callCount.incrementAndGet()
latch.countDown()
if (count == 1) throw RuntimeException("db connection lost")
0
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

callCount.get() shouldBe 2
}

test("double start is ignored") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(1)
val store = stubStore(onRemove = { _, _ ->
callCount.incrementAndGet()
latch.countDown()
0
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
purger.start()
purger.start() // second start should be ignored
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

purger.isRunning() shouldBe false
}

test("isRunning transitions") {
val store = stubStore(onRemove = { _, _ -> 0 })
val purger = OutboxPurger(store, intervalMs = 60_000, batchSize = 100, clock = fixedClock)

purger.isRunning() shouldBe false
purger.start()
purger.isRunning() shouldBe true
purger.stop()
purger.isRunning() shouldBe false
}

test("constructor rejects invalid batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxPurger(stubStore(), batchSize = 0, clock = fixedClock)
}
}

test("constructor rejects zero retentionDuration") {
shouldThrow<IllegalArgumentException> {
OutboxPurger(stubStore(), retentionDuration = Duration.ZERO, clock = fixedClock)
}
}

test("constructor rejects negative intervalMs") {
shouldThrow<IllegalArgumentException> {
OutboxPurger(stubStore(), intervalMs = -1, clock = fixedClock)
}
}

test("start after stop throws") {
val purger = OutboxPurger(stubStore(), intervalMs = 60_000, batchSize = 100, clock = fixedClock)

purger.start()
purger.stop()

shouldThrow<IllegalStateException> {
purger.start()
}.message shouldBe "OutboxPurger cannot be restarted after stop()"
}
})

private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore {
override fun persist(entry: OutboxEntry) = entry
override fun claimPending(limit: Int) = emptyList<OutboxEntry>()
override fun updateAfterProcessing(entry: OutboxEntry) = entry
override fun removeDeliveredBefore(time: Instant, limit: Int): Int = onRemove(time, limit)
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import com.softwaremill.okapi.core.OutboxEntry
import com.softwaremill.okapi.core.OutboxId
import com.softwaremill.okapi.core.OutboxStatus
import com.softwaremill.okapi.core.OutboxStore
import org.jetbrains.exposed.v1.core.IntegerColumnType
import org.jetbrains.exposed.v1.core.alias
import org.jetbrains.exposed.v1.core.and
import org.jetbrains.exposed.v1.core.count
import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.core.inList
import org.jetbrains.exposed.v1.core.less
import org.jetbrains.exposed.v1.core.min
import org.jetbrains.exposed.v1.jdbc.deleteWhere
import org.jetbrains.exposed.v1.jdbc.select
import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager
import org.jetbrains.exposed.v1.jdbc.upsert
Expand Down Expand Up @@ -56,10 +53,28 @@ class MysqlOutboxStore(

override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry)

override fun removeDeliveredBefore(time: Instant) {
OutboxTable.deleteWhere {
(OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time)
}
override fun removeDeliveredBefore(time: Instant, limit: Int): Int {
val sql = """
DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN (
SELECT ${OutboxTable.id.name} FROM (
SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName}
WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}'
AND ${OutboxTable.lastAttempt.name} < ?
ORDER BY ${OutboxTable.id.name}
LIMIT ?
FOR UPDATE SKIP LOCKED
) AS batch
)
""".trimIndent()

val statement = TransactionManager.current().connection.prepareStatement(sql, false)
statement.fillParameters(
listOf(
OutboxTable.lastAttempt.columnType to time,
IntegerColumnType() to limit,
),
)
return statement.executeUpdate()
}

override fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
--liquibase formatted sql
--changeset outbox:002

CREATE INDEX idx_outbox_status_last_attempt ON outbox(status, last_attempt);
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">
<include file="001__create_outbox_table.sql" relativeToChangelogFile="true"/>
<include file="002__add_purger_index.sql" relativeToChangelogFile="true"/>
</databaseChangeLog>
Loading