Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package com.softwaremill.okapi.core

/**
* Outcome of a single delivery attempt by a [MessageDeliverer].
*
* [OutboxEntryProcessor] uses this to decide whether to retry, mark as failed,
* or mark as delivered.
*/
sealed interface DeliveryResult {
data object Success : DeliveryResult

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package com.softwaremill.okapi.core

import java.time.Instant

/**
* Persistent representation of an outbox message with delivery state.
*
* Created via [createPending] and progressed through [retry], [toDelivered],
* or [toFailed] — each returning a new immutable copy.
*/
data class OutboxEntry(
val outboxId: OutboxId,
val messageType: String,
Expand All @@ -15,6 +21,7 @@ data class OutboxEntry(
val lastError: String?,
val deliveryMetadata: String,
) {
/** Returns a copy scheduled for another delivery attempt. */
fun retry(now: Instant, lastError: String): OutboxEntry = copy(
status = OutboxStatus.PENDING,
updatedAt = now,
Expand All @@ -23,20 +30,23 @@ data class OutboxEntry(
lastError = lastError,
)

/** Returns a copy marked as permanently failed. */
fun toFailed(now: Instant, lastError: String): OutboxEntry = copy(
status = OutboxStatus.FAILED,
updatedAt = now,
lastAttempt = now,
lastError = lastError,
)

/** Returns a copy marked as successfully delivered. */
fun toDelivered(now: Instant): OutboxEntry = copy(
status = OutboxStatus.DELIVERED,
updatedAt = now,
lastAttempt = now,
)

companion object {
/** Creates a new PENDING entry from a [message] and [deliveryInfo]. */
fun createPending(message: OutboxMessage, deliveryInfo: DeliveryInfo, now: Instant): OutboxEntry =
createPending(message, deliveryType = deliveryInfo.type, deliveryMetadata = deliveryInfo.serialize(), now = now)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package com.softwaremill.okapi.core

import java.util.UUID

/** Unique identifier for an outbox entry. Wraps a [UUID] for type safety. */
@JvmInline
value class OutboxId(val raw: UUID) {
companion object {
/** Generates a new random identifier. */
fun new(): OutboxId = OutboxId(UUID.randomUUID())
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.softwaremill.okapi.core

/** Business message to be delivered via the transactional outbox. */
data class OutboxMessage(
val messageType: String,
val payload: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.softwaremill.okapi.core

/** Lifecycle status of an [OutboxEntry]. */
enum class OutboxStatus {
PENDING,
DELIVERED,
FAILED,
;

companion object {
/** Resolves a status by matching the given [value] against enum entry names. Throws if unknown. */
fun from(value: String): OutboxStatus = requireNotNull(entries.find { it.name == value }) {
"Unknown outbox status: $value"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.softwaremill.okapi.core

/** Determines how many retries are allowed after the initial delivery attempt before an entry is marked as [OutboxStatus.FAILED]. */
data class RetryPolicy(val maxRetries: Int) {
init {
require(maxRetries >= 0) { "maxRetries must be >= 0, got: $maxRetries" }
}

/** Returns `true` if [currentRetries] has not yet reached [maxRetries]. */
fun shouldRetry(currentRetries: Int): Boolean = currentRetries < maxRetries
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.softwaremill.okapi.core.DeliveryInfo

/**
* Delivery metadata for HTTP webhook transport.
*
* [serviceName] is resolved to a base URL via [ServiceUrlResolver] at delivery time.
* [endpointPath] is appended to form the full URL.
*/
data class HttpDeliveryInfo(
override val type: String = TYPE,
val serviceName: String,
Expand All @@ -22,6 +28,7 @@ data class HttpDeliveryInfo(
const val TYPE = "http"
private val mapper = jacksonObjectMapper()

/** Deserializes from JSON stored in [OutboxEntry.deliveryMetadata]. */
fun deserialize(json: String): HttpDeliveryInfo = mapper.readValue(json)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,16 @@ class HttpDeliveryInfoBuilder {
)
}

/**
* DSL for building [HttpDeliveryInfo].
*
* ```
* val info = httpDeliveryInfo {
* serviceName = "order-service"
* endpointPath = "/api/events"
* httpMethod = HttpMethod.POST
* header("X-Correlation-Id", correlationId)
* }
* ```
*/
fun httpDeliveryInfo(block: HttpDeliveryInfoBuilder.() -> Unit): HttpDeliveryInfo = HttpDeliveryInfoBuilder().apply(block).build()
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration

/**
* [MessageDeliverer] that sends outbox entries as HTTP requests via JDK [HttpClient].
*
* Status code classification:
* - 2xx → [DeliveryResult.Success]
* - 5xx, 429, 408 → [DeliveryResult.RetriableFailure] (configurable via [retriableStatusCodes])
* - other → [DeliveryResult.PermanentFailure]
*
* Connection errors are treated as retriable.
*/
class HttpMessageDeliverer(
private val urlResolver: ServiceUrlResolver,
private val httpClient: HttpClient = defaultHttpClient(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.softwaremill.okapi.http

/** HTTP methods supported by [HttpMessageDeliverer]. */
enum class HttpMethod {
POST,
PUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.softwaremill.okapi.core.DeliveryInfo

/**
* Delivery metadata for Kafka topic transport.
*
* [topic] is required. Optional [partitionKey] controls partition routing.
* Custom [headers] are sent as UTF-8 encoded Kafka record headers.
*/
data class KafkaDeliveryInfo(
override val type: String = TYPE,
val topic: String,
Expand All @@ -20,6 +26,7 @@ data class KafkaDeliveryInfo(
const val TYPE = "kafka"
private val mapper = jacksonObjectMapper()

/** Deserializes from JSON stored in [OutboxEntry.deliveryMetadata]. */
fun deserialize(json: String): KafkaDeliveryInfo = mapper.readValue(json)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,15 @@ class KafkaDeliveryInfoBuilder {
)
}

/**
* DSL for building [KafkaDeliveryInfo].
*
* ```
* val info = kafkaDeliveryInfo {
* topic = "order-events"
* partitionKey = orderId
* header("source", "checkout-service")
* }
* ```
*/
fun kafkaDeliveryInfo(block: KafkaDeliveryInfoBuilder.() -> Unit): KafkaDeliveryInfo = KafkaDeliveryInfoBuilder().apply(block).build()
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.RetriableException
import java.util.concurrent.ExecutionException

/**
* [MessageDeliverer] that publishes outbox entries to Kafka topics.
*
* Uses the provided [Producer] to send records synchronously.
* Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure];
* all other errors map to [DeliveryResult.PermanentFailure].
*/
class KafkaMessageDeliverer(
private val producer: Producer<String, String>,
) : MessageDeliverer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.time.Clock
import java.time.Instant
import java.util.UUID

/** MySQL [OutboxStore] implementation using Exposed. */
class MysqlOutboxStore(
private val clock: Clock,
) : OutboxStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.time.Clock
import java.time.Instant
import java.util.UUID

/** PostgreSQL [OutboxStore] implementation using Exposed. */
class PostgresOutboxStore(
private val clock: Clock,
) : OutboxStore {
Expand Down