From cc4689f96d91518a6bed6a8fdd496cd76cde4b4c Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 30 Jan 2026 13:52:36 -0800 Subject: [PATCH 01/20] fix test pipeline --- .../com/azure/cosmos/implementation/RxDocumentClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index c8345e6b996a..86a048f2ebad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -508,7 +508,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); From 54dbd8bac1fac281ff33fb414b439724e629ea89 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 11:50:44 -0800 Subject: [PATCH 02/20] fix few tests part 2 --- .../cosmos/implementation/TestSuiteBase.java | 75 ++++++++-- .../com/azure/cosmos/rx/TestSuiteBase.java | 138 +++++++++++++----- 2 files changed, 157 insertions(+), 56 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 5c59341d59c0..cfb5e01fec65 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -5,6 +5,8 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; import com.azure.cosmos.CosmosNettyLeakDetectorFactory; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.DocumentClientTest; @@ -19,7 +21,10 @@ import com.azure.cosmos.models.CompositePath; import com.azure.cosmos.models.CompositePathSortOrder; import com.azure.cosmos.models.CosmosClientTelemetryConfig; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.IncludedPath; import com.azure.cosmos.models.IndexingPolicy; @@ -199,24 +204,62 @@ protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); - houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - RequestOptions requestOptions = new RequestOptions(); - - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); + String altLink = collection.getAltLink(); + String[] altLinkSegments = altLink.split("/"); + // altLink format: dbs/{dbName}/colls/{collName} + String databaseName = altLinkSegments[1]; + String containerName = altLinkSegments[3]; + + Flux deleteOperations = + houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + propertyValue = Undefined.value(); + } + partitionKey = new PartitionKey(propertyValue); + } else { + partitionKey = PartitionKey.NONE; } - requestOptions.setPartitionKey(new PartitionKey(propertyValue)); - } - - return houseKeepingClient.deleteDocument(doc.getSelfLink(), requestOptions); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosClient.getDatabase(databaseName) + .getContainer(containerName) + .executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + return Mono.just(response); + }) + .blockLast(); logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index cef37b160f3b..a8d3391de2d5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -45,8 +45,11 @@ import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.CosmosDatabaseResponse; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosResponse; import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; @@ -253,29 +256,56 @@ protected static void cleanUpContainer(CosmosAsyncContainer cosmosContainer) { options.setMaxDegreeOfParallelism(-1); int maxItemCount = 100; - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - - PartitionKey partitionKey = null; - - Object propertyValue = null; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; + Flux deleteOperations = + cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) + .byPage(maxItemCount) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + partitionKey = PartitionKey.NONE; + } else { + partitionKey = new PartitionKey(propertyValue); + } } else { - partitionKey = new PartitionKey(propertyValue); + partitionKey = new PartitionKey(null); } - } else { - partitionKey = new PartitionKey(null); - } - return cosmosContainer.deleteItem(doc.getId(), partitionKey); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosContainer.executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + return Mono.just(response); + }) + .blockLast(); } protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) { @@ -341,29 +371,57 @@ private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContai logger.info("Truncating collection {} documents ...", cosmosContainer.getId()); - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - - PartitionKey partitionKey = null; - - Object propertyValue = null; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; + Flux deleteOperations = + cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) + .byPage(maxItemCount) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + partitionKey = PartitionKey.NONE; + } else { + partitionKey = new PartitionKey(propertyValue); + } } else { - partitionKey = new PartitionKey(propertyValue); + partitionKey = new PartitionKey(null); } - } else { - partitionKey = new PartitionKey(null); - } - return cosmosContainer.deleteItem(doc.getId(), partitionKey); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions truncateBulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(truncateBulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosContainer + .executeBulkOperations(deleteOperations, truncateBulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + return Mono.just(response); + }) + .blockLast(); expectCount(cosmosContainer, 0); From d2f3e1711a4dbf33c6c99df4cc6fc30336efbeb8 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 12:40:25 -0800 Subject: [PATCH 03/20] fix some tests --- .../azure/cosmos/CosmosDiagnosticsTest.java | 3 - .../cosmos/rx/OrderbyDocumentQueryTest.java | 4 +- .../com/azure/cosmos/rx/TestSuiteBase.java | 61 +++++++++++++------ .../FullFidelityChangeFeedProcessorTest.java | 6 +- .../IncrementalChangeFeedProcessorTest.java | 6 +- .../IncrementalChangeFeedProcessorTest.java | 6 +- 6 files changed, 55 insertions(+), 31 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index b0d6543dd70c..9875951afac0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -66,7 +66,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -592,8 +591,6 @@ public void databaseAccountToClients() { cosmosDiagnosticsNode.get("clientCfgs").get("clientEndpoints").get(TestConfigurations.HOST).asInt(); assertThat(updatedClientCount).isEqualTo(clientCount + 1); - } catch (JsonMappingException e) { - throw new RuntimeException(e); } catch (JsonProcessingException e) { throw new RuntimeException(e); } finally { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index e02330cbf9b6..3276a14d7d60 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -644,7 +644,7 @@ public InternalObjectNode createDocument(CosmosAsyncContainer cosmosContainer, M return BridgeInternal.getProperties(cosmosContainer.createItem(docDefinition).block()); } - public List bulkInsert(CosmosAsyncContainer cosmosContainer, List> keyValuePropsList) { + public List bulkInsertDocs(CosmosAsyncContainer cosmosContainer, List> keyValuePropsList) { ArrayList result = new ArrayList(); @@ -740,7 +740,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { props = new HashMap<>(); keyValuePropsList.add(props); - createdDocuments = bulkInsert(createdCollection, keyValuePropsList); + createdDocuments = bulkInsertDocs(createdCollection, keyValuePropsList); for(int i = 0; i < 10; i++) { Map p = new HashMap<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index a8d3391de2d5..4f93c1617417 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -30,6 +30,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.PartitionKeyHelper; import com.azure.cosmos.implementation.PathParser; import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.Resource; @@ -49,6 +50,7 @@ import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosResponse; @@ -100,7 +102,6 @@ public abstract class TestSuiteBase extends CosmosAsyncClientTest { - private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 500; private static final ObjectMapper objectMapper = new ObjectMapper(); protected static final int TIMEOUT = 40000; @@ -665,34 +666,60 @@ public static InternalObjectNode createDocument(CosmosAsyncContainer cosmosConta return BridgeInternal.getProperties(cosmosContainer.createItem(item, options).block()); } - public Flux> bulkInsert(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList, - int concurrencyLevel) { + public Flux> bulkInsert(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList) { - CosmosItemRequestOptions options = new CosmosItemRequestOptions() - .setCosmosEndToEndOperationLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) - .build() - ); - List>> result = - new ArrayList<>(documentDefinitionList.size()); + CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); + PartitionKeyDefinition pkDef = cosmosContainerProperties.getPartitionKeyDefinition(); + + List operations = new ArrayList<>(documentDefinitionList.size()); for (T docDef : documentDefinitionList) { - result.add(cosmosContainer.createItem(docDef, options)); + InternalObjectNode internalNode = InternalObjectNode.fromObjectToInternalObjectNode(docDef); + PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(internalNode, pkDef); + if (partitionKey == null) { + partitionKey = PartitionKey.NONE; + } + operations.add(CosmosBulkOperations.getCreateItemOperation(docDef, partitionKey)); } - return Flux.merge(Flux.fromIterable(result), concurrencyLevel); + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + return cosmosContainer + .executeBulkOperations(Flux.fromIterable(operations), bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.CONFLICT + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + return Mono.just(response); + }); } + public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - return bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) - .map(itemResponse -> itemResponse.getItem()) - .collectList() + .then() .block(); + + return documentDefinitionList; } public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) .then() .block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index fa0116f3dbad..abc7348a14f0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -1263,7 +1263,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT).blockLast(); + bulkInsert(createdFeedCollection, docDefList).blockLast(); // Wait for the feed processor to receive and process the documents. Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); @@ -1337,7 +1337,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { logger.info("Start first Change feed processor"); @@ -1389,7 +1389,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index a543309ef37a..0c91d6ec5039 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -987,7 +987,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList) .last() .delayElement(Duration.ofMillis(1000)) .flatMap(cosmosItemResponse -> { @@ -1074,7 +1074,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { logger.info("Start first Change feed processor"); @@ -1126,7 +1126,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index ad70d2997423..4e54d81ba417 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -1004,7 +1004,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList) .last() .delayElement(Duration.ofMillis(1000)) .flatMap(cosmosItemResponse -> { @@ -1090,7 +1090,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { log.info("Start first Change feed processor"); @@ -1143,7 +1143,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) From 47c7aa16ee3fab269d6610e5f0048e1ee6e61b18 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 12:54:22 -0800 Subject: [PATCH 04/20] rename --- .../EndToEndTimeOutValidationTests.java | 2 +- .../azure/cosmos/rx/AggregateQueryTests.java | 2 +- .../azure/cosmos/rx/DistinctQueryTests.java | 2 +- .../azure/cosmos/rx/FeedRangeQueryTests.java | 2 +- .../azure/cosmos/rx/GroupByQueryTests.java | 2 +- .../cosmos/rx/MultiOrderByQueryTests.java | 2 +- .../cosmos/rx/OffsetLimitQueryTests.java | 2 +- .../cosmos/rx/OrderbyDocumentQueryTest.java | 2 +- .../cosmos/rx/ParallelDocumentQueryTest.java | 4 +- .../azure/cosmos/rx/QueryValidationTests.java | 2 +- .../cosmos/rx/ReadFeedDocumentsTest.java | 2 +- .../rx/ReadFeedStoredProceduresTest.java | 2 +- .../azure/cosmos/rx/ReadFeedTriggersTest.java | 2 +- .../com/azure/cosmos/rx/ReadFeedUdfsTest.java | 2 +- .../rx/SinglePartitionDocumentQueryTest.java | 2 +- .../SinglePartitionReadFeedDocumentsTest.java | 2 +- .../cosmos/rx/StoredProcedureQueryTest.java | 2 +- .../com/azure/cosmos/rx/TestSuiteBase.java | 69 +------------------ .../com/azure/cosmos/rx/TopQueryTests.java | 2 +- .../com/azure/cosmos/rx/TriggerQueryTest.java | 2 +- .../cosmos/rx/TriggerUpsertReplaceTest.java | 2 +- .../rx/UserDefinedFunctionQueryTest.java | 2 +- .../UserDefinedFunctionUpsertReplaceTest.java | 2 +- .../cosmos/rx/VeryLargeDocumentQueryTest.java | 2 +- .../implementation/RxDocumentClientImpl.java | 2 +- 25 files changed, 27 insertions(+), 92 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java index 55950284fc00..eac18cead8af 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java @@ -62,7 +62,7 @@ public CosmosAsyncClient initializeClient(CosmosEndToEndOperationLatencyPolicyCo try { createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java index e6d58eb148bc..523430b7fad4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java @@ -266,7 +266,7 @@ public void before_AggregateQueryTests() throws Throwable { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); try { - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } catch (Throwable throwable) { throwable = Exceptions.unwrap(throwable); if (!(throwable instanceof CosmosException)) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java index 062c891cf09d..ca386623c9a4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java @@ -379,7 +379,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java index de2b0d9bddd0..f4d86b535a4b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java @@ -161,7 +161,7 @@ private List queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryReques public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments( DEFAULT_NUM_DOCUMENTS_PER_PKEY, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java index 1d3f0adc9b08..6ae065c39cdd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java @@ -195,7 +195,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index a5e9e832aa1c..f59d2e1a9752 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -120,7 +120,7 @@ public void before_MultiOrderByQueryTests() throws Exception { documents = new ArrayList<>(); client = getClientBuilder().buildAsyncClient(); documentCollection = getSharedMultiPartitionCosmosContainerWithCompositeAndSpatialIndexes(client); - truncateCollection(documentCollection); + cleanUpContainer(documentCollection); expectCount(documentCollection, 0); int numberOfDocuments = 4; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java index 227c4db66133..b3ad601f322f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java @@ -344,7 +344,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 3276a14d7d60..2279faa033dc 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -672,7 +672,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); String containerName = "roundTripsContainer-" + UUID.randomUUID(); createdDatabase.createContainer(containerName, "/mypk", diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index ff84a3efa2b9..37b8941a265d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -485,10 +485,10 @@ public void before_ParallelDocumentQueryTest() { private List prepareCosmosContainer(CosmosAsyncContainer cosmosContainer) { try { - truncateCollection(cosmosContainer); + cleanUpContainer(cosmosContainer); } catch (Throwable firstChanceException) { try { - truncateCollection(cosmosContainer); + cleanUpContainer(cosmosContainer); } catch (Throwable lastChanceException) { String message = String.format("container %s truncation failed due to first chance %s followed by last chance %s", cosmosContainer, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index bc44a2824acd..80443137a22c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -84,7 +84,7 @@ public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer)); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 66c902239162..43b98694dd5c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -83,7 +83,7 @@ public void readDocuments_withoutEnableCrossPartitionQuery() { public void before_ReadFeedDocumentsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); List docDefList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java index 3a28ac6e3c3d..fa99650d2dd8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java @@ -56,7 +56,7 @@ public void readStoredProcedures() throws Exception { public void before_ReadFeedStoredProceduresTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdStoredProcedures.add(createStoredProcedures(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java index 555e535b65a6..2c42722406ac 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java @@ -54,7 +54,7 @@ public void readTriggers() throws Exception { public void before_ReadFeedTriggersTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { this.createdTriggers.add(this.createTriggers(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java index 2fb5e20495cb..870689f0c8f9 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java @@ -58,7 +58,7 @@ public void readUserDefinedFunctions() throws Exception { public void before_ReadFeedUdfsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdUserDefinedFunctions.add(createUserDefinedFunctions(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java index ddb421861ef0..0aebb62bfcee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java @@ -312,7 +312,7 @@ public InternalObjectNode createDocument(CosmosAsyncContainer cosmosContainer, i public void before_SinglePartitionDocumentQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for(int i = 0; i < 5; i++) { createdDocuments.add(createDocument(createdCollection, i)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index fb8de9b4852a..cf1cf698546a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -55,7 +55,7 @@ public void readDocuments() { public void before_SinglePartitionReadFeedDocumentsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); List docDefList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java index e18cd765c41b..7d87e90fce5c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java @@ -127,7 +127,7 @@ public CosmosStoredProcedureProperties createStoredProc(CosmosAsyncContainer cos public void before_StoredProcedureQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdStoredProcs.add(createStoredProc(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 4f93c1617417..8b6c0a8cd109 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -245,77 +245,12 @@ public void afterSuitEmulatorVNext() { } protected static void cleanUpContainer(CosmosAsyncContainer cosmosContainer) { - CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); - String cosmosContainerId = cosmosContainerProperties.getId(); - logger.info("Truncating collection {} ...", cosmosContainerId); - List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); - CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setCosmosEndToEndOperationLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) - .build() - ); - options.setMaxDegreeOfParallelism(-1); - int maxItemCount = 100; - - Flux deleteOperations = - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; - } else { - partitionKey = new PartitionKey(propertyValue); - } - } else { - partitionKey = new PartitionKey(null); - } - - return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); - }); - - CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); - ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper - .getCosmosBulkExecutionOptionsAccessor() - .getImpl(bulkOptions) - .setCosmosEndToEndLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) - .build()); - - cosmosContainer.executeBulkOperations(deleteOperations, bulkOptions) - .flatMap(response -> { - if (response.getException() != null) { - Exception ex = response.getException(); - if (ex instanceof CosmosException) { - CosmosException cosmosException = (CosmosException) ex; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND - && cosmosException.getSubStatusCode() == 0) { - return Mono.empty(); - } - } - return Mono.error(ex); - } - if (response.getResponse() != null - && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return Mono.empty(); - } - return Mono.just(response); - }) - .blockLast(); - } - - protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) { try { int i = 0; while (i < 100) { try { - truncateCollectionInternal(cosmosContainer); + cleanUpContainerInternal(cosmosContainer); return; } catch (CosmosException exception) { if (exception.getStatusCode() != HttpConstants.StatusCodes.TOO_MANY_REQUESTS @@ -357,7 +292,7 @@ protected static void expectCount(CosmosAsyncContainer cosmosContainer, int expe assertThat(counts.get(0)).isEqualTo(expectedCount); } - private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContainer) { + private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContainer) { CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); String cosmosContainerId = cosmosContainerProperties.getId(); logger.info("Truncating collection {} ...", cosmosContainerId); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java index 1fad912332e9..41562c68cb80 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java @@ -248,7 +248,7 @@ public void afterClass() { public void before_TopQueryTests() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(client); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java index 3b850c42ff18..8866cd5f7c0b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java @@ -137,7 +137,7 @@ public CosmosTriggerProperties createTrigger(CosmosAsyncContainer cosmosContaine public void before_TriggerQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); createdTriggers.clear(); for(int i = 0; i < 5; i++) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java index d2a4870a1c10..1db1bedf1845 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java @@ -73,7 +73,7 @@ public void replaceTrigger() throws Exception { public void before_TriggerUpsertReplaceTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "fast" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java index 67775af2557e..74229a7ae529 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java @@ -137,7 +137,7 @@ public CosmosUserDefinedFunctionProperties createUserDefinedFunction(CosmosAsync public void before_UserDefinedFunctionQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for(int i = 0; i < 5; i++) { createdUDF.add(createUserDefinedFunction(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java index c9d19969c8dc..c432fe871487 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java @@ -73,7 +73,7 @@ public void replaceUserDefinedFunction() throws Exception { public void before_UserDefinedFunctionUpsertReplaceTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "fast" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java index 68e68c50731d..9e7b8af99a5f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java @@ -84,7 +84,7 @@ private void createLargeDocument() { public void before_VeryLargeDocumentQueryTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "query" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 86a048f2ebad..c8345e6b996a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -508,7 +508,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); From 8ccf0cc238b920624a3dbf185cf8e1b96b15e4dc Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 13:09:30 -0800 Subject: [PATCH 05/20] Address PR #47933 review comments - Add isSuccessStatusCode() validation for non-2xx bulk responses in all 3 sites (impl/TestSuiteBase truncateCollection, rx/TestSuiteBase cleanUpContainerInternal, rx/TestSuiteBase bulkInsert) - Keep 409/Conflict handling in bulkInsert but validate other non-2xx responses - Return server-created items from bulkInsertBlocking via getItem(clazz) - Use PartitionKey.NONE instead of new PartitionKey(null) in cleanUpContainerInternal --- .../cosmos/implementation/TestSuiteBase.java | 5 ++++ .../com/azure/cosmos/rx/TestSuiteBase.java | 28 +++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index cfb5e01fec65..f2cb95647921 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -257,6 +257,11 @@ protected static void truncateCollection(DocumentCollection collection) { && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { return Mono.empty(); } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + return Mono.error(new IllegalStateException( + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + } return Mono.just(response); }) .blockLast(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 8b6c0a8cd109..921994bbaeaa 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -323,7 +323,7 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine partitionKey = new PartitionKey(propertyValue); } } else { - partitionKey = new PartitionKey(null); + partitionKey = PartitionKey.NONE; } return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); @@ -355,6 +355,11 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { return Mono.empty(); } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + return Mono.error(new IllegalStateException( + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + } return Mono.just(response); }) .blockLast(); @@ -639,18 +644,31 @@ public Flux> bulkInsert(CosmosAsyncConta } return Mono.error(ex); } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode() + && response.getResponse().getStatusCode() != HttpConstants.StatusCodes.CONFLICT) { + return Mono.error(new IllegalStateException( + "Bulk insert operation failed with status code " + response.getResponse().getStatusCode())); + } return Mono.just(response); }); } + @SuppressWarnings("unchecked") public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - bulkInsert(cosmosContainer, documentDefinitionList) + if (documentDefinitionList == null || documentDefinitionList.isEmpty()) { + return documentDefinitionList; + } + + Class clazz = (Class) documentDefinitionList.get(0).getClass(); + + return bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) - .then() + .filter(response -> response.getResponse() != null) + .map(response -> response.getResponse().getItem(clazz)) + .collectList() .block(); - - return documentDefinitionList; } public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { From 6e960fc06b95f53f2d9796ddab14ba742ee0e692 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 13:30:35 -0800 Subject: [PATCH 06/20] simplify code --- .../cosmos/implementation/TestSuiteBase.java | 285 +++++++----------- 1 file changed, 116 insertions(+), 169 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index f2cb95647921..8a6eca9855ac 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -4,6 +4,7 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import com.azure.cosmos.CosmosException; @@ -178,177 +179,123 @@ public void afterSuite() { protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} ...", collection.getId()); - AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); - try { - List paths = collection.getPartitionKey().getPaths(); - - try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() - .key(TestConfigurations.MASTER_KEY) - .endpoint(TestConfigurations.HOST) - .buildAsyncClient()) { - CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setMaxDegreeOfParallelism(-1); - QueryFeedOperationState state = new QueryFeedOperationState( - cosmosClient, - "truncateCollection", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - - ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100); - - logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); - - String altLink = collection.getAltLink(); - String[] altLinkSegments = altLink.split("/"); - // altLink format: dbs/{dbName}/colls/{collName} - String databaseName = altLinkSegments[1]; - String containerName = altLinkSegments[3]; - - Flux deleteOperations = - houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); - } - partitionKey = new PartitionKey(propertyValue); - } else { - partitionKey = PartitionKey.NONE; - } - - return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); - }); - - CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); - ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper - .getCosmosBulkExecutionOptionsAccessor() - .getImpl(bulkOptions) - .setCosmosEndToEndLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) - .build()); - - cosmosClient.getDatabase(databaseName) - .getContainer(containerName) - .executeBulkOperations(deleteOperations, bulkOptions) - .flatMap(response -> { - if (response.getException() != null) { - Exception ex = response.getException(); - if (ex instanceof CosmosException) { - CosmosException cosmosException = (CosmosException) ex; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND - && cosmosException.getSubStatusCode() == 0) { - return Mono.empty(); - } + List paths = collection.getPartitionKey().getPaths(); + + try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() + .key(TestConfigurations.MASTER_KEY) + .endpoint(TestConfigurations.HOST) + .buildAsyncClient()) { + + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + options.setMaxDegreeOfParallelism(-1); + + ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100); + + logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); + + String altLink = collection.getAltLink(); + String[] altLinkSegments = altLink.split("/"); + // altLink format: dbs/{dbName}/colls/{collName} + String databaseName = altLinkSegments[1]; + String containerName = altLinkSegments[3]; + + CosmosAsyncContainer container = cosmosClient.getDatabase(databaseName).getContainer(containerName); + + Flux deleteOperations = + container + .queryItems( "SELECT * FROM root", Document.class) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + propertyValue = Undefined.value(); + } + partitionKey = new PartitionKey(propertyValue); + } else { + partitionKey = PartitionKey.NONE; + } + + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosClient.getDatabase(databaseName) + .getContainer(containerName) + .executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); } - return Mono.error(ex); - } - if (response.getResponse() != null - && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return Mono.empty(); } - if (response.getResponse() != null - && !response.getResponse().isSuccessStatusCode()) { - return Mono.error(new IllegalStateException( - "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); - } - return Mono.just(response); - }) - .blockLast(); - - logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateTriggers", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryTriggers(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(trigger -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = trigger.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteTrigger(trigger.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateStoredProcs", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryStoredProcedures(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(storedProcedure -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = storedProcedure.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteStoredProcedure(storedProcedure.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} udfs ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateUserDefinedFunctions", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryUserDefinedFunctions(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(udf -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = udf.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteUserDefinedFunction(udf.getSelfLink(), requestOptions); - }).then().block(); - } - } finally { - houseKeepingClient.close(); + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + return Mono.error(new IllegalStateException( + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + } + return Mono.just(response); + }) + .blockLast(); + + logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); + + container + .getScripts() + .queryTriggers("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(trigger -> container.getScripts().getTrigger(trigger.getId()).delete()).then().block(); + + logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId()); + + container + .getScripts() + .queryStoredProcedures("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(storedProcedure -> { + return container.getScripts().getStoredProcedure(storedProcedure.getId()).delete(); + }) + .then() + .block(); + + logger.info("Truncating DocumentCollection {} udfs ...", collection.getId()); + + container + .getScripts() + .queryUserDefinedFunctions("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(udf -> { + RequestOptions requestOptions = new RequestOptions(); + return container.getScripts().getUserDefinedFunction(udf.getId()).delete(); + }) + .then() + .block(); } logger.info("Finished truncating DocumentCollection {}.", collection.getId()); From fab4fd009fc426b2feabf10eb4be387d60ca3cfc Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 13:40:13 -0800 Subject: [PATCH 07/20] Replace IllegalStateException with CosmosException in bulk error propagation Use BridgeInternal.createCosmosException() with status code and substatus code from CosmosBulkItemResponse instead of generic IllegalStateException. --- .../azure/cosmos/implementation/TestSuiteBase.java | 8 ++++++-- .../java/com/azure/cosmos/rx/TestSuiteBase.java | 14 ++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 8a6eca9855ac..0672d930c991 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; @@ -252,8 +253,11 @@ protected static void truncateCollection(DocumentCollection collection) { } if (response.getResponse() != null && !response.getResponse().isSuccessStatusCode()) { - return Mono.error(new IllegalStateException( - "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); } return Mono.just(response); }) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 921994bbaeaa..b335371d9f04 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -357,8 +357,11 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine } if (response.getResponse() != null && !response.getResponse().isSuccessStatusCode()) { - return Mono.error(new IllegalStateException( - "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); } return Mono.just(response); }) @@ -647,8 +650,11 @@ public Flux> bulkInsert(CosmosAsyncConta if (response.getResponse() != null && !response.getResponse().isSuccessStatusCode() && response.getResponse().getStatusCode() != HttpConstants.StatusCodes.CONFLICT) { - return Mono.error(new IllegalStateException( - "Bulk insert operation failed with status code " + response.getResponse().getStatusCode())); + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk insert operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); } return Mono.just(response); }); From 6df45b338cb984f2aaa59fd1ede78da65705aa7e Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 15:18:31 -0800 Subject: [PATCH 08/20] test --- .../cosmos/implementation/TestSuiteBase.java | 12 ++---------- .../com/azure/cosmos/rx/TestSuiteBase.java | 18 ++++++------------ 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 0672d930c991..e8f7abc509ab 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -180,7 +180,6 @@ public void afterSuite() { protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} ...", collection.getId()); - List paths = collection.getPartitionKey().getPaths(); try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() .key(TestConfigurations.MASTER_KEY) @@ -209,15 +208,8 @@ protected static void truncateCollection(DocumentCollection collection) { .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); - } - partitionKey = new PartitionKey(propertyValue); - } else { + PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(doc, collection.getPartitionKey()); + if (partitionKey == null) { partitionKey = PartitionKey.NONE; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index b335371d9f04..97adc285c015 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -31,7 +31,6 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.PartitionKeyHelper; -import com.azure.cosmos.implementation.PathParser; import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.TestConfigurations; @@ -296,7 +295,6 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); String cosmosContainerId = cosmosContainerProperties.getId(); logger.info("Truncating collection {} ...", cosmosContainerId); - List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); options.setCosmosEndToEndOperationLatencyPolicyConfig( new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) @@ -313,16 +311,12 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; - } else { - partitionKey = new PartitionKey(propertyValue); - } - } else { + PartitionKey partitionKey = + PartitionKeyHelper.extractPartitionKeyFromDocument( + doc, + cosmosContainerProperties.getPartitionKeyDefinition()); + + if (partitionKey == null) { partitionKey = PartitionKey.NONE; } From 797ea8bd232e18298b914d2753999e917c14b1e4 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 19:36:47 -0800 Subject: [PATCH 09/20] Add bulkEnabled parameter and rename insert methods - Add insertUsingPointOperations for one-by-one createItem path - Add bulkEnabled parameter to insertAllItemsBlocking/voidInsertAllItemsBlocking - Rename bulkInsertBlocking -> insertAllItemsBlocking - Rename voidBulkInsertBlocking -> voidInsertAllItemsBlocking - Update all callers with explicit bulkEnabled=true --- .../EndToEndTimeOutValidationTests.java | 2 +- .../azure/cosmos/rx/AggregateQueryTests.java | 2 +- .../rx/BackPressureCrossPartitionTest.java | 5 ++- .../com/azure/cosmos/rx/BackPressureTest.java | 2 +- ...ContainerCreateDeleteWithSameNameTest.java | 2 +- .../azure/cosmos/rx/DistinctQueryTests.java | 4 +- .../azure/cosmos/rx/FeedRangeQueryTests.java | 2 +- .../azure/cosmos/rx/GroupByQueryTests.java | 2 +- .../cosmos/rx/MultiOrderByQueryTests.java | 2 +- .../cosmos/rx/OffsetLimitQueryTests.java | 2 +- .../cosmos/rx/OrderbyDocumentQueryTest.java | 2 +- .../cosmos/rx/ParallelDocumentQueryTest.java | 2 +- .../azure/cosmos/rx/QueryValidationTests.java | 4 +- .../cosmos/rx/ReadFeedDocumentsTest.java | 2 +- .../SinglePartitionReadFeedDocumentsTest.java | 2 +- .../com/azure/cosmos/rx/TestSuiteBase.java | 42 +++++++++++++++++-- .../ChangeFeedProcessorMigrationTests.java | 2 +- .../FullFidelityChangeFeedProcessorTest.java | 4 +- .../IncrementalChangeFeedProcessorTest.java | 4 +- .../IncrementalChangeFeedProcessorTest.java | 6 +-- 20 files changed, 66 insertions(+), 29 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java index eac18cead8af..aa5004539f92 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java @@ -504,7 +504,7 @@ private List insertDocuments(int documentCount, List partiti partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java index 523430b7fad4..02a59815e730 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java @@ -113,7 +113,7 @@ public void queryDocumentsWithAggregates(Boolean qmEnabled) throws Exception { public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java index 08d3927a162b..9abcee71b131 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java @@ -206,9 +206,10 @@ public void before_BackPressureCrossPartitionTest() { docDefList.add(getDocumentDefinition(i)); } - createdDocuments = bulkInsertBlocking( + createdDocuments = insertAllItemsBlocking( createdCollection, - docDefList); + docDefList, + true); numberOfPartitions = CosmosBridgeInternal.getAsyncDocumentClient(client).readPartitionKeyRanges(getCollectionLink(), (CosmosQueryRequestOptions) null) .flatMap(p -> Flux.fromIterable(p.getResults())).collectList().single().block().size(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java index c133d0e81241..bac1eb1719f4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java @@ -285,7 +285,7 @@ public void before_BackPressureTest() throws Exception { docDefList.add(getDocumentDefinition(i)); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); warmUp(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java index 55a06016b767..9de5984d98ee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java @@ -929,7 +929,7 @@ private void setupReadFeedDocuments(List createdDocuments, CosmosAsy docDefList.add(TestObject.creatNewTestObject()); } - createdDocuments.addAll(bulkInsertBlocking(feedContainer, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedContainer, docDefList, true)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java index ca386623c9a4..c2c6fb4cf5ff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java @@ -325,8 +325,8 @@ public void queryDocumentsWithOrderBy(String query, boolean matchedOrderBy) { public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); - voidBulkInsertBlocking(createdCollection, propertiesDocs); + voidInsertAllItemsBlocking(createdCollection, docs, true); + voidInsertAllItemsBlocking(createdCollection, propertiesDocs, true); } public void generateTestData() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java index f4d86b535a4b..21775691d2ef 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java @@ -187,7 +187,7 @@ private List insertDocuments( .size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java index 6ae065c39cdd..23428c9b7544 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java @@ -145,7 +145,7 @@ public void queryDocuments(Triple, Integer> gro public void bulkInsert() { generateTestData(INSERT_DOCUMENTS_CNT); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData(int documentCnt) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index f59d2e1a9752..9a6427952ca6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -161,7 +161,7 @@ public void before_MultiOrderByQueryTests() throws Exception { } } - voidBulkInsertBlocking(documentCollection, documents); + voidInsertAllItemsBlocking(documentCollection, documents, true); expectCount(documentCollection, documents.size()); waitIfNeededForReplicasToCatchUp(getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java index b3ad601f322f..1e8824d8a0ff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java @@ -305,7 +305,7 @@ private List queryWithContinuationTokens(String query, int p public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 2279faa033dc..821ca11c4898 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -653,7 +653,7 @@ public List bulkInsertDocs(CosmosAsyncContainer cosmosContai result.add(docDefinition); } - return bulkInsertBlocking(cosmosContainer, result); + return insertAllItemsBlocking(cosmosContainer, result, true); } @BeforeMethod(groups = { "query" }) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index 37b8941a265d..94d3895a0177 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -509,7 +509,7 @@ private List prepareCosmosContainer(CosmosAsyncContainer cos docDefList.add(getDocumentDefinition(99)); } - List items = bulkInsertBlocking(cosmosContainer, docDefList); + List items = insertAllItemsBlocking(cosmosContainer, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); return items; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index 80443137a22c..dc3b508f4a0a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -570,7 +570,7 @@ private List createDocumentsWithUndefinedAndNullValues(CosmosAsyncCo } docsToInsert.add(objectNode); } - return bulkInsertBlocking(container, docsToInsert); + return insertAllItemsBlocking(container, docsToInsert, true); } @Test(groups = {"query"}, timeOut = TIMEOUT) @@ -668,7 +668,7 @@ private List insertDocuments(int documentCount, List partiti partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 43b98694dd5c..5fd5100b5855 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -91,7 +91,7 @@ public void before_ReadFeedDocumentsTest() { docDefList.add(getDocumentDefinition()); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index cf1cf698546a..a975079aa354 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -63,7 +63,7 @@ public void before_SinglePartitionReadFeedDocumentsTest() { docDefList.add(getDocumentDefinition()); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 97adc285c015..5ec2b6241653 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -101,6 +101,7 @@ public abstract class TestSuiteBase extends CosmosAsyncClientTest { + private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 5; private static final ObjectMapper objectMapper = new ObjectMapper(); protected static final int TIMEOUT = 40000; @@ -653,14 +654,39 @@ public Flux> bulkInsert(CosmosAsyncConta return Mono.just(response); }); } + + private Flux> insertUsingPointOperations(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) + .build() + ); + + List>> result = new ArrayList<>(documentDefinitionList.size()); + for (T docDef : documentDefinitionList) { + result.add(cosmosContainer.createItem(docDef, options)); + } + + return Flux.merge(Flux.fromIterable(result), DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL); + } @SuppressWarnings("unchecked") - public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList) { + public List insertAllItemsBlocking(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList, + boolean bulkEnabled) { if (documentDefinitionList == null || documentDefinitionList.isEmpty()) { return documentDefinitionList; } + if (!bulkEnabled) { + return insertUsingPointOperations(cosmosContainer, documentDefinitionList) + .publishOn(Schedulers.parallel()) + .map(CosmosItemResponse::getItem) + .collectList() + .block(); + } + Class clazz = (Class) documentDefinitionList.get(0).getClass(); return bulkInsert(cosmosContainer, documentDefinitionList) @@ -671,7 +697,17 @@ public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, .block(); } - public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { + public void voidInsertAllItemsBlocking(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList, + boolean bulkEnabled) { + if (!bulkEnabled) { + insertUsingPointOperations(cosmosContainer, documentDefinitionList) + .publishOn(Schedulers.parallel()) + .then() + .block(); + return; + } + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) .then() diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java index b9d474ccb0a4..f9b4c51535f7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java @@ -211,7 +211,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, true)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index abc7348a14f0..0f28d36c42e8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -2153,7 +2153,7 @@ private void setupReadFeedDocuments(List createdDocuments, C logger.info("Adding the following item to bulk list: {}", item); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } @@ -2164,7 +2164,7 @@ private void createReadFeedDocuments(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index 0c91d6ec5039..fd4166871667 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -2350,7 +2350,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); for (InternalObjectNode current : createdDocuments) { try { logger.info("CREATED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(current)); @@ -2368,7 +2368,7 @@ private void createReadFeedDocuments(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 4e54d81ba417..6525617d94dd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -2236,7 +2236,7 @@ private void setupReadFeedDocuments(List createdDocuments, C docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } @@ -2247,7 +2247,7 @@ private void setupReadFeedDocumentsForAllVersionsAndDeletes(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } From 3899f57a48631d72f6a9632e580b4d2285b59f79 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 30 Jan 2026 13:52:36 -0800 Subject: [PATCH 10/20] fix test pipeline --- .../com/azure/cosmos/implementation/RxDocumentClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index c8345e6b996a..86a048f2ebad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -508,7 +508,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); From 42cd881098c73c24973e800a067e625ffc4b82f2 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Sat, 7 Feb 2026 21:10:42 -0800 Subject: [PATCH 11/20] fix --- .../com/azure/cosmos/CosmosNotFoundTests.java | 3 +++ ...InjectionServerErrorRuleOnDirectTests.java | 6 ++++- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 22 +++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java index aede5407719a..20afb328684e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java @@ -340,6 +340,9 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy // Verify sub-status code is either 0 or 1003 if (ConnectionMode.DIRECT.name().equals(accessor.getConnectionMode(clientToUse))) { + if (diagnosticsContext.getSubStatusCode() != HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS) { + logger.error("CosmosNotFoundTests-performDocumentOperationOnDeletedContainer {}", diagnosticsContext.toJson()); + } assertThat(diagnosticsContext.getSubStatusCode()) .as("Sub-status code should be 1003") .isIn( diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java index 08e1d7332eab..638d1e205c82 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java @@ -948,7 +948,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse( } @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "faultInjectionOperationTypeProviderForLeaseNotFound", timeOut = TIMEOUT) - public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean primaryAddressOnly, boolean isReadMany) throws JsonProcessingException { + public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean primaryAddressOnly, boolean isReadMany) throws JsonProcessingException, InterruptedException { boolean shouldRetryCrossRegion = false; @@ -996,6 +996,10 @@ public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType opera ruleId, shouldRetryCrossRegion ); + // Allow time for the background address refresh to complete before validating diagnostics. + // The address refresh for LEASE_NOT_FOUND is triggered asynchronously via + // startBackgroundAddressRefresh() on Schedulers.boundedElastic(). + Thread.sleep(500); this.validateAddressRefreshWithForceRefresh(cosmosDiagnostics, (operationType == OperationType.ReadFeed || operationType == OperationType.Query)); } finally { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 446ee82008b8..9251ae020fac 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -578,6 +578,17 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); + if (diagnosticsContext.getDuration().toMillis() > 5000) { + logger.error( + "ClientRetryPolicy_dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion longer than 5s - {} {} {} {} {} {} {}", + operationType, + faultInjectionOperationType, + shouldUsePreferredRegionsOnClient, + isReadMany, + hitLimit, + true, + diagnosticsContext.toJson()); + } assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } else { assertThat(cosmosDiagnostics).isNotNull(); @@ -588,6 +599,17 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + if (diagnosticsContext.getDuration().toMillis() > 5000) { + logger.error( + "ClientRetryPolicy_dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion longer than 5s - {} {} {} {} {} {} {}", + operationType, + faultInjectionOperationType, + shouldUsePreferredRegionsOnClient, + isReadMany, + hitLimit, + false, + diagnosticsContext.toJson()); + } assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } From cc91f6e7bc1e20c7ddc82cb96d954938db9152b1 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Sun, 8 Feb 2026 11:12:27 -0800 Subject: [PATCH 12/20] fix NotFoundException tests --- .../src/test/java/com/azure/cosmos/CosmosNotFoundTests.java | 4 ---- .../cosmos/implementation/StaleResourceRetryPolicy.java | 5 +++++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java index 20afb328684e..e7c4a7fe3fd4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java @@ -307,10 +307,6 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy Thread.sleep(5000); - // Create an item in the container - TestObject testObject = TestObject.create(this.createdItemPk); - testContainer.createItem(testObject).block(); - // Create a different client instance to delete the container deletingAsyncClient = getClientBuilder() .endpoint(TestConfigurations.HOST) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java index 3e13377dcfc7..14b05436f3a0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/StaleResourceRetryPolicy.java @@ -87,6 +87,11 @@ public Mono shouldRetry(Exception e) { CosmosException clientException = Utils.as(e, CosmosException.class); if (isServerNameCacheStaledException(clientException) || isGatewayStaledContainerException(clientException)) { if (!this.retried) { + logger.debug("Received name cache staled exception for collection [{}] with statusCode {}, subStatusCode {}, going to retry", + collectionLink, + clientException.getStatusCode(), + clientException.getSubStatusCode()); + // 1. refresh the collection cache if needed // 2. If the collection rid has changed, then also clean up session container for old containerRid AtomicReference oldCollectionRid = new AtomicReference<>(); From 4e8b099a4de607ee0f7d2f41681a95bfcfc692aa Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 30 Jan 2026 13:52:36 -0800 Subject: [PATCH 13/20] fix test pipeline --- .../com/azure/cosmos/implementation/RxDocumentClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 4c9643a3eafd..083389edcd55 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -510,7 +510,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); From 804014f11a177ff8d5893c65f40fcedfeec0d821 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Sun, 8 Feb 2026 13:34:42 -0800 Subject: [PATCH 14/20] fix --- .../com/azure/cosmos/CosmosNotFoundTests.java | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java index e7c4a7fe3fd4..c74041dc2a3b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java @@ -333,26 +333,11 @@ public void performDocumentOperationOnDeletedContainer(OperationType operationTy .as("Status code should be 404 (Not Found)") .isEqualTo(HttpConstants.StatusCodes.NOTFOUND); - // Verify sub-status code is either 0 or 1003 - - if (ConnectionMode.DIRECT.name().equals(accessor.getConnectionMode(clientToUse))) { - if (diagnosticsContext.getSubStatusCode() != HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS) { - logger.error("CosmosNotFoundTests-performDocumentOperationOnDeletedContainer {}", diagnosticsContext.toJson()); - } - assertThat(diagnosticsContext.getSubStatusCode()) - .as("Sub-status code should be 1003") - .isIn( - HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS - ); - } - - if (ConnectionMode.GATEWAY.name().equals(accessor.getConnectionMode(clientToUse))) { - assertThat(diagnosticsContext.getSubStatusCode()) - .as("Sub-status code should be 0") - .isIn( - HttpConstants.SubStatusCodes.UNKNOWN - ); - } + assertThat(diagnosticsContext.getSubStatusCode()) + .as("Sub-status code should be 1003") + .isIn( + HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS + ); } finally { safeClose(clientToUse); safeClose(deletingAsyncClient); From 190ab923e10ff38a1abae85600710388589c8662 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Sun, 8 Feb 2026 16:12:08 -0800 Subject: [PATCH 15/20] fix --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 7 ++++- .../IncrementalChangeFeedProcessorTest.java | 24 ++++++++++++--- .../IncrementalChangeFeedProcessorTest.java | 30 ++++++++++++++----- 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 9251ae020fac..eb97686c53e0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -628,7 +628,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe FaultInjectionOperationType faultInjectionOperationType, boolean shouldUsePreferredRegionsOnClient, boolean isReadMany, - int hitLimit) { + int hitLimit) throws InterruptedException { boolean shouldRetryCrossRegion = false; @@ -700,6 +700,11 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(tooManyRequestsRule, leaseNotFoundFaultRule)).block(); + if (shouldRetryCrossRegion) { + // add some delay so to allow the data can be replicated cross regions + Thread.sleep(200); + } + CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) .block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index fd4166871667..32bce0706e93 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -1795,7 +1795,7 @@ public void getCurrentStateWithFaultInjection(FaultInjectionServerErrorType faul } } - @Test(groups = {"query" }, timeOut = 2 * TIMEOUT) + @Test(groups = {"query" }, timeOut = 3 * TIMEOUT) public void readFeedDocumentsWithThroughputControl() throws InterruptedException { // Create a separate client as throughput control group will be applied to it CosmosAsyncClient clientWithThroughputControl = @@ -1819,8 +1819,15 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException Map receivedDocuments = new ConcurrentHashMap<>(); int maxItemCount = 100; // force the RU usage per requests > 1 - int feedCount = maxItemCount * 2; // force to do two fetches - setupReadFeedDocuments(createdDocuments, createdFeedCollection, feedCount); + + // force to do two fetches + for (int i = 0; i < 2; i++) { + setupReadFeedDocuments( + createdDocuments, + this.createdDatabase.getContainer(createdFeedCollection.getId()), + maxItemCount, + true); + } changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) @@ -2344,13 +2351,22 @@ private void setupReadFeedDocuments( List createdDocuments, CosmosAsyncContainer feedCollection, long count) { + + setupReadFeedDocuments(createdDocuments, feedCollection, count, false); + } + + private void setupReadFeedDocuments( + List createdDocuments, + CosmosAsyncContainer feedCollection, + long count, + boolean bulkEnabled) { List docDefList = new ArrayList<>(); for(int i = 0; i < count; i++) { docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled)); for (InternalObjectNode current : createdDocuments) { try { logger.info("CREATED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(current)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 6525617d94dd..351bbea423f1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -1688,7 +1688,7 @@ public void endToEndTimeoutConfigShouldBeSuppressed() throws InterruptedExceptio } } - @Test(groups = { "long-emulator" }, timeOut = 2 * TIMEOUT) + @Test(groups = { "long-emulator" }, timeOut = 3 * TIMEOUT) public void readFeedDocumentsWithThroughputControl() throws InterruptedException { // Create a separate client as throughput control group will be applied to it CosmosAsyncClient clientWithThroughputControl = @@ -1712,12 +1712,15 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException Map receivedDocuments = new ConcurrentHashMap<>(); int maxItemCount = 100; // force the RU usage per requests > 1 - int feedCount = maxItemCount * 2; // force to do two fetches + // force to do two fetches // using the original client to create the docs to isolate possible throttling - setupReadFeedDocuments( - createdDocuments, - this.createdDatabase.getContainer(createdFeedCollection.getId()), - feedCount); + for (int i = 0; i < 2; i++) { + setupReadFeedDocuments( + createdDocuments, + this.createdDatabase.getContainer(createdFeedCollection.getId()), + maxItemCount, + true); + } changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) @@ -2229,14 +2232,25 @@ public void afterClass() { safeClose(client); } - private void setupReadFeedDocuments(List createdDocuments, CosmosAsyncContainer feedCollection, long count) { + private void setupReadFeedDocuments( + List createdDocuments, + CosmosAsyncContainer feedCollection, + long count) { + setupReadFeedDocuments(createdDocuments, feedCollection, count, false); + } + + private void setupReadFeedDocuments( + List createdDocuments, + CosmosAsyncContainer feedCollection, + long count, + boolean bulkEnabled) { List docDefList = new ArrayList<>(); for(int i = 0; i < count; i++) { docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } From a75dc92b9ce33d6e51399a444da205ca0b61576f Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Mon, 9 Feb 2026 10:30:36 -0800 Subject: [PATCH 16/20] Update sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../test/java/com/azure/cosmos/implementation/TestSuiteBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index e8f7abc509ab..9756b16b366d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -287,7 +287,6 @@ protected static void truncateCollection(DocumentCollection collection) { .byPage() .publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults())) .flatMap(udf -> { - RequestOptions requestOptions = new RequestOptions(); return container.getScripts().getUserDefinedFunction(udf.getId()).delete(); }) .then() From baf9fd9688d6c65a224ac316424421a1adf04e65 Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Mon, 9 Feb 2026 10:31:14 -0800 Subject: [PATCH 17/20] Update sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ...InjectionServerErrorRuleOnDirectTests.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java index 638d1e205c82..9d8981bcc375 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java @@ -999,8 +999,24 @@ public void faultInjectionServerErrorRuleTests_LeaseNotFound(OperationType opera // Allow time for the background address refresh to complete before validating diagnostics. // The address refresh for LEASE_NOT_FOUND is triggered asynchronously via // startBackgroundAddressRefresh() on Schedulers.boundedElastic(). - Thread.sleep(500); - this.validateAddressRefreshWithForceRefresh(cosmosDiagnostics, (operationType == OperationType.ReadFeed || operationType == OperationType.Query)); + // Instead of a fixed sleep, poll until the validation passes or a timeout is reached + long addressRefreshDeadlineNanos = System.nanoTime() + Duration.ofSeconds(5).toNanos(); + AssertionError lastAssertionError = null; + while (System.nanoTime() < addressRefreshDeadlineNanos) { + try { + this.validateAddressRefreshWithForceRefresh( + cosmosDiagnostics, + (operationType == OperationType.ReadFeed || operationType == OperationType.Query)); + lastAssertionError = null; + break; + } catch (AssertionError assertionError) { + lastAssertionError = assertionError; + Thread.sleep(100); + } + } + if (lastAssertionError != null) { + throw lastAssertionError; + } } finally { serverErrorRule.disable(); From 94823e41c510f5dd510fed44fb1671b337f2f230 Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Mon, 9 Feb 2026 10:37:19 -0800 Subject: [PATCH 18/20] Update sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../java/com/azure/cosmos/implementation/TestSuiteBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 9756b16b366d..7ccbb8454aec 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -194,8 +194,10 @@ protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); String altLink = collection.getAltLink(); - String[] altLinkSegments = altLink.split("/"); - // altLink format: dbs/{dbName}/colls/{collName} + // Normalize altLink so both "dbs/.../colls/..." and "/dbs/.../colls/..." are handled consistently. + String normalizedAltLink = StringUtils.strip(altLink, "/"); + String[] altLinkSegments = normalizedAltLink.split("/"); + // altLink format (after normalization): dbs/{dbName}/colls/{collName} String databaseName = altLinkSegments[1]; String containerName = altLinkSegments[3]; From ef1326b294f9b61ba5db14bbc9c2132bd194be8f Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Mon, 9 Feb 2026 10:38:17 -0800 Subject: [PATCH 19/20] fix --- .../java/com/azure/cosmos/implementation/TestSuiteBase.java | 2 +- .../epkversion/FullFidelityChangeFeedProcessorTest.java | 2 +- .../pkversion/IncrementalChangeFeedProcessorTest.java | 2 +- .../com/azure/cosmos/implementation/RxDocumentClientImpl.java | 1 - 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index e8f7abc509ab..0978b73cbc55 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -203,7 +203,7 @@ protected static void truncateCollection(DocumentCollection collection) { Flux deleteOperations = container - .queryItems( "SELECT * FROM root", Document.class) + .queryItems( "SELECT * FROM root", options, Document.class) .byPage() .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index 0f28d36c42e8..ca41077c14b2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -1339,7 +1339,7 @@ public void ownerNullAcquiring() throws InterruptedException { bulkInsert(createdFeedCollection, docDefList) .last() - .flatMap(cosmosItemResponse -> { + .flatMap(response -> { logger.info("Start first Change feed processor"); return changeFeedProcessorFirst.start().subscribeOn(Schedulers.boundedElastic()) .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index a7a7f49f285f..e0d4f7605bb1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -2251,7 +2251,7 @@ private void setupReadFeedDocuments( } createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); - + waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 083389edcd55..a2733a2603ee 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -510,7 +510,6 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - return value + 1; }); From 86f9617ba53eb020c2fd23331c130d1f22be046b Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Mon, 9 Feb 2026 10:43:35 -0800 Subject: [PATCH 20/20] change --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 22 ------------------- .../IncrementalChangeFeedProcessorTest.java | 2 +- .../IncrementalChangeFeedProcessorTest.java | 2 +- .../implementation/RxDocumentClientImpl.java | 1 + 4 files changed, 3 insertions(+), 24 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index eb97686c53e0..1373af756094 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -578,17 +578,6 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); - if (diagnosticsContext.getDuration().toMillis() > 5000) { - logger.error( - "ClientRetryPolicy_dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion longer than 5s - {} {} {} {} {} {} {}", - operationType, - faultInjectionOperationType, - shouldUsePreferredRegionsOnClient, - isReadMany, - hitLimit, - true, - diagnosticsContext.toJson()); - } assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } else { assertThat(cosmosDiagnostics).isNotNull(); @@ -599,17 +588,6 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); - if (diagnosticsContext.getDuration().toMillis() > 5000) { - logger.error( - "ClientRetryPolicy_dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion longer than 5s - {} {} {} {} {} {} {}", - operationType, - faultInjectionOperationType, - shouldUsePreferredRegionsOnClient, - isReadMany, - hitLimit, - false, - diagnosticsContext.toJson()); - } assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index 4c26904d5a3e..55b5f384d28d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -2366,7 +2366,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled)); for (InternalObjectNode current : createdDocuments) { try { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index e0d4f7605bb1..36b3cd11de9a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -2250,7 +2250,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, bulkEnabled)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index a2733a2603ee..4c9643a3eafd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -510,6 +510,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } + return value + 1; });