diff --git a/.gitignore b/.gitignore index 3f2dd53..86a412e 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,7 @@ bin/ ### Claude Code ### .claude/ .ai/ -CLAUDE.md \ No newline at end of file +CLAUDE.md +docs/specs/ +docs/plans/ +docs/superpowers/ diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 1997b52..596a9ef 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -5,17 +5,11 @@ plugins { dependencies { implementation(project(":okapi-core")) - // Spring provided by the consuming application — compileOnly keeps this module free of version lock-in compileOnly(libs.springContext) compileOnly(libs.springTx) - - // Spring Boot autoconfiguration support — compileOnly so we don't force Spring Boot on consumers compileOnly(libs.springBootAutoconfigure) - - // Optional postgres store autoconfiguration — compileOnly + @ConditionalOnClass guards runtime absence compileOnly(project(":okapi-postgres")) - - // Optional Liquibase migration support — compileOnly, activated only when liquibase-core is on the runtime classpath + compileOnly(project(":okapi-mysql")) compileOnly(libs.liquibaseCore) testImplementation(libs.kotestRunnerJunit5) @@ -25,8 +19,7 @@ dependencies { testImplementation(libs.exposedCore) testImplementation(libs.springBootAutoconfigure) testImplementation(project(":okapi-postgres")) - - // E2E test dependencies + testImplementation(project(":okapi-mysql")) testImplementation(project(":okapi-http")) testImplementation(libs.exposedJdbc) testImplementation(libs.exposedJson) @@ -34,6 +27,8 @@ dependencies { testImplementation(libs.liquibaseCore) testImplementation(libs.testcontainersPostgresql) testImplementation(libs.postgresql) + testImplementation(libs.testcontainersMysql) + testImplementation(libs.mysql) testImplementation(libs.wiremock) } diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 05ccc4e..013e0af 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -7,6 +7,7 @@ import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxPublisher import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.mysql.MysqlOutboxStore import com.softwaremill.okapi.postgres.PostgresOutboxStore import liquibase.integration.spring.SpringLiquibase import org.springframework.beans.factory.ObjectProvider @@ -32,7 +33,9 @@ import javax.sql.DataSource * and routed by the `type` field in each entry's deliveryMetadata. * * Optional beans with defaults: - * - [OutboxStore] — auto-configured to [PostgresOutboxStore] if `outbox-postgres` is on the classpath + * - [OutboxStore] — auto-configured to [PostgresOutboxStore] or [MysqlOutboxStore] + * depending on which module (`okapi-postgres` / `okapi-mysql`) is on the classpath. + * If both are present, Postgres takes priority. Override by defining your own `@Bean OutboxStore`. * - [Clock] — defaults to [Clock.systemUTC] * - [RetryPolicy] — defaults to `maxRetries = 5` * - [PlatformTransactionManager] — if absent, each store call runs in its own transaction @@ -97,38 +100,40 @@ class OutboxAutoConfiguration { ) } - /** - * Auto-configures [PostgresOutboxStore] and Liquibase schema migration - * when `outbox-postgres` is on the classpath. - * Skipped if the application provides its own [OutboxStore] bean. - */ - @Configuration + @Configuration(proxyBeanMethods = false) @ConditionalOnClass(PostgresOutboxStore::class) class PostgresStoreConfiguration { @Bean @ConditionalOnMissingBean(OutboxStore::class) - fun outboxStore(clock: ObjectProvider): OutboxStore = PostgresOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) + fun outboxStore(clock: ObjectProvider): PostgresOutboxStore = + PostgresOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) - /** - * Runs okapi Liquibase migrations automatically when both: - * - `liquibase-core` is on the classpath (i.e. the application already uses Liquibase) - * - a [DataSource] bean is available - * - * The migration is idempotent (CREATE TABLE IF NOT EXISTS) and uses a separate - * Liquibase bean named `okapiLiquibase` to avoid conflicting with the application's - * own Liquibase configuration. - * - * To opt out, define your own bean named `okapiLiquibase` or include the okapi - * changelog manually in your master changelog: - * `classpath:com/softwaremill/okapi/db/changelog.xml` - */ - @Bean("okapiLiquibase") + @Bean("okapiPostgresLiquibase") @ConditionalOnClass(SpringLiquibase::class) - @ConditionalOnBean(DataSource::class) - @ConditionalOnMissingBean(name = ["okapiLiquibase"]) - fun okapiLiquibase(dataSource: DataSource): SpringLiquibase = SpringLiquibase().apply { + @ConditionalOnBean(value = [DataSource::class, PostgresOutboxStore::class]) + @ConditionalOnMissingBean(name = ["okapiPostgresLiquibase"]) + fun okapiPostgresLiquibase(dataSource: DataSource): SpringLiquibase = SpringLiquibase().apply { this.dataSource = dataSource changeLog = "classpath:com/softwaremill/okapi/db/changelog.xml" } } + + /** When both Postgres and MySQL modules are on the classpath, [PostgresStoreConfiguration] takes priority. */ + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(MysqlOutboxStore::class) + class MysqlStoreConfiguration { + @Bean + @ConditionalOnMissingBean(OutboxStore::class) + fun outboxStore(clock: ObjectProvider): MysqlOutboxStore = + MysqlOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) + + @Bean("okapiMysqlLiquibase") + @ConditionalOnClass(SpringLiquibase::class) + @ConditionalOnBean(value = [DataSource::class, MysqlOutboxStore::class]) + @ConditionalOnMissingBean(name = ["okapiMysqlLiquibase"]) + fun okapiMysqlLiquibase(dataSource: DataSource): SpringLiquibase = SpringLiquibase().apply { + this.dataSource = dataSource + changeLog = "classpath:com/softwaremill/okapi/db/mysql/changelog.xml" + } + } } diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt new file mode 100644 index 0000000..18153ba --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt @@ -0,0 +1,196 @@ +package com.softwaremill.okapi.springboot + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.post +import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.http.HttpMessageDeliverer +import com.softwaremill.okapi.http.ServiceUrlResolver +import com.softwaremill.okapi.http.httpDeliveryInfo +import com.softwaremill.okapi.mysql.MysqlOutboxStore +import io.kotest.core.spec.style.BehaviorSpec +import io.kotest.matchers.shouldBe +import liquibase.Liquibase +import liquibase.database.DatabaseFactory +import liquibase.database.jvm.JdbcConnection +import liquibase.resource.ClassLoaderResourceAccessor +import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.testcontainers.containers.MySQLContainer +import java.sql.DriverManager +import java.time.Clock + +class OutboxMysqlEndToEndTest : + BehaviorSpec({ + val mysql = MySQLContainer("mysql:8.0") + val wiremock = WireMockServer(wireMockConfig().dynamicPort()) + + lateinit var store: MysqlOutboxStore + lateinit var publisher: OutboxPublisher + lateinit var processor: OutboxProcessor + + beforeSpec { + mysql.start() + wiremock.start() + + Database.connect( + url = mysql.jdbcUrl, + driver = mysql.driverClassName, + user = mysql.username, + password = mysql.password, + ) + + val connection = DriverManager.getConnection(mysql.jdbcUrl, mysql.username, mysql.password) + val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) + Liquibase("com/softwaremill/okapi/db/mysql/changelog.xml", ClassLoaderResourceAccessor(), db) + .use { it.update("") } + connection.close() + + val clock = Clock.systemUTC() + store = MysqlOutboxStore(clock) + publisher = OutboxPublisher(store, clock) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val deliverer = HttpMessageDeliverer(urlResolver) + val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock) + processor = OutboxProcessor(store, entryProcessor) + } + + afterSpec { + wiremock.stop() + mysql.stop() + } + + beforeEach { + wiremock.resetAll() + transaction { exec("DELETE FROM outbox") } + } + + given("a message published within a transaction") { + `when`("the HTTP endpoint returns 200") { + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(200)), + ) + + transaction { + publisher.publish( + OutboxMessage("order.created", """{"orderId":"abc-123"}"""), + httpDeliveryInfo { + serviceName = "notification-service" + endpointPath = "/api/notify" + }, + ) + } + + transaction { processor.processNext() } + + val requests = wiremock.findAll(postRequestedFor(urlEqualTo("/api/notify"))) + val counts = transaction { store.countByStatuses() } + + then("WireMock receives exactly one POST request") { + requests.size shouldBe 1 + } + then("request body matches the published payload") { + requests.first().bodyAsString shouldBe """{"orderId":"abc-123"}""" + } + then("entry is marked as DELIVERED") { + counts[OutboxStatus.DELIVERED] shouldBe 1L + } + } + + `when`("the HTTP endpoint returns 500") { + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(500).withBody("Internal Server Error")), + ) + + transaction { + publisher.publish( + OutboxMessage("order.created", """{"orderId":"xyz-456"}"""), + httpDeliveryInfo { + serviceName = "notification-service" + endpointPath = "/api/notify" + }, + ) + } + + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + + then("entry stays PENDING (retriable failure, retries remaining)") { + counts[OutboxStatus.PENDING] shouldBe 1L + } + then("no DELIVERED entries") { + counts[OutboxStatus.DELIVERED] shouldBe 0L + } + } + + `when`("the HTTP endpoint returns 400") { + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn(aResponse().withStatus(400).withBody("Bad Request")), + ) + + transaction { + publisher.publish( + OutboxMessage("order.created", """{"orderId":"err-789"}"""), + httpDeliveryInfo { + serviceName = "notification-service" + endpointPath = "/api/notify" + }, + ) + } + + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + + then("entry is immediately FAILED (permanent failure)") { + counts[OutboxStatus.FAILED] shouldBe 1L + } + then("no PENDING or DELIVERED entries") { + counts[OutboxStatus.PENDING] shouldBe 0L + counts[OutboxStatus.DELIVERED] shouldBe 0L + } + } + + `when`("the endpoint is unreachable") { + wiremock.stubFor( + post(urlEqualTo("/api/notify")) + .willReturn( + aResponse().withFault( + com.github.tomakehurst.wiremock.http.Fault.CONNECTION_RESET_BY_PEER, + ), + ), + ) + + transaction { + publisher.publish( + OutboxMessage("order.created", """{"orderId":"net-000"}"""), + httpDeliveryInfo { + serviceName = "notification-service" + endpointPath = "/api/notify" + }, + ) + } + + transaction { processor.processNext() } + + val counts = transaction { store.countByStatuses() } + + then("entry stays PENDING (retriable network failure)") { + counts[OutboxStatus.PENDING] shouldBe 1L + } + } + } + })