From 50009ece9c1f639525e59c02950d3e9e578fb804 Mon Sep 17 00:00:00 2001 From: anuq Date: Sat, 14 Mar 2026 16:22:15 -0400 Subject: [PATCH] fix(sqs-batch-manager): use original queueUrl instead of batchKey in batch requests When a request has an overrideConfiguration, getBatchKey() returns queueUrl + overrideConfig.hashCode() as a composite key to correctly group requests for batching. However, createXxxBatchRequest() was passing this composite key as the QueueUrl in the actual SQS API call, producing a non-existent URL like: https://sqs.us-west-2.amazonaws.com/123/my-queue-461286369 This causes QueueDoesNotExistException for any caller that sets an overrideConfiguration on their request (e.g. the SQS extended client library which appends a user-agent via overrideConfiguration). Fix: extract queueUrl from the first request in the batch (safe because all requests in a batch share the same queueUrl by grouping invariant) and use that for the QueueUrl field in the batch API call. Affected: SendMessageBatchManager, DeleteMessageBatchManager, ChangeMessageVisibilityBatchManager. Fixes #6478 --- .../ChangeMessageVisibilityBatchManager.java | 12 ++--- .../DeleteMessageBatchManager.java | 13 ++--- .../batchmanager/SendMessageBatchManager.java | 12 ++--- .../batchmanager/BaseSqsBatchManagerTest.java | 47 +++++++++++++++++++ .../SqsAsyncBatchManagerTest.java | 33 +++++++++++++ 5 files changed, 99 insertions(+), 18 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java index 910866e3088b..855c4729589c 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java @@ -59,21 +59,21 @@ private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibility identifiedRequest.message())) .collect(Collectors.toList()); - // All requests have the same overrideConfiguration, so it's sufficient to retrieve it from the first request. - Optional overrideConfiguration = identifiedRequests.get(0) - .message() - .overrideConfiguration(); + // All requests have the same queueUrl and overrideConfiguration, so retrieve them from the first request. + ChangeMessageVisibilityRequest firstRequest = identifiedRequests.get(0).message(); + String queueUrl = firstRequest.queueUrl(); + Optional overrideConfiguration = firstRequest.overrideConfiguration(); return overrideConfiguration .map(config -> ChangeMessageVisibilityBatchRequest.builder() - .queueUrl(batchKey) + .queueUrl(queueUrl) .overrideConfiguration(config.toBuilder() .applyMutation(USER_AGENT_APPLIER) .build()) .entries(entries) .build()) .orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder() - .queueUrl(batchKey) + .queueUrl(queueUrl) .overrideConfiguration(o -> o .applyMutation(USER_AGENT_APPLIER) .build()) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java index 683d1f8a97ab..d13f8fb2605f 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java @@ -58,14 +58,15 @@ private static DeleteMessageBatchRequest createDeleteMessageBatchRequest( )) .collect(Collectors.toList()); - // Since requests are batched together according to a combination of their queueUrl and overrideConfiguration, - // all requests must have the same overrideConfiguration, so it is sufficient to retrieve it from the first request. - Optional overrideConfiguration = identifiedRequests.get(0).message() - .overrideConfiguration(); + // Since requests are batched together by queueUrl + overrideConfiguration, all requests in a batch share the + // same queueUrl and overrideConfiguration; retrieve them from the first request. + DeleteMessageRequest firstRequest = identifiedRequests.get(0).message(); + String queueUrl = firstRequest.queueUrl(); + Optional overrideConfiguration = firstRequest.overrideConfiguration(); return overrideConfiguration.map( overrideConfig -> DeleteMessageBatchRequest.builder() - .queueUrl(batchKey) + .queueUrl(queueUrl) .overrideConfiguration( overrideConfig.toBuilder() .applyMutation(USER_AGENT_APPLIER) @@ -75,7 +76,7 @@ private static DeleteMessageBatchRequest createDeleteMessageBatchRequest( .build() ).orElseGet( () -> DeleteMessageBatchRequest.builder() - .queueUrl(batchKey) + .queueUrl(queueUrl) .overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER).build() ) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java index 18fcdc42192f..e21a477e076f 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java @@ -90,21 +90,21 @@ private static SendMessageBatchRequest createSendMessageBatchRequest( identifiedRequest.message())) .collect(Collectors.toList()); - // All requests must have the same overrideConfiguration, so retrieve it from the first request. - Optional overrideConfiguration = identifiedRequests.get(0) - .message() - .overrideConfiguration(); + // All requests must have the same queueUrl and overrideConfiguration, so retrieve them from the first request. + SendMessageRequest firstRequest = identifiedRequests.get(0).message(); + String queueUrl = firstRequest.queueUrl(); + Optional overrideConfiguration = firstRequest.overrideConfiguration(); return overrideConfiguration .map(overrideConfig -> SendMessageBatchRequest.builder() - .queueUrl(batchKey) + .queueUrl(queueUrl) .overrideConfiguration(overrideConfig.toBuilder() .applyMutation(USER_AGENT_APPLIER) .build()) .entries(entries) .build()) .orElseGet(() -> SendMessageBatchRequest.builder() - .queueUrl(batchKey) + .queueUrl(queueUrl) .overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER)) .entries(entries) .build()); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BaseSqsBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BaseSqsBatchManagerTest.java index dc02826d1e90..e468b8c4b9c4 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BaseSqsBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BaseSqsBatchManagerTest.java @@ -357,6 +357,47 @@ public void changeVisibilityBatchFunctionReturnsWithError_completeMessagesExcept assertThatThrownBy(() -> response2.get(3, TimeUnit.SECONDS)).hasCauseInstanceOf(SqsException.class).hasMessageContaining("Status Code: 400"); } + @Test + public void sendMessageWithOverrideConfig_batchRequestUsesOriginalQueueUrl() { + String id = "0"; + String responseBody = String.format( + "{\"Successful\":[{\"Id\":\"%s\",\"MD5OfMessageBody\":\"dummy\"}]}", id); + + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(responseBody))); + + sendMessageWithOverrideConfig(DEFAULT_QUEUE_URL, "test-body").join(); + + // The batch request QueueUrl must be the original URL, not batchKey (queueUrl + overrideConfig.hashCode()) + verify(anyRequestedFor(anyUrl()) + .withRequestBody(containing("\"QueueUrl\":\"" + DEFAULT_QUEUE_URL + "\""))); + } + + @Test + public void deleteMessageWithOverrideConfig_batchRequestUsesOriginalQueueUrl() throws Exception { + String id = "0"; + String responseBody = String.format("{\"Successful\":[{\"Id\":\"%s\"}]}", id); + + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(responseBody))); + + deleteMessageWithOverrideConfig(DEFAULT_QUEUE_URL, "receipt-handle").get(3, TimeUnit.SECONDS); + + verify(anyRequestedFor(anyUrl()) + .withRequestBody(containing("\"QueueUrl\":\"" + DEFAULT_QUEUE_URL + "\""))); + } + + @Test + public void changeMessageVisibilityWithOverrideConfig_batchRequestUsesOriginalQueueUrl() throws Exception { + String id = "0"; + String responseBody = String.format("{\"Successful\":[{\"Id\":\"%s\"}]}", id); + + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(responseBody))); + + changeMessageVisibilityWithOverrideConfig(DEFAULT_QUEUE_URL, "receipt-handle").get(3, TimeUnit.SECONDS); + + verify(anyRequestedFor(anyUrl()) + .withRequestBody(containing("\"QueueUrl\":\"" + DEFAULT_QUEUE_URL + "\""))); + } + public abstract List> createAndSendSendMessageRequests(String message1, String message2); @@ -366,6 +407,12 @@ public abstract List> createAndSendSendMe public abstract List> createAndSendChangeVisibilityRequests(); + public abstract CompletableFuture sendMessageWithOverrideConfig(String queueUrl, String messageBody); + + public abstract CompletableFuture deleteMessageWithOverrideConfig(String queueUrl, String receiptHandle); + + public abstract CompletableFuture changeMessageVisibilityWithOverrideConfig(String queueUrl, String receiptHandle); + private String getMd5Hash(String message) { byte[] expectedMd5; expectedMd5 = Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8)); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerTest.java index c1348253daed..8d0f8b9cc5e0 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerTest.java @@ -20,6 +20,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -103,4 +107,33 @@ public List> createAndSendCha responses.add(batchManager.changeMessageVisibility(builder -> builder.queueUrl(DEFAULT_QUEUE_URL))); return responses; } + + @Override + public CompletableFuture sendMessageWithOverrideConfig(String queueUrl, String messageBody) { + return batchManager.sendMessage(SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(messageBody) + .overrideConfiguration(AwsRequestOverrideConfiguration.builder().build()) + .build()); + } + + @Override + public CompletableFuture deleteMessageWithOverrideConfig(String queueUrl, String receiptHandle) { + return batchManager.deleteMessage(DeleteMessageRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(receiptHandle) + .overrideConfiguration(AwsRequestOverrideConfiguration.builder().build()) + .build()); + } + + @Override + public CompletableFuture changeMessageVisibilityWithOverrideConfig(String queueUrl, + String receiptHandle) { + return batchManager.changeMessageVisibility(ChangeMessageVisibilityRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(receiptHandle) + .visibilityTimeout(30) + .overrideConfiguration(AwsRequestOverrideConfiguration.builder().build()) + .build()); + } }