From b7e5fd1d2f311b3de3b4a041b4822f50c55bcf77 Mon Sep 17 00:00:00 2001 From: Martin Bonnin Date: Tue, 20 Jan 2026 19:11:32 +0100 Subject: [PATCH] Implement retry for snapshots/deployment upload/status check --- .../task/nmcpPublishWithPublisherApi.kt | 78 +++++----- .../main/kotlin/nmcp/transport/transport.kt | 147 +++++++++++++----- 2 files changed, 146 insertions(+), 79 deletions(-) diff --git a/nmcp-tasks/src/main/kotlin/nmcp/internal/task/nmcpPublishWithPublisherApi.kt b/nmcp-tasks/src/main/kotlin/nmcp/internal/task/nmcpPublishWithPublisherApi.kt index 852b5b5..10b4cca 100644 --- a/nmcp-tasks/src/main/kotlin/nmcp/internal/task/nmcpPublishWithPublisherApi.kt +++ b/nmcp-tasks/src/main/kotlin/nmcp/internal/task/nmcpPublishWithPublisherApi.kt @@ -11,6 +11,8 @@ import kotlin.time.TimeSource.Monotonic.markNow import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.JsonPrimitive +import nmcp.transport.Success +import nmcp.transport.executeWithRetries import nmcp.transport.nmcpClient import okhttp3.MediaType.Companion.toMediaType import okhttp3.MultipartBody @@ -60,20 +62,17 @@ internal fun nmcpPublishWithPublisherApi( val url = baseUrl + "api/v1/publisher/upload?publishingType=$publishingType" logger.lifecycle("Uploading deployment to '$url'") - val deploymentId = Request.Builder() + val request = Request.Builder() .post(body) .addHeader("Authorization", "Bearer $token") .url(url) .build() - .let { - nmcpClient.newCall(it).execute() - }.use { - if (!it.isSuccessful) { - error("Cannot deploy to maven central (status='${it.code}'): ${it.body.string()}") - } + val result = executeWithRetries(logger, nmcpClient, request) - it.body.string() - } + if (result !is Success) { + error("Cannot upload deployment to maven central: ($result)}") + } + val deploymentId = result.body.use { it.readUtf8() } logger.lifecycle("Nmcp: deployment bundle '$deploymentId' uploaded.") @@ -118,6 +117,7 @@ private fun waitFor( } val status = verifyStatus( + logger = logger, deploymentId = deploymentId, baseUrl = baseUrl, token = token, @@ -137,9 +137,6 @@ private fun waitFor( private sealed interface Status -// A deployment has successfully been uploaded to Maven Central -private data object UNKNOWN_QUERY_LATER : Status - // A deployment is uploaded and waiting for processing by the validation service private data object PENDING : Status @@ -159,47 +156,42 @@ private data object PUBLISHED : Status private class FAILED(val error: String) : Status private fun verifyStatus( + logger: GLogger, deploymentId: String, baseUrl: String, token: String, ): Status { - Request.Builder() + val request = Request.Builder() .post(ByteString.EMPTY.toRequestBody()) .addHeader("Authorization", "Bearer $token") .url(baseUrl + "api/v1/publisher/status?id=$deploymentId") .build() - .let { - try { - nmcpClient.newCall(it).execute() - } catch (_: SocketTimeoutException) { - return UNKNOWN_QUERY_LATER - } - }.use { - if (!it.isSuccessful) { - error("Cannot verify deployment $deploymentId status (HTTP status='${it.code}'): ${it.body.string()}") - } + val result = executeWithRetries(logger, nmcpClient, request) + if (result !is Success) { + error("Cannot verify deployment $deploymentId status ($result)") + } - val str = it.body.string() - val element = Json.parseToJsonElement(str) - check(element is JsonObject) { - "Nmcp: unexpected status response for deployment $deploymentId: $str" - } + val str = result.body.use { it.readUtf8() } + val element = Json.parseToJsonElement(str) + check(element is JsonObject) { + "Nmcp: unexpected status response for deployment $deploymentId: $str" + } - val state = element["deploymentState"] - check(state is JsonPrimitive && state.isString) { - "Nmcp: unexpected deploymentState for deployment $deploymentId: $state" - } + val state = element["deploymentState"] + check(state is JsonPrimitive && state.isString) { + "Nmcp: unexpected deploymentState for deployment $deploymentId: $state" + } - return when (state.content) { - "PENDING" -> PENDING - "VALIDATING" -> VALIDATING - "VALIDATED" -> VALIDATED - "PUBLISHING" -> PUBLISHING - "PUBLISHED" -> PUBLISHED - "FAILED" -> { - FAILED(element["errors"].toString()) - } - else -> error("Nmcp: unexpected deploymentState for deployment $deploymentId: $state") - } + return when (state.content) { + "PENDING" -> PENDING + "VALIDATING" -> VALIDATING + "VALIDATED" -> VALIDATED + "PUBLISHING" -> PUBLISHING + "PUBLISHED" -> PUBLISHED + "FAILED" -> { + FAILED(element["errors"].toString()) } + else -> error("Nmcp: unexpected deploymentState for deployment $deploymentId: $state") + } + } diff --git a/nmcp-tasks/src/main/kotlin/nmcp/transport/transport.kt b/nmcp-tasks/src/main/kotlin/nmcp/transport/transport.kt index 2e16475..f57d728 100644 --- a/nmcp-tasks/src/main/kotlin/nmcp/transport/transport.kt +++ b/nmcp-tasks/src/main/kotlin/nmcp/transport/transport.kt @@ -2,13 +2,16 @@ package nmcp.transport import gratatouille.tasks.GLogger import java.io.File +import kotlin.math.pow import okhttp3.HttpUrl.Companion.toHttpUrl import okhttp3.MediaType import okhttp3.MediaType.Companion.toMediaType +import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody import okio.BufferedSink import okio.BufferedSource +import okio.IOException import okio.buffer import okio.sink import okio.source @@ -88,25 +91,21 @@ internal class HttpTransport( logger.info("Nmcp: get '$url'") - val response = Request.Builder() + val request = Request.Builder() .get() .url(url) .maybeAddAuthorization(getAuthorization) .build() - .let { - client.newCall(it).execute() - } - if (response.code == 404) { - response.close() + val result = executeWithRetries(logger, client, request) + if (result is HttpError && result.code == 404) { return null } - if (!response.isSuccessful) { - response.close() - error("Nmcp: cannot GET '$url' (statusCode=${response.code}):\n${response.body.string()}") + if (result !is Success) { + error("Nmcp: cannot GET '$url' (${result})") } - return response.body.source() + return result.body } override fun put(path: String, body: Content) { @@ -116,41 +115,117 @@ internal class HttpTransport( logger.info("Nmcp: put '$url'") - Request.Builder() + val request = Request.Builder() .put(body.toRequestBody()) .url(url) .maybeAddAuthorization(putAuthorization) .build() - .let { - client.newCall(it).execute() - }.use { response -> - check(response.isSuccessful) { - buildString { - appendLine("Nmcp: cannot PUT '$url' (statusCode=${response.code}).") - appendLine("Response body: '${response.body.string()}'") - when (response.code) { - 400 -> { - appendLine("Things to double check:") - appendLine("Your artifacts have proper extensions (.jar, .pom, ...).") - appendLine("If publishing a XML file, the XML version is 1.0.") - appendLine("If publishing a snapshot, the artifacts version is ending with `-SNAPSHOT`.") - } - 401 -> { - appendLine("Check your credentials") - appendLine("If publishing a snapshot, make sure you enabled snapshots on your namespace at https://central.sonatype.com/publishing/namespaces.") - } - 403 -> { - appendLine("Check that you are publishing to the correct groupId.") - } - 429 -> { - appendLine("Too many requests, try again later") - } - } + + val result = executeWithRetries(logger, client, request) + if (result is Success) { + result.body.close() + return + } + + val error = buildString { + appendLine("Nmcp: cannot PUT '$url'") + appendLine("$result") + if (result is HttpError) { + when (result.code) { + 400 -> { + appendLine("Things to double check:") + appendLine("Your artifacts have proper extensions (.jar, .pom, ...).") + appendLine("If publishing a XML file, the XML version is 1.0.") + appendLine("If publishing a snapshot, the artifacts version is ending with `-SNAPSHOT`.") + } + 401 -> { + appendLine("Check your credentials") + appendLine("If publishing a snapshot, make sure you enabled snapshots on your namespace at https://central.sonatype.com/publishing/namespaces.") + } + 403 -> { + appendLine("Check that you are publishing to the correct groupId.") + } + 429 -> { + appendLine("Too many requests, try again later") } } } + } + error(error) + } +} + +/** + * In some cases, 401 is actually retryable. + * This is the case for: + * - PUT on htps://central.sonatype.com/repository/maven-snapshots/ + * - verification of a deployment + * + * This is quite unexpected, and we code defensively here to be robust to those cases. + * We also retry other errors. + * + * Example of transient 401: + * ``` + * Execution failed for task ':nmcpPublishAggregationToCentralPortal'. + * > A failure occurred while executing nmcp.internal.task.NmcpPublishWithPublisherApiWorkAction + * > Cannot verify deployment fbed2636-e25d-4538-be7d-7693d475595d status (HTTP status='401'): {"error":{"message":"Invalid token"}} + * ``` + * + * TODO: + * - rework this to not block the thread. + * - move the logic to some upper, sonatype-only layer + * - fine tune the retry logic. Do we want to retry everything like we do here? Or are some HTTP errors actually + * not retryable? + * + * @return the result. If the result is a success, the caller MUST close its body. + */ +internal fun executeWithRetries(logger: GLogger, client: OkHttpClient, request: Request): Result { + var attempt = 0 + val attemptCount = 3 + while(true) { + val result = executeInternal(client, request) + if (result is Success) { + return result + } + if (result is HttpError && result.code == 404) { + // 404 is not retryable + return result + } + if (attempt == attemptCount - 1) { + return result + } + + logger.lifecycle("Nmcp: put '${request.url}' failed (${result}), retrying... (attempt ${attempt + 1}/${attemptCount})") + Thread.sleep(2.0.pow(attempt.toDouble()).toLong() * 1_000) + attempt++ + } +} + +internal fun executeInternal(client: OkHttpClient, request: Request): Result { + return try { + val response = client.newCall(request).execute() + if (response.isSuccessful) { + return Success(response.body.source()) + } + + HttpError(response.code, response.body.string()) + } catch (e: IOException) { + NetworkError(e) + } +} + +internal sealed interface Result +internal class NetworkError(val exception: IOException) : Result { + override fun toString(): String { + return "NetworkError: ${exception.message}" + } +} +internal class HttpError(val code: Int, val body: String): Result { + override fun toString(): String { + return "HTTP error $code: '$body'" } } +internal class Success(val body: BufferedSource) : Result fun Content.toRequestBody(): RequestBody { return object : RequestBody() {