Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ bin/
### Claude Code ###
.claude/
.ai/
CLAUDE.md
CLAUDE.md
docs/specs/
docs/plans/
docs/superpowers/
13 changes: 4 additions & 9 deletions okapi-spring-boot/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@ plugins {
dependencies {
implementation(project(":okapi-core"))

// Spring provided by the consuming application — compileOnly keeps this module free of version lock-in
compileOnly(libs.springContext)
compileOnly(libs.springTx)

// Spring Boot autoconfiguration support — compileOnly so we don't force Spring Boot on consumers
compileOnly(libs.springBootAutoconfigure)

// Optional postgres store autoconfiguration — compileOnly + @ConditionalOnClass guards runtime absence
compileOnly(project(":okapi-postgres"))

// Optional Liquibase migration support — compileOnly, activated only when liquibase-core is on the runtime classpath
compileOnly(project(":okapi-mysql"))
compileOnly(libs.liquibaseCore)

testImplementation(libs.kotestRunnerJunit5)
Expand All @@ -25,15 +19,16 @@ dependencies {
testImplementation(libs.exposedCore)
testImplementation(libs.springBootAutoconfigure)
testImplementation(project(":okapi-postgres"))

// E2E test dependencies
testImplementation(project(":okapi-mysql"))
testImplementation(project(":okapi-http"))
testImplementation(libs.exposedJdbc)
testImplementation(libs.exposedJson)
testImplementation(libs.exposedJavaTime)
testImplementation(libs.liquibaseCore)
testImplementation(libs.testcontainersPostgresql)
testImplementation(libs.postgresql)
testImplementation(libs.testcontainersMysql)
testImplementation(libs.mysql)
testImplementation(libs.wiremock)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.softwaremill.okapi.core.OutboxProcessor
import com.softwaremill.okapi.core.OutboxPublisher
import com.softwaremill.okapi.core.OutboxStore
import com.softwaremill.okapi.core.RetryPolicy
import com.softwaremill.okapi.mysql.MysqlOutboxStore
import com.softwaremill.okapi.postgres.PostgresOutboxStore
import liquibase.integration.spring.SpringLiquibase
import org.springframework.beans.factory.ObjectProvider
Expand All @@ -32,7 +33,9 @@ import javax.sql.DataSource
* and routed by the `type` field in each entry's deliveryMetadata.
*
* Optional beans with defaults:
* - [OutboxStore] — auto-configured to [PostgresOutboxStore] if `outbox-postgres` is on the classpath
* - [OutboxStore] — auto-configured to [PostgresOutboxStore] or [MysqlOutboxStore]
* depending on which module (`okapi-postgres` / `okapi-mysql`) is on the classpath.
* If both are present, Postgres takes priority. Override by defining your own `@Bean OutboxStore`.
* - [Clock] — defaults to [Clock.systemUTC]
* - [RetryPolicy] — defaults to `maxRetries = 5`
* - [PlatformTransactionManager] — if absent, each store call runs in its own transaction
Expand Down Expand Up @@ -97,38 +100,40 @@ class OutboxAutoConfiguration {
)
}

/**
* Auto-configures [PostgresOutboxStore] and Liquibase schema migration
* when `outbox-postgres` is on the classpath.
* Skipped if the application provides its own [OutboxStore] bean.
*/
@Configuration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(PostgresOutboxStore::class)
class PostgresStoreConfiguration {
@Bean
@ConditionalOnMissingBean(OutboxStore::class)
fun outboxStore(clock: ObjectProvider<Clock>): OutboxStore = PostgresOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() })
fun outboxStore(clock: ObjectProvider<Clock>): PostgresOutboxStore =
PostgresOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() })

/**
* Runs okapi Liquibase migrations automatically when both:
* - `liquibase-core` is on the classpath (i.e. the application already uses Liquibase)
* - a [DataSource] bean is available
*
* The migration is idempotent (CREATE TABLE IF NOT EXISTS) and uses a separate
* Liquibase bean named `okapiLiquibase` to avoid conflicting with the application's
* own Liquibase configuration.
*
* To opt out, define your own bean named `okapiLiquibase` or include the okapi
* changelog manually in your master changelog:
* `classpath:com/softwaremill/okapi/db/changelog.xml`
*/
@Bean("okapiLiquibase")
@Bean("okapiPostgresLiquibase")
@ConditionalOnClass(SpringLiquibase::class)
@ConditionalOnBean(DataSource::class)
@ConditionalOnMissingBean(name = ["okapiLiquibase"])
fun okapiLiquibase(dataSource: DataSource): SpringLiquibase = SpringLiquibase().apply {
@ConditionalOnBean(value = [DataSource::class, PostgresOutboxStore::class])
@ConditionalOnMissingBean(name = ["okapiPostgresLiquibase"])
fun okapiPostgresLiquibase(dataSource: DataSource): SpringLiquibase = SpringLiquibase().apply {
this.dataSource = dataSource
changeLog = "classpath:com/softwaremill/okapi/db/changelog.xml"
}
}

/** When both Postgres and MySQL modules are on the classpath, [PostgresStoreConfiguration] takes priority. */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(MysqlOutboxStore::class)
class MysqlStoreConfiguration {
@Bean
@ConditionalOnMissingBean(OutboxStore::class)
fun outboxStore(clock: ObjectProvider<Clock>): MysqlOutboxStore =
MysqlOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() })

@Bean("okapiMysqlLiquibase")
@ConditionalOnClass(SpringLiquibase::class)
@ConditionalOnBean(value = [DataSource::class, MysqlOutboxStore::class])
@ConditionalOnMissingBean(name = ["okapiMysqlLiquibase"])
fun okapiMysqlLiquibase(dataSource: DataSource): SpringLiquibase = SpringLiquibase().apply {
this.dataSource = dataSource
changeLog = "classpath:com/softwaremill/okapi/db/mysql/changelog.xml"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package com.softwaremill.okapi.springboot

import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock.aResponse
import com.github.tomakehurst.wiremock.client.WireMock.post
import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor
import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import com.softwaremill.okapi.core.OutboxEntryProcessor
import com.softwaremill.okapi.core.OutboxMessage
import com.softwaremill.okapi.core.OutboxProcessor
import com.softwaremill.okapi.core.OutboxPublisher
import com.softwaremill.okapi.core.OutboxStatus
import com.softwaremill.okapi.core.RetryPolicy
import com.softwaremill.okapi.http.HttpMessageDeliverer
import com.softwaremill.okapi.http.ServiceUrlResolver
import com.softwaremill.okapi.http.httpDeliveryInfo
import com.softwaremill.okapi.mysql.MysqlOutboxStore
import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
import liquibase.Liquibase
import liquibase.database.DatabaseFactory
import liquibase.database.jvm.JdbcConnection
import liquibase.resource.ClassLoaderResourceAccessor
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
import org.testcontainers.containers.MySQLContainer
import java.sql.DriverManager
import java.time.Clock

class OutboxMysqlEndToEndTest :
BehaviorSpec({
val mysql = MySQLContainer<Nothing>("mysql:8.0")
val wiremock = WireMockServer(wireMockConfig().dynamicPort())

lateinit var store: MysqlOutboxStore
lateinit var publisher: OutboxPublisher
lateinit var processor: OutboxProcessor

beforeSpec {
mysql.start()
wiremock.start()

Database.connect(
url = mysql.jdbcUrl,
driver = mysql.driverClassName,
user = mysql.username,
password = mysql.password,
)

val connection = DriverManager.getConnection(mysql.jdbcUrl, mysql.username, mysql.password)
val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection))
Liquibase("com/softwaremill/okapi/db/mysql/changelog.xml", ClassLoaderResourceAccessor(), db)
.use { it.update("") }
connection.close()

val clock = Clock.systemUTC()
store = MysqlOutboxStore(clock)
publisher = OutboxPublisher(store, clock)

val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" }
val deliverer = HttpMessageDeliverer(urlResolver)
val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock)
processor = OutboxProcessor(store, entryProcessor)
}

afterSpec {
wiremock.stop()
mysql.stop()
}

beforeEach {
wiremock.resetAll()
transaction { exec("DELETE FROM outbox") }
}

given("a message published within a transaction") {
`when`("the HTTP endpoint returns 200") {
wiremock.stubFor(
post(urlEqualTo("/api/notify"))
.willReturn(aResponse().withStatus(200)),
)

transaction {
publisher.publish(
OutboxMessage("order.created", """{"orderId":"abc-123"}"""),
httpDeliveryInfo {
serviceName = "notification-service"
endpointPath = "/api/notify"
},
)
}

transaction { processor.processNext() }

val requests = wiremock.findAll(postRequestedFor(urlEqualTo("/api/notify")))
val counts = transaction { store.countByStatuses() }

then("WireMock receives exactly one POST request") {
requests.size shouldBe 1
}
then("request body matches the published payload") {
requests.first().bodyAsString shouldBe """{"orderId":"abc-123"}"""
}
then("entry is marked as DELIVERED") {
counts[OutboxStatus.DELIVERED] shouldBe 1L
}
}

`when`("the HTTP endpoint returns 500") {
wiremock.stubFor(
post(urlEqualTo("/api/notify"))
.willReturn(aResponse().withStatus(500).withBody("Internal Server Error")),
)

transaction {
publisher.publish(
OutboxMessage("order.created", """{"orderId":"xyz-456"}"""),
httpDeliveryInfo {
serviceName = "notification-service"
endpointPath = "/api/notify"
},
)
}

transaction { processor.processNext() }

val counts = transaction { store.countByStatuses() }

then("entry stays PENDING (retriable failure, retries remaining)") {
counts[OutboxStatus.PENDING] shouldBe 1L
}
then("no DELIVERED entries") {
counts[OutboxStatus.DELIVERED] shouldBe 0L
}
}

`when`("the HTTP endpoint returns 400") {
wiremock.stubFor(
post(urlEqualTo("/api/notify"))
.willReturn(aResponse().withStatus(400).withBody("Bad Request")),
)

transaction {
publisher.publish(
OutboxMessage("order.created", """{"orderId":"err-789"}"""),
httpDeliveryInfo {
serviceName = "notification-service"
endpointPath = "/api/notify"
},
)
}

transaction { processor.processNext() }

val counts = transaction { store.countByStatuses() }

then("entry is immediately FAILED (permanent failure)") {
counts[OutboxStatus.FAILED] shouldBe 1L
}
then("no PENDING or DELIVERED entries") {
counts[OutboxStatus.PENDING] shouldBe 0L
counts[OutboxStatus.DELIVERED] shouldBe 0L
}
}

`when`("the endpoint is unreachable") {
wiremock.stubFor(
post(urlEqualTo("/api/notify"))
.willReturn(
aResponse().withFault(
com.github.tomakehurst.wiremock.http.Fault.CONNECTION_RESET_BY_PEER,
),
),
)

transaction {
publisher.publish(
OutboxMessage("order.created", """{"orderId":"net-000"}"""),
httpDeliveryInfo {
serviceName = "notification-service"
endpointPath = "/api/notify"
},
)
}

transaction { processor.processNext() }

val counts = transaction { store.countByStatuses() }

then("entry stays PENDING (retriable network failure)") {
counts[OutboxStatus.PENDING] shouldBe 1L
}
}
}
})
Loading