Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibility
identifiedRequest.message()))
.collect(Collectors.toList());

// All requests have the same overrideConfiguration, so it's sufficient to retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
.message()
.overrideConfiguration();
// All requests have the same queueUrl and overrideConfiguration, so retrieve them from the first request.
ChangeMessageVisibilityRequest firstRequest = identifiedRequests.get(0).message();
String queueUrl = firstRequest.queueUrl();
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = firstRequest.overrideConfiguration();

return overrideConfiguration
.map(config -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.queueUrl(queueUrl)
.overrideConfiguration(config.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build())
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.queueUrl(queueUrl)
.overrideConfiguration(o -> o
.applyMutation(USER_AGENT_APPLIER)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
))
.collect(Collectors.toList());

// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration, so it is sufficient to retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
.overrideConfiguration();
// Since requests are batched together by queueUrl + overrideConfiguration, all requests in a batch share the
// same queueUrl and overrideConfiguration; retrieve them from the first request.
DeleteMessageRequest firstRequest = identifiedRequests.get(0).message();
String queueUrl = firstRequest.queueUrl();
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = firstRequest.overrideConfiguration();

return overrideConfiguration.map(
overrideConfig -> DeleteMessageBatchRequest.builder()
.queueUrl(batchKey)
.queueUrl(queueUrl)
.overrideConfiguration(
overrideConfig.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
Expand All @@ -75,7 +76,7 @@ private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
.build()
).orElseGet(
() -> DeleteMessageBatchRequest.builder()
.queueUrl(batchKey)
.queueUrl(queueUrl)
.overrideConfiguration(o ->
o.applyMutation(USER_AGENT_APPLIER).build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,21 @@ private static SendMessageBatchRequest createSendMessageBatchRequest(
identifiedRequest.message()))
.collect(Collectors.toList());

// All requests must have the same overrideConfiguration, so retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
.message()
.overrideConfiguration();
// All requests must have the same queueUrl and overrideConfiguration, so retrieve them from the first request.
SendMessageRequest firstRequest = identifiedRequests.get(0).message();
String queueUrl = firstRequest.queueUrl();
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = firstRequest.overrideConfiguration();

return overrideConfiguration
.map(overrideConfig -> SendMessageBatchRequest.builder()
.queueUrl(batchKey)
.queueUrl(queueUrl)
.overrideConfiguration(overrideConfig.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build())
.orElseGet(() -> SendMessageBatchRequest.builder()
.queueUrl(batchKey)
.queueUrl(queueUrl)
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
.entries(entries)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,47 @@ public void changeVisibilityBatchFunctionReturnsWithError_completeMessagesExcept
assertThatThrownBy(() -> response2.get(3, TimeUnit.SECONDS)).hasCauseInstanceOf(SqsException.class).hasMessageContaining("Status Code: 400");
}

@Test
public void sendMessageWithOverrideConfig_batchRequestUsesOriginalQueueUrl() {
String id = "0";
String responseBody = String.format(
"{\"Successful\":[{\"Id\":\"%s\",\"MD5OfMessageBody\":\"dummy\"}]}", id);

stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(responseBody)));

sendMessageWithOverrideConfig(DEFAULT_QUEUE_URL, "test-body").join();

// The batch request QueueUrl must be the original URL, not batchKey (queueUrl + overrideConfig.hashCode())
verify(anyRequestedFor(anyUrl())
.withRequestBody(containing("\"QueueUrl\":\"" + DEFAULT_QUEUE_URL + "\"")));
}

@Test
public void deleteMessageWithOverrideConfig_batchRequestUsesOriginalQueueUrl() throws Exception {
String id = "0";
String responseBody = String.format("{\"Successful\":[{\"Id\":\"%s\"}]}", id);

stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(responseBody)));

deleteMessageWithOverrideConfig(DEFAULT_QUEUE_URL, "receipt-handle").get(3, TimeUnit.SECONDS);

verify(anyRequestedFor(anyUrl())
.withRequestBody(containing("\"QueueUrl\":\"" + DEFAULT_QUEUE_URL + "\"")));
}

@Test
public void changeMessageVisibilityWithOverrideConfig_batchRequestUsesOriginalQueueUrl() throws Exception {
String id = "0";
String responseBody = String.format("{\"Successful\":[{\"Id\":\"%s\"}]}", id);

stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(responseBody)));

changeMessageVisibilityWithOverrideConfig(DEFAULT_QUEUE_URL, "receipt-handle").get(3, TimeUnit.SECONDS);

verify(anyRequestedFor(anyUrl())
.withRequestBody(containing("\"QueueUrl\":\"" + DEFAULT_QUEUE_URL + "\"")));
}

public abstract List<CompletableFuture<SendMessageResponse>> createAndSendSendMessageRequests(String message1,
String message2);

Expand All @@ -366,6 +407,12 @@ public abstract List<CompletableFuture<SendMessageResponse>> createAndSendSendMe

public abstract List<CompletableFuture<ChangeMessageVisibilityResponse>> createAndSendChangeVisibilityRequests();

public abstract CompletableFuture<SendMessageResponse> sendMessageWithOverrideConfig(String queueUrl, String messageBody);

public abstract CompletableFuture<DeleteMessageResponse> deleteMessageWithOverrideConfig(String queueUrl, String receiptHandle);

public abstract CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibilityWithOverrideConfig(String queueUrl, String receiptHandle);

private String getMd5Hash(String message) {
byte[] expectedMd5;
expectedMd5 = Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -103,4 +107,33 @@ public List<CompletableFuture<ChangeMessageVisibilityResponse>> createAndSendCha
responses.add(batchManager.changeMessageVisibility(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
return responses;
}

@Override
public CompletableFuture<SendMessageResponse> sendMessageWithOverrideConfig(String queueUrl, String messageBody) {
return batchManager.sendMessage(SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(messageBody)
.overrideConfiguration(AwsRequestOverrideConfiguration.builder().build())
.build());
}

@Override
public CompletableFuture<DeleteMessageResponse> deleteMessageWithOverrideConfig(String queueUrl, String receiptHandle) {
return batchManager.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.overrideConfiguration(AwsRequestOverrideConfiguration.builder().build())
.build());
}

@Override
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibilityWithOverrideConfig(String queueUrl,
String receiptHandle) {
return batchManager.changeMessageVisibility(ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.visibilityTimeout(30)
.overrideConfiguration(AwsRequestOverrideConfiguration.builder().build())
.build());
}
}