Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy

Thread.sleep(5000);

// Create an item in the container
TestObject testObject = TestObject.create(this.createdItemPk);
testContainer.createItem(testObject).block();

// Create a different client instance to delete the container
deletingAsyncClient = getClientBuilder()
.endpoint(TestConfigurations.HOST)
Expand All @@ -337,23 +333,11 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy
.as("Status code should be 404 (Not Found)")
.isEqualTo(HttpConstants.StatusCodes.NOTFOUND);

// Verify sub-status code is either 0 or 1003

if (ConnectionMode.DIRECT.name().equals(accessor.getConnectionMode(clientToUse))) {
assertThat(diagnosticsContext.getSubStatusCode())
.as("Sub-status code should be 1003")
.isIn(
HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS
);
}

if (ConnectionMode.GATEWAY.name().equals(accessor.getConnectionMode(clientToUse))) {
assertThat(diagnosticsContext.getSubStatusCode())
.as("Sub-status code should be 0")
.isIn(
HttpConstants.SubStatusCodes.UNKNOWN
);
}
assertThat(diagnosticsContext.getSubStatusCode())
.as("Sub-status code should be 1003")
.isIn(
HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS
);
} finally {
safeClose(clientToUse);
safeClose(deletingAsyncClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
}

@Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "faultInjectionOperationTypeProviderForLeaseNotFound", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean primaryAddressOnly, boolean isReadMany) throws JsonProcessingException {
public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean primaryAddressOnly, boolean isReadMany) throws JsonProcessingException, InterruptedException {

boolean shouldRetryCrossRegion = false;

Expand Down Expand Up @@ -996,7 +996,27 @@ public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType opera
ruleId,
shouldRetryCrossRegion
);
this.validateAddressRefreshWithForceRefresh(cosmosDiagnostics, (operationType == OperationType.ReadFeed || operationType == OperationType.Query));
// Allow time for the background address refresh to complete before validating diagnostics.
// The address refresh for LEASE_NOT_FOUND is triggered asynchronously via
// startBackgroundAddressRefresh() on Schedulers.boundedElastic().
// Instead of using a fixed sleep, poll until the validation passes or the timeout is reached.
long addressRefreshDeadlineNanos = System.nanoTime() + Duration.ofSeconds(5).toNanos();
AssertionError lastAssertionError = null;
while (System.nanoTime() < addressRefreshDeadlineNanos) {
try {
this.validateAddressRefreshWithForceRefresh(
cosmosDiagnostics,
(operationType == OperationType.ReadFeed || operationType == OperationType.Query));
lastAssertionError = null;
break;
} catch (AssertionError assertionError) {
lastAssertionError = assertionError;
Thread.sleep(100);
}
}
if (lastAssertionError != null) {
throw lastAssertionError;
}

} finally {
serverErrorRule.disable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,18 @@ protected static void truncateCollection(DocumentCollection collection) {
logger.info("Truncating DocumentCollection {} documents ...", collection.getId());

String altLink = collection.getAltLink();
String[] altLinkSegments = altLink.split("/");
// altLink format: dbs/{dbName}/colls/{collName}
// Normalize altLink so both "dbs/.../colls/..." and "/dbs/.../colls/..." are handled consistently.
String normalizedAltLink = StringUtils.strip(altLink, "/");
String[] altLinkSegments = normalizedAltLink.split("/");
// altLink format (after normalization): dbs/{dbName}/colls/{collName}
String databaseName = altLinkSegments[1];
String containerName = altLinkSegments[3];

CosmosAsyncContainer container = cosmosClient.getDatabase(databaseName).getContainer(containerName);

Flux<CosmosItemOperation> deleteOperations =
container
.queryItems( "SELECT * FROM root", Document.class)
.queryItems( "SELECT * FROM root", options, Document.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
Expand Down Expand Up @@ -287,7 +289,6 @@ protected static void truncateCollection(DocumentCollection collection) {
.byPage()
.publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(udf -> {
RequestOptions requestOptions = new RequestOptions();
return container.getScripts().getUserDefinedFunction(udf.getId()).delete();
})
.then()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe
FaultInjectionOperationType faultInjectionOperationType,
boolean shouldUsePreferredRegionsOnClient,
boolean isReadMany,
int hitLimit) {
int hitLimit) throws InterruptedException {

boolean shouldRetryCrossRegion = false;

Expand Down Expand Up @@ -678,6 +678,11 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe

CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(tooManyRequestsRule, leaseNotFoundFaultRule)).block();

if (shouldRetryCrossRegion) {
// Add a short delay to allow the data to be replicated across regions.
Thread.sleep(200);
}

CosmosDiagnostics cosmosDiagnostics
= this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany)
.block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ public void ownerNullAcquiring() throws InterruptedException {

bulkInsert(createdFeedCollection, docDefList)
.last()
.flatMap(cosmosItemResponse -> {
.flatMap(response -> {
logger.info("Start first Change feed processor");
return changeFeedProcessorFirst.start().subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1795,7 +1795,7 @@ public void getCurrentStateWithFaultInjection(FaultInjectionServerErrorType faul
}
}

@Test(groups = {"query" }, timeOut = 2 * TIMEOUT)
@Test(groups = {"query" }, timeOut = 3 * TIMEOUT)
public void readFeedDocumentsWithThroughputControl() throws InterruptedException {
// Create a separate client as throughput control group will be applied to it
CosmosAsyncClient clientWithThroughputControl =
Expand All @@ -1819,8 +1819,15 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();

int maxItemCount = 100; // force the RU usage per requests > 1
int feedCount = maxItemCount * 2; // force to do two fetches
setupReadFeedDocuments(createdDocuments, createdFeedCollection, feedCount);

// force to do two fetches
for (int i = 0; i < 2; i++) {
setupReadFeedDocuments(
createdDocuments,
this.createdDatabase.getContainer(createdFeedCollection.getId()),
maxItemCount,
true);
}

changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
Expand Down Expand Up @@ -2344,13 +2351,23 @@ private void setupReadFeedDocuments(
List<InternalObjectNode> createdDocuments,
CosmosAsyncContainer feedCollection,
long count) {

setupReadFeedDocuments(createdDocuments, feedCollection, count, false);
}

private void setupReadFeedDocuments(
List<InternalObjectNode> createdDocuments,
CosmosAsyncContainer feedCollection,
long count,
boolean bulkEnabled) {
List<InternalObjectNode> docDefList = new ArrayList<>();

for(int i = 0; i < count; i++) {
docDefList.add(getDocumentDefinition());
}

createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false));
createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled));

for (InternalObjectNode current : createdDocuments) {
try {
logger.info("CREATED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(current));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1688,7 +1688,7 @@ public void endToEndTimeoutConfigShouldBeSuppressed() throws InterruptedExceptio
}
}

@Test(groups = { "long-emulator" }, timeOut = 2 * TIMEOUT)
@Test(groups = { "long-emulator" }, timeOut = 3 * TIMEOUT)
public void readFeedDocumentsWithThroughputControl() throws InterruptedException {
// Create a separate client as throughput control group will be applied to it
CosmosAsyncClient clientWithThroughputControl =
Expand All @@ -1712,12 +1712,15 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();

int maxItemCount = 100; // force the RU usage per requests > 1
int feedCount = maxItemCount * 2; // force to do two fetches
// force to do two fetches
// using the original client to create the docs to isolate possible throttling
setupReadFeedDocuments(
createdDocuments,
this.createdDatabase.getContainer(createdFeedCollection.getId()),
feedCount);
for (int i = 0; i < 2; i++) {
setupReadFeedDocuments(
createdDocuments,
this.createdDatabase.getContainer(createdFeedCollection.getId()),
maxItemCount,
true);
}

changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
Expand Down Expand Up @@ -2229,14 +2232,26 @@ public void afterClass() {
safeClose(client);
}

private void setupReadFeedDocuments(List<InternalObjectNode> createdDocuments, CosmosAsyncContainer feedCollection, long count) {
/**
 * Convenience overload that sets up read-feed documents with bulk mode turned off.
 *
 * <p>Forwards all arguments unchanged to the four-argument overload, passing
 * {@code false} for {@code bulkEnabled}.
 *
 * @param createdDocuments list passed through to the four-argument overload
 * @param feedCollection container passed through to the four-argument overload
 * @param count value passed through to the four-argument overload
 */
private void setupReadFeedDocuments(
List<InternalObjectNode> createdDocuments,
CosmosAsyncContainer feedCollection,
long count) {
// Delegate with bulkEnabled = false so existing three-argument callers keep their behavior.
setupReadFeedDocuments(createdDocuments, feedCollection, count, false);
}

private void setupReadFeedDocuments(
List<InternalObjectNode> createdDocuments,
CosmosAsyncContainer feedCollection,
long count,
boolean bulkEnabled) {
List<InternalObjectNode> docDefList = new ArrayList<>();

for(int i = 0; i < count; i++) {
docDefList.add(getDocumentDefinition());
}

createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false));
createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled));

waitIfNeededForReplicasToCatchUp(getClientBuilder());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
if (value == null) {
return 1;
}

return value + 1;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
CosmosException clientException = Utils.as(e, CosmosException.class);
if (isServerNameCacheStaledException(clientException) || isGatewayStaledContainerException(clientException)) {
if (!this.retried) {
logger.debug("Received name cache staled exception for collection [{}] with statusCode {}, subStatusCode {}, going to retry",
collectionLink,
clientException.getStatusCode(),
clientException.getSubStatusCode());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering whether the status code and sub-status code are enough to determine which stale exception was hit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they should be, but I also just want to log that the SDK is retrying because of the stale cache.


// 1. refresh the collection cache if needed
// 2. If the collection rid has changed, then also clean up session container for old containerRid
AtomicReference<String> oldCollectionRid = new AtomicReference<>();
Expand Down
Loading