diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java index aede5407719a..c74041dc2a3b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java @@ -307,10 +307,6 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy Thread.sleep(5000); - // Create an item in the container - TestObject testObject = TestObject.create(this.createdItemPk); - testContainer.createItem(testObject).block(); - // Create a different client instance to delete the container deletingAsyncClient = getClientBuilder() .endpoint(TestConfigurations.HOST) @@ -337,23 +333,11 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy .as("Status code should be 404 (Not Found)") .isEqualTo(HttpConstants.StatusCodes.NOTFOUND); - // Verify sub-status code is either 0 or 1003 - - if (ConnectionMode.DIRECT.name().equals(accessor.getConnectionMode(clientToUse))) { - assertThat(diagnosticsContext.getSubStatusCode()) - .as("Sub-status code should be 1003") - .isIn( - HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS - ); - } - - if (ConnectionMode.GATEWAY.name().equals(accessor.getConnectionMode(clientToUse))) { - assertThat(diagnosticsContext.getSubStatusCode()) - .as("Sub-status code should be 0") - .isIn( - HttpConstants.SubStatusCodes.UNKNOWN - ); - } + assertThat(diagnosticsContext.getSubStatusCode()) + .as("Sub-status code should be 1003") + .isIn( + HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS + ); } finally { safeClose(clientToUse); safeClose(deletingAsyncClient); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java 
b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java index 08e1d7332eab..9d8981bcc375 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java @@ -948,7 +948,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse( } @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "faultInjectionOperationTypeProviderForLeaseNotFound", timeOut = TIMEOUT) - public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean primaryAddressOnly, boolean isReadMany) throws JsonProcessingException { + public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean primaryAddressOnly, boolean isReadMany) throws JsonProcessingException, InterruptedException { boolean shouldRetryCrossRegion = false; @@ -996,7 +996,27 @@ public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType opera ruleId, shouldRetryCrossRegion ); - this.validateAddressRefreshWithForceRefresh(cosmosDiagnostics, (operationType == OperationType.ReadFeed || operationType == OperationType.Query)); + // Allow time for the background address refresh to complete before validating diagnostics. + // The address refresh for LEASE_NOT_FOUND is triggered asynchronously via + // startBackgroundAddressRefresh() on Schedulers.boundedElastic(). 
+ // Instead of a fixed sleep, poll until the validation passes or a timeout is reached + long addressRefreshDeadlineNanos = System.nanoTime() + Duration.ofSeconds(5).toNanos(); + AssertionError lastAssertionError = null; + while (System.nanoTime() < addressRefreshDeadlineNanos) { + try { + this.validateAddressRefreshWithForceRefresh( + cosmosDiagnostics, + (operationType == OperationType.ReadFeed || operationType == OperationType.Query)); + lastAssertionError = null; + break; + } catch (AssertionError assertionError) { + lastAssertionError = assertionError; + Thread.sleep(100); + } + } + if (lastAssertionError != null) { + throw lastAssertionError; + } } finally { serverErrorRule.disable(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index e8f7abc509ab..59b77e5a7f2f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -194,8 +194,10 @@ protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); String altLink = collection.getAltLink(); - String[] altLinkSegments = altLink.split("/"); - // altLink format: dbs/{dbName}/colls/{collName} + // Normalize altLink so both "dbs/.../colls/..." and "/dbs/.../colls/..." are handled consistently. 
+ String normalizedAltLink = StringUtils.strip(altLink, "/"); + String[] altLinkSegments = normalizedAltLink.split("/"); + // altLink format (after normalization): dbs/{dbName}/colls/{collName} String databaseName = altLinkSegments[1]; String containerName = altLinkSegments[3]; @@ -203,7 +205,7 @@ protected static void truncateCollection(DocumentCollection collection) { Flux deleteOperations = container - .queryItems( "SELECT * FROM root", Document.class) + .queryItems( "SELECT * FROM root", options, Document.class) .byPage() .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) @@ -287,7 +289,6 @@ protected static void truncateCollection(DocumentCollection collection) { .byPage() .publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults())) .flatMap(udf -> { - RequestOptions requestOptions = new RequestOptions(); return container.getScripts().getUserDefinedFunction(udf.getId()).delete(); }) .then() diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 446ee82008b8..1373af756094 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -606,7 +606,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe FaultInjectionOperationType faultInjectionOperationType, boolean shouldUsePreferredRegionsOnClient, boolean isReadMany, - int hitLimit) { + int hitLimit) throws InterruptedException { boolean shouldRetryCrossRegion = false; @@ -678,6 +678,11 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(tooManyRequestsRule, leaseNotFoundFaultRule)).block(); + if 
(shouldRetryCrossRegion) { + // add a short delay to allow the data to be replicated across regions + Thread.sleep(200); + } + CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) .block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index 0f28d36c42e8..ca41077c14b2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -1339,7 +1339,7 @@ public void ownerNullAcquiring() throws InterruptedException { bulkInsert(createdFeedCollection, docDefList) .last() - .flatMap(cosmosItemResponse -> { + .flatMap(response -> { logger.info("Start first Change feed processor"); return changeFeedProcessorFirst.start().subscribeOn(Schedulers.boundedElastic()) .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index fd4166871667..55b5f384d28d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -1795,7 +1795,7 @@ public void getCurrentStateWithFaultInjection(FaultInjectionServerErrorType faul } } - @Test(groups = {"query" }, timeOut = 2 *
TIMEOUT) + @Test(groups = {"query" }, timeOut = 3 * TIMEOUT) public void readFeedDocumentsWithThroughputControl() throws InterruptedException { // Create a separate client as throughput control group will be applied to it CosmosAsyncClient clientWithThroughputControl = @@ -1819,8 +1819,15 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException Map receivedDocuments = new ConcurrentHashMap<>(); int maxItemCount = 100; // force the RU usage per requests > 1 - int feedCount = maxItemCount * 2; // force to do two fetches - setupReadFeedDocuments(createdDocuments, createdFeedCollection, feedCount); + + // force to do two fetches + for (int i = 0; i < 2; i++) { + setupReadFeedDocuments( + createdDocuments, + this.createdDatabase.getContainer(createdFeedCollection.getId()), + maxItemCount, + true); + } changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) @@ -2344,13 +2351,23 @@ private void setupReadFeedDocuments( List createdDocuments, CosmosAsyncContainer feedCollection, long count) { + + setupReadFeedDocuments(createdDocuments, feedCollection, count, false); + } + + private void setupReadFeedDocuments( + List createdDocuments, + CosmosAsyncContainer feedCollection, + long count, + boolean bulkEnabled) { List docDefList = new ArrayList<>(); for(int i = 0; i < count; i++) { docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled)); + for (InternalObjectNode current : createdDocuments) { try { logger.info("CREATED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(current)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 
6525617d94dd..36b3cd11de9a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -1688,7 +1688,7 @@ public void endToEndTimeoutConfigShouldBeSuppressed() throws InterruptedExceptio } } - @Test(groups = { "long-emulator" }, timeOut = 2 * TIMEOUT) + @Test(groups = { "long-emulator" }, timeOut = 3 * TIMEOUT) public void readFeedDocumentsWithThroughputControl() throws InterruptedException { // Create a separate client as throughput control group will be applied to it CosmosAsyncClient clientWithThroughputControl = @@ -1712,12 +1712,15 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException Map receivedDocuments = new ConcurrentHashMap<>(); int maxItemCount = 100; // force the RU usage per requests > 1 - int feedCount = maxItemCount * 2; // force to do two fetches + // force to do two fetches // using the original client to create the docs to isolate possible throttling - setupReadFeedDocuments( - createdDocuments, - this.createdDatabase.getContainer(createdFeedCollection.getId()), - feedCount); + for (int i = 0; i < 2; i++) { + setupReadFeedDocuments( + createdDocuments, + this.createdDatabase.getContainer(createdFeedCollection.getId()), + maxItemCount, + true); + } changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) @@ -2229,14 +2232,26 @@ public void afterClass() { safeClose(client); } - private void setupReadFeedDocuments(List createdDocuments, CosmosAsyncContainer feedCollection, long count) { + private void setupReadFeedDocuments( + List createdDocuments, + CosmosAsyncContainer feedCollection, + long count) { + setupReadFeedDocuments(createdDocuments, feedCollection, count, false); + } + + private void setupReadFeedDocuments( + List createdDocuments, + CosmosAsyncContainer feedCollection, + long 
count, + boolean bulkEnabled) { List docDefList = new ArrayList<>(); for(int i = 0; i < count; i++) { docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled)); + waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 083389edcd55..4c9643a3eafd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -510,7 +510,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java index 3e13377dcfc7..14b05436f3a0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java @@ -87,6 +87,11 @@ public Mono shouldRetry(Exception e) { CosmosException clientException = Utils.as(e, CosmosException.class); if (isServerNameCacheStaledException(clientException) || isGatewayStaledContainerException(clientException)) { if (!this.retried) { + logger.debug("Received name cache staled exception for collection [{}] with statusCode {}, subStatusCode {}, going to retry", + collectionLink, + clientException.getStatusCode(), + clientException.getSubStatusCode()); + // 1. refresh the collection cache if needed // 2. 
If the collection rid has changed, then also clean up session container for old containerRid AtomicReference oldCollectionRid = new AtomicReference<>();