From cc4689f96d91518a6bed6a8fdd496cd76cde4b4c Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 30 Jan 2026 13:52:36 -0800 Subject: [PATCH 01/10] fix test pipeline --- .../com/azure/cosmos/implementation/RxDocumentClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index c8345e6b996a..86a048f2ebad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -508,7 +508,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); From 54dbd8bac1fac281ff33fb414b439724e629ea89 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 11:50:44 -0800 Subject: [PATCH 02/10] fix few tests part 2 --- .../cosmos/implementation/TestSuiteBase.java | 75 ++++++++-- .../com/azure/cosmos/rx/TestSuiteBase.java | 138 +++++++++++++----- 2 files changed, 157 insertions(+), 56 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 5c59341d59c0..cfb5e01fec65 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -5,6 +5,8 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; import com.azure.cosmos.CosmosNettyLeakDetectorFactory; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.DocumentClientTest; @@ -19,7 +21,10 @@ import com.azure.cosmos.models.CompositePath; import com.azure.cosmos.models.CompositePathSortOrder; import com.azure.cosmos.models.CosmosClientTelemetryConfig; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.IncludedPath; import com.azure.cosmos.models.IndexingPolicy; @@ -199,24 +204,62 @@ protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); - houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - RequestOptions requestOptions = new RequestOptions(); - - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); + String altLink = collection.getAltLink(); + String[] altLinkSegments = altLink.split("/"); + // altLink format: dbs/{dbName}/colls/{collName} + String databaseName = altLinkSegments[1]; + String containerName = altLinkSegments[3]; + + Flux deleteOperations = + houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + propertyValue = Undefined.value(); + } + partitionKey = new PartitionKey(propertyValue); + } else { + partitionKey = PartitionKey.NONE; } - requestOptions.setPartitionKey(new PartitionKey(propertyValue)); - } - - return houseKeepingClient.deleteDocument(doc.getSelfLink(), requestOptions); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosClient.getDatabase(databaseName) + .getContainer(containerName) + .executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + return Mono.just(response); + }) + .blockLast(); logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index cef37b160f3b..a8d3391de2d5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -45,8 +45,11 @@ import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.CosmosDatabaseResponse; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosResponse; import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; @@ -253,29 +256,56 @@ protected static void cleanUpContainer(CosmosAsyncContainer cosmosContainer) { options.setMaxDegreeOfParallelism(-1); int maxItemCount = 100; - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - - PartitionKey partitionKey = null; - - Object propertyValue = null; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; + Flux deleteOperations = + cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) + .byPage(maxItemCount) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + partitionKey = PartitionKey.NONE; + } else { + partitionKey = new PartitionKey(propertyValue); + } } else { - partitionKey = new PartitionKey(propertyValue); + partitionKey = new PartitionKey(null); } - } else { - partitionKey = new PartitionKey(null); - } - return cosmosContainer.deleteItem(doc.getId(), partitionKey); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosContainer.executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + return Mono.just(response); + }) + .blockLast(); } protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) { @@ -341,29 +371,57 @@ private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContai logger.info("Truncating collection {} documents ...", cosmosContainer.getId()); - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - - PartitionKey partitionKey = null; - - Object propertyValue = null; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; + Flux deleteOperations = + cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) + .byPage(maxItemCount) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + partitionKey = PartitionKey.NONE; + } else { + partitionKey = new PartitionKey(propertyValue); + } } else { - partitionKey = new PartitionKey(propertyValue); + partitionKey = new PartitionKey(null); } - } else { - partitionKey = new PartitionKey(null); - } - return cosmosContainer.deleteItem(doc.getId(), partitionKey); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions truncateBulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(truncateBulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosContainer + .executeBulkOperations(deleteOperations, truncateBulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + return Mono.just(response); + }) + .blockLast(); expectCount(cosmosContainer, 0); From d2f3e1711a4dbf33c6c99df4cc6fc30336efbeb8 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 12:40:25 -0800 Subject: [PATCH 03/10] fix some tests --- .../azure/cosmos/CosmosDiagnosticsTest.java | 3 - .../cosmos/rx/OrderbyDocumentQueryTest.java | 4 +- .../com/azure/cosmos/rx/TestSuiteBase.java | 61 +++++++++++++------ .../FullFidelityChangeFeedProcessorTest.java | 6 +- .../IncrementalChangeFeedProcessorTest.java | 6 +- .../IncrementalChangeFeedProcessorTest.java | 6 +- 6 files changed, 55 insertions(+), 31 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index b0d6543dd70c..9875951afac0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -66,7 +66,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -592,8 +591,6 @@ public void databaseAccountToClients() { cosmosDiagnosticsNode.get("clientCfgs").get("clientEndpoints").get(TestConfigurations.HOST).asInt(); assertThat(updatedClientCount).isEqualTo(clientCount + 1); - } catch (JsonMappingException e) { - throw new RuntimeException(e); } catch (JsonProcessingException e) { throw new RuntimeException(e); } finally { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index e02330cbf9b6..3276a14d7d60 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -644,7 +644,7 @@ public InternalObjectNode createDocument(CosmosAsyncContainer cosmosContainer, M return BridgeInternal.getProperties(cosmosContainer.createItem(docDefinition).block()); } - public List bulkInsert(CosmosAsyncContainer cosmosContainer, List> keyValuePropsList) { + public List bulkInsertDocs(CosmosAsyncContainer cosmosContainer, List> keyValuePropsList) { ArrayList result = new ArrayList(); @@ -740,7 +740,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { props = new HashMap<>(); keyValuePropsList.add(props); - createdDocuments = bulkInsert(createdCollection, keyValuePropsList); + createdDocuments = bulkInsertDocs(createdCollection, keyValuePropsList); for(int i = 0; i < 10; i++) { Map p = new HashMap<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index a8d3391de2d5..4f93c1617417 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -30,6 +30,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.PartitionKeyHelper; import com.azure.cosmos.implementation.PathParser; import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.Resource; @@ -49,6 +50,7 @@ import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosResponse; @@ -100,7 +102,6 @@ public abstract class TestSuiteBase extends CosmosAsyncClientTest { - private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 500; private static final ObjectMapper objectMapper = new ObjectMapper(); protected static final int TIMEOUT = 40000; @@ -665,34 +666,60 @@ public static InternalObjectNode createDocument(CosmosAsyncContainer cosmosConta return BridgeInternal.getProperties(cosmosContainer.createItem(item, options).block()); } - public Flux> bulkInsert(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList, - int concurrencyLevel) { + public Flux> bulkInsert(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList) { - CosmosItemRequestOptions options = new CosmosItemRequestOptions() - .setCosmosEndToEndOperationLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) - .build() - ); - List>> result = - new ArrayList<>(documentDefinitionList.size()); + CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); + PartitionKeyDefinition pkDef = cosmosContainerProperties.getPartitionKeyDefinition(); + + List operations = new ArrayList<>(documentDefinitionList.size()); for (T docDef : documentDefinitionList) { - result.add(cosmosContainer.createItem(docDef, options)); + InternalObjectNode internalNode = InternalObjectNode.fromObjectToInternalObjectNode(docDef); + PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(internalNode, pkDef); + if (partitionKey == null) { + partitionKey = PartitionKey.NONE; + } + operations.add(CosmosBulkOperations.getCreateItemOperation(docDef, partitionKey)); } - return Flux.merge(Flux.fromIterable(result), concurrencyLevel); + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + return cosmosContainer + .executeBulkOperations(Flux.fromIterable(operations), bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.CONFLICT + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + return Mono.just(response); + }); } + public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - return bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) - .map(itemResponse -> itemResponse.getItem()) - .collectList() + .then() .block(); + + return documentDefinitionList; } public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) .then() .block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index fa0116f3dbad..abc7348a14f0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -1263,7 +1263,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT).blockLast(); + bulkInsert(createdFeedCollection, docDefList).blockLast(); // Wait for the feed processor to receive and process the documents. Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); @@ -1337,7 +1337,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { logger.info("Start first Change feed processor"); @@ -1389,7 +1389,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index a543309ef37a..0c91d6ec5039 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -987,7 +987,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList) .last() .delayElement(Duration.ofMillis(1000)) .flatMap(cosmosItemResponse -> { @@ -1074,7 +1074,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { logger.info("Start first Change feed processor"); @@ -1126,7 +1126,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index ad70d2997423..4e54d81ba417 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -1004,7 +1004,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList) .last() .delayElement(Duration.ofMillis(1000)) .flatMap(cosmosItemResponse -> { @@ -1090,7 +1090,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { log.info("Start first Change feed processor"); @@ -1143,7 +1143,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) From 47c7aa16ee3fab269d6610e5f0048e1ee6e61b18 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 12:54:22 -0800 Subject: [PATCH 04/10] rename --- .../EndToEndTimeOutValidationTests.java | 2 +- .../azure/cosmos/rx/AggregateQueryTests.java | 2 +- .../azure/cosmos/rx/DistinctQueryTests.java | 2 +- .../azure/cosmos/rx/FeedRangeQueryTests.java | 2 +- .../azure/cosmos/rx/GroupByQueryTests.java | 2 +- .../cosmos/rx/MultiOrderByQueryTests.java | 2 +- .../cosmos/rx/OffsetLimitQueryTests.java | 2 +- .../cosmos/rx/OrderbyDocumentQueryTest.java | 2 +- .../cosmos/rx/ParallelDocumentQueryTest.java | 4 +- .../azure/cosmos/rx/QueryValidationTests.java | 2 +- .../cosmos/rx/ReadFeedDocumentsTest.java | 2 +- .../rx/ReadFeedStoredProceduresTest.java | 2 +- .../azure/cosmos/rx/ReadFeedTriggersTest.java | 2 +- .../com/azure/cosmos/rx/ReadFeedUdfsTest.java | 2 +- .../rx/SinglePartitionDocumentQueryTest.java | 2 +- .../SinglePartitionReadFeedDocumentsTest.java | 2 +- .../cosmos/rx/StoredProcedureQueryTest.java | 2 +- .../com/azure/cosmos/rx/TestSuiteBase.java | 69 +------------------ .../com/azure/cosmos/rx/TopQueryTests.java | 2 +- .../com/azure/cosmos/rx/TriggerQueryTest.java | 2 +- .../cosmos/rx/TriggerUpsertReplaceTest.java | 2 +- .../rx/UserDefinedFunctionQueryTest.java | 2 +- .../UserDefinedFunctionUpsertReplaceTest.java | 2 +- .../cosmos/rx/VeryLargeDocumentQueryTest.java | 2 +- .../implementation/RxDocumentClientImpl.java | 2 +- 25 files changed, 27 insertions(+), 92 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java index 55950284fc00..eac18cead8af 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java @@ -62,7 +62,7 @@ public CosmosAsyncClient initializeClient(CosmosEndToEndOperationLatencyPolicyCo try { createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java index e6d58eb148bc..523430b7fad4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java @@ -266,7 +266,7 @@ public void before_AggregateQueryTests() throws Throwable { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); try { - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } catch (Throwable throwable) { throwable = Exceptions.unwrap(throwable); if (!(throwable instanceof CosmosException)) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java index 062c891cf09d..ca386623c9a4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java @@ -379,7 +379,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java index de2b0d9bddd0..f4d86b535a4b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java @@ -161,7 +161,7 @@ private List queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryReques public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments( DEFAULT_NUM_DOCUMENTS_PER_PKEY, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java index 1d3f0adc9b08..6ae065c39cdd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java @@ -195,7 +195,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index a5e9e832aa1c..f59d2e1a9752 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -120,7 +120,7 @@ public void before_MultiOrderByQueryTests() throws Exception { documents = new ArrayList<>(); client = getClientBuilder().buildAsyncClient(); documentCollection = getSharedMultiPartitionCosmosContainerWithCompositeAndSpatialIndexes(client); - truncateCollection(documentCollection); + cleanUpContainer(documentCollection); expectCount(documentCollection, 0); int numberOfDocuments = 4; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java index 227c4db66133..b3ad601f322f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java @@ -344,7 +344,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 3276a14d7d60..2279faa033dc 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -672,7 +672,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); String containerName = "roundTripsContainer-" + UUID.randomUUID(); createdDatabase.createContainer(containerName, "/mypk", diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index ff84a3efa2b9..37b8941a265d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -485,10 +485,10 @@ public void before_ParallelDocumentQueryTest() { private List prepareCosmosContainer(CosmosAsyncContainer cosmosContainer) { try { - truncateCollection(cosmosContainer); + cleanUpContainer(cosmosContainer); } catch (Throwable firstChanceException) { try { - truncateCollection(cosmosContainer); + cleanUpContainer(cosmosContainer); } catch (Throwable lastChanceException) { String message = String.format("container %s truncation failed due to first chance %s followed by last chance %s", cosmosContainer, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index bc44a2824acd..80443137a22c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -84,7 +84,7 @@ public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer)); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 66c902239162..43b98694dd5c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -83,7 +83,7 @@ public void readDocuments_withoutEnableCrossPartitionQuery() { public void before_ReadFeedDocumentsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); List docDefList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java index 3a28ac6e3c3d..fa99650d2dd8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java @@ -56,7 +56,7 @@ public void readStoredProcedures() throws Exception { public void before_ReadFeedStoredProceduresTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdStoredProcedures.add(createStoredProcedures(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java index 555e535b65a6..2c42722406ac 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java @@ -54,7 +54,7 @@ public void readTriggers() throws Exception { public void before_ReadFeedTriggersTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { this.createdTriggers.add(this.createTriggers(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java index 2fb5e20495cb..870689f0c8f9 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java @@ -58,7 +58,7 @@ public void readUserDefinedFunctions() throws Exception { public void before_ReadFeedUdfsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdUserDefinedFunctions.add(createUserDefinedFunctions(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java index ddb421861ef0..0aebb62bfcee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java @@ -312,7 +312,7 @@ public InternalObjectNode createDocument(CosmosAsyncContainer cosmosContainer, i public void before_SinglePartitionDocumentQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for(int i = 0; i < 5; i++) { createdDocuments.add(createDocument(createdCollection, i)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index fb8de9b4852a..cf1cf698546a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -55,7 +55,7 @@ public void readDocuments() { public void before_SinglePartitionReadFeedDocumentsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); List docDefList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java index e18cd765c41b..7d87e90fce5c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java @@ -127,7 +127,7 @@ public CosmosStoredProcedureProperties createStoredProc(CosmosAsyncContainer cos public void before_StoredProcedureQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdStoredProcs.add(createStoredProc(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 4f93c1617417..8b6c0a8cd109 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -245,77 +245,12 @@ public void afterSuitEmulatorVNext() { } protected static void cleanUpContainer(CosmosAsyncContainer cosmosContainer) { - CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); - String cosmosContainerId = cosmosContainerProperties.getId(); - logger.info("Truncating collection {} ...", cosmosContainerId); - List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); - CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setCosmosEndToEndOperationLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) - .build() - ); - options.setMaxDegreeOfParallelism(-1); - int maxItemCount = 100; - - Flux deleteOperations = - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; - } else { - partitionKey = new PartitionKey(propertyValue); - } - } else { - partitionKey = new PartitionKey(null); - } - - return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); - }); - - CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); - ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper - .getCosmosBulkExecutionOptionsAccessor() - .getImpl(bulkOptions) - .setCosmosEndToEndLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) - .build()); - - cosmosContainer.executeBulkOperations(deleteOperations, bulkOptions) - .flatMap(response -> { - if (response.getException() != null) { - Exception ex = response.getException(); - if (ex instanceof CosmosException) { - CosmosException cosmosException = (CosmosException) ex; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND - && cosmosException.getSubStatusCode() == 0) { - return Mono.empty(); - } - } - return Mono.error(ex); - } - if (response.getResponse() != null - && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return Mono.empty(); - } - return Mono.just(response); - }) - .blockLast(); - } - - protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) { try { int i = 0; while (i < 100) { try { - truncateCollectionInternal(cosmosContainer); + cleanUpContainerInternal(cosmosContainer); return; } catch (CosmosException exception) { if (exception.getStatusCode() != HttpConstants.StatusCodes.TOO_MANY_REQUESTS @@ -357,7 +292,7 @@ protected static void expectCount(CosmosAsyncContainer cosmosContainer, int expe assertThat(counts.get(0)).isEqualTo(expectedCount); } - private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContainer) { + private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContainer) { CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); String cosmosContainerId = cosmosContainerProperties.getId(); logger.info("Truncating collection {} ...", cosmosContainerId); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java index 1fad912332e9..41562c68cb80 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java @@ -248,7 +248,7 @@ public void afterClass() { public void before_TopQueryTests() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(client); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java index 3b850c42ff18..8866cd5f7c0b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java @@ -137,7 +137,7 @@ public CosmosTriggerProperties createTrigger(CosmosAsyncContainer cosmosContaine public void before_TriggerQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); createdTriggers.clear(); for(int i = 0; i < 5; i++) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java index d2a4870a1c10..1db1bedf1845 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java @@ -73,7 +73,7 @@ public void replaceTrigger() throws Exception { public void before_TriggerUpsertReplaceTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "fast" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java index 67775af2557e..74229a7ae529 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java @@ -137,7 +137,7 @@ public CosmosUserDefinedFunctionProperties createUserDefinedFunction(CosmosAsync public void before_UserDefinedFunctionQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for(int i = 0; i < 5; i++) { createdUDF.add(createUserDefinedFunction(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java index c9d19969c8dc..c432fe871487 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java @@ -73,7 +73,7 @@ public void replaceUserDefinedFunction() throws Exception { public void before_UserDefinedFunctionUpsertReplaceTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "fast" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java index 68e68c50731d..9e7b8af99a5f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java @@ -84,7 +84,7 @@ private void createLargeDocument() { public void before_VeryLargeDocumentQueryTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "query" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 86a048f2ebad..c8345e6b996a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -508,7 +508,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; }); From 8ccf0cc238b920624a3dbf185cf8e1b96b15e4dc Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 13:09:30 -0800 Subject: [PATCH 05/10] Address PR #47933 review comments - Add isSuccessStatusCode() validation for non-2xx bulk responses in all 3 sites (impl/TestSuiteBase truncateCollection, rx/TestSuiteBase cleanUpContainerInternal, rx/TestSuiteBase bulkInsert) - Keep 409/Conflict handling in bulkInsert but validate other non-2xx responses - Return server-created items from bulkInsertBlocking via getItem(clazz) - Use PartitionKey.NONE instead of new PartitionKey(null) in cleanUpContainerInternal --- .../cosmos/implementation/TestSuiteBase.java | 5 ++++ .../com/azure/cosmos/rx/TestSuiteBase.java | 28 +++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index cfb5e01fec65..f2cb95647921 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -257,6 +257,11 @@ protected static void truncateCollection(DocumentCollection collection) { && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { return Mono.empty(); } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + return Mono.error(new IllegalStateException( + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + } return Mono.just(response); }) .blockLast(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 8b6c0a8cd109..921994bbaeaa 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -323,7 +323,7 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine partitionKey = new PartitionKey(propertyValue); } } else { - partitionKey = new PartitionKey(null); + partitionKey = PartitionKey.NONE; } return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); @@ -355,6 +355,11 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { return Mono.empty(); } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + return Mono.error(new IllegalStateException( + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + } return Mono.just(response); }) .blockLast(); @@ -639,18 +644,31 @@ public Flux> bulkInsert(CosmosAsyncConta } return Mono.error(ex); } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode() + && response.getResponse().getStatusCode() != HttpConstants.StatusCodes.CONFLICT) { + return Mono.error(new IllegalStateException( + "Bulk insert operation failed with status code " + response.getResponse().getStatusCode())); + } return Mono.just(response); }); } + @SuppressWarnings("unchecked") public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - bulkInsert(cosmosContainer, documentDefinitionList) + if (documentDefinitionList == null || documentDefinitionList.isEmpty()) { + return documentDefinitionList; + } + + Class clazz = (Class) documentDefinitionList.get(0).getClass(); + + return bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) - .then() + .filter(response -> response.getResponse() != null) + .map(response -> response.getResponse().getItem(clazz)) + .collectList() .block(); - - return documentDefinitionList; } public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { From 6e960fc06b95f53f2d9796ddab14ba742ee0e692 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 13:30:35 -0800 Subject: [PATCH 06/10] simplify code --- .../cosmos/implementation/TestSuiteBase.java | 285 +++++++----------- 1 file changed, 116 insertions(+), 169 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index f2cb95647921..8a6eca9855ac 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -4,6 +4,7 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import com.azure.cosmos.CosmosException; @@ -178,177 +179,123 @@ public void afterSuite() { protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} ...", collection.getId()); - AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); - try { - List paths = collection.getPartitionKey().getPaths(); - - try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() - .key(TestConfigurations.MASTER_KEY) - .endpoint(TestConfigurations.HOST) - .buildAsyncClient()) { - CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setMaxDegreeOfParallelism(-1); - QueryFeedOperationState state = new QueryFeedOperationState( - cosmosClient, - "truncateCollection", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - - ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100); - - logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); - - String altLink = collection.getAltLink(); - String[] altLinkSegments = altLink.split("/"); - // altLink format: dbs/{dbName}/colls/{collName} - String databaseName = altLinkSegments[1]; - String containerName = altLinkSegments[3]; - - Flux deleteOperations = - houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); - } - partitionKey = new PartitionKey(propertyValue); - } else { - partitionKey = PartitionKey.NONE; - } - - return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); - }); - - CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); - ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper - .getCosmosBulkExecutionOptionsAccessor() - .getImpl(bulkOptions) - .setCosmosEndToEndLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) - .build()); - - cosmosClient.getDatabase(databaseName) - .getContainer(containerName) - .executeBulkOperations(deleteOperations, bulkOptions) - .flatMap(response -> { - if (response.getException() != null) { - Exception ex = response.getException(); - if (ex instanceof CosmosException) { - CosmosException cosmosException = (CosmosException) ex; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND - && cosmosException.getSubStatusCode() == 0) { - return Mono.empty(); - } + List paths = collection.getPartitionKey().getPaths(); + + try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() + .key(TestConfigurations.MASTER_KEY) + .endpoint(TestConfigurations.HOST) + .buildAsyncClient()) { + + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + options.setMaxDegreeOfParallelism(-1); + + ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100); + + logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); + + String altLink = collection.getAltLink(); + String[] altLinkSegments = altLink.split("/"); + // altLink format: dbs/{dbName}/colls/{collName} + String databaseName = altLinkSegments[1]; + String containerName = altLinkSegments[3]; + + CosmosAsyncContainer container = cosmosClient.getDatabase(databaseName).getContainer(containerName); + + Flux deleteOperations = + container + .queryItems( "SELECT * FROM root", Document.class) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey; + if (paths != null && !paths.isEmpty()) { + List pkPath = PathParser.getPathParts(paths.get(0)); + Object propertyValue = doc.getObjectByPath(pkPath); + if (propertyValue == null) { + propertyValue = Undefined.value(); + } + partitionKey = new PartitionKey(propertyValue); + } else { + partitionKey = PartitionKey.NONE; + } + + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosClient.getDatabase(databaseName) + .getContainer(containerName) + .executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); } - return Mono.error(ex); - } - if (response.getResponse() != null - && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return Mono.empty(); } - if (response.getResponse() != null - && !response.getResponse().isSuccessStatusCode()) { - return Mono.error(new IllegalStateException( - "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); - } - return Mono.just(response); - }) - .blockLast(); - - logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateTriggers", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryTriggers(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(trigger -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = trigger.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteTrigger(trigger.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateStoredProcs", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryStoredProcedures(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(storedProcedure -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = storedProcedure.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteStoredProcedure(storedProcedure.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} udfs ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateUserDefinedFunctions", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryUserDefinedFunctions(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(udf -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = udf.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteUserDefinedFunction(udf.getSelfLink(), requestOptions); - }).then().block(); - } - } finally { - houseKeepingClient.close(); + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + return Mono.error(new IllegalStateException( + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + } + return Mono.just(response); + }) + .blockLast(); + + logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); + + container + .getScripts() + .queryTriggers("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(trigger -> container.getScripts().getTrigger(trigger.getId()).delete()).then().block(); + + logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId()); + + container + .getScripts() + .queryStoredProcedures("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(storedProcedure -> { + return container.getScripts().getStoredProcedure(storedProcedure.getId()).delete(); + }) + .then() + .block(); + + logger.info("Truncating DocumentCollection {} udfs ...", collection.getId()); + + container + .getScripts() + .queryUserDefinedFunctions("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(udf -> { + RequestOptions requestOptions = new RequestOptions(); + return container.getScripts().getUserDefinedFunction(udf.getId()).delete(); + }) + .then() + .block(); } logger.info("Finished truncating DocumentCollection {}.", collection.getId()); From fab4fd009fc426b2feabf10eb4be387d60ca3cfc Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 13:40:13 -0800 Subject: [PATCH 07/10] Replace IllegalStateException with CosmosException in bulk error propagation Use BridgeInternal.createCosmosException() with status code and substatus code from CosmosBulkItemResponse instead of generic IllegalStateException. --- .../azure/cosmos/implementation/TestSuiteBase.java | 8 ++++++-- .../java/com/azure/cosmos/rx/TestSuiteBase.java | 14 ++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 8a6eca9855ac..0672d930c991 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; @@ -252,8 +253,11 @@ protected static void truncateCollection(DocumentCollection collection) { } if (response.getResponse() != null && !response.getResponse().isSuccessStatusCode()) { - return Mono.error(new IllegalStateException( - "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); } return Mono.just(response); }) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 921994bbaeaa..b335371d9f04 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -357,8 +357,11 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine } if (response.getResponse() != null && !response.getResponse().isSuccessStatusCode()) { - return Mono.error(new IllegalStateException( - "Bulk delete operation failed with status code " + response.getResponse().getStatusCode())); + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); } return Mono.just(response); }) @@ -647,8 +650,11 @@ public Flux> bulkInsert(CosmosAsyncConta if (response.getResponse() != null && !response.getResponse().isSuccessStatusCode() && response.getResponse().getStatusCode() != HttpConstants.StatusCodes.CONFLICT) { - return Mono.error(new IllegalStateException( - "Bulk insert operation failed with status code " + response.getResponse().getStatusCode())); + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk insert operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); } return Mono.just(response); }); From 6df45b338cb984f2aaa59fd1ede78da65705aa7e Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 15:18:31 -0800 Subject: [PATCH 08/10] test --- .../cosmos/implementation/TestSuiteBase.java | 12 ++---------- .../com/azure/cosmos/rx/TestSuiteBase.java | 18 ++++++------------ 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 0672d930c991..e8f7abc509ab 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -180,7 +180,6 @@ public void afterSuite() { protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} ...", collection.getId()); - List paths = collection.getPartitionKey().getPaths(); try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() .key(TestConfigurations.MASTER_KEY) @@ -209,15 +208,8 @@ protected static void truncateCollection(DocumentCollection collection) { .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); - } - partitionKey = new PartitionKey(propertyValue); - } else { + PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(doc, collection.getPartitionKey()); + if (partitionKey == null) { partitionKey = PartitionKey.NONE; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index b335371d9f04..97adc285c015 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -31,7 +31,6 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.PartitionKeyHelper; -import com.azure.cosmos.implementation.PathParser; import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.TestConfigurations; @@ -296,7 +295,6 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); String cosmosContainerId = cosmosContainerProperties.getId(); logger.info("Truncating collection {} ...", cosmosContainerId); - List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); options.setCosmosEndToEndOperationLatencyPolicyConfig( new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) @@ -313,16 +311,12 @@ private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContaine .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .map(doc -> { - PartitionKey partitionKey; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; - } else { - partitionKey = new PartitionKey(propertyValue); - } - } else { + PartitionKey partitionKey = + PartitionKeyHelper.extractPartitionKeyFromDocument( + doc, + cosmosContainerProperties.getPartitionKeyDefinition()); + + if (partitionKey == null) { partitionKey = PartitionKey.NONE; } From 797ea8bd232e18298b914d2753999e917c14b1e4 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 6 Feb 2026 19:36:47 -0800 Subject: [PATCH 09/10] Add bulkEnabled parameter and rename insert methods - Add insertUsingPointOperations for one-by-one createItem path - Add bulkEnabled parameter to insertAllItemsBlocking/voidInsertAllItemsBlocking - Rename bulkInsertBlocking -> insertAllItemsBlocking - Rename voidBulkInsertBlocking -> voidInsertAllItemsBlocking - Update all callers with explicit bulkEnabled=true --- .../EndToEndTimeOutValidationTests.java | 2 +- .../azure/cosmos/rx/AggregateQueryTests.java | 2 +- .../rx/BackPressureCrossPartitionTest.java | 5 ++- .../com/azure/cosmos/rx/BackPressureTest.java | 2 +- ...ContainerCreateDeleteWithSameNameTest.java | 2 +- .../azure/cosmos/rx/DistinctQueryTests.java | 4 +- .../azure/cosmos/rx/FeedRangeQueryTests.java | 2 +- .../azure/cosmos/rx/GroupByQueryTests.java | 2 +- .../cosmos/rx/MultiOrderByQueryTests.java | 2 +- .../cosmos/rx/OffsetLimitQueryTests.java | 2 +- .../cosmos/rx/OrderbyDocumentQueryTest.java | 2 +- .../cosmos/rx/ParallelDocumentQueryTest.java | 2 +- .../azure/cosmos/rx/QueryValidationTests.java | 4 +- .../cosmos/rx/ReadFeedDocumentsTest.java | 2 +- .../SinglePartitionReadFeedDocumentsTest.java | 2 +- .../com/azure/cosmos/rx/TestSuiteBase.java | 42 +++++++++++++++++-- .../ChangeFeedProcessorMigrationTests.java | 2 +- .../FullFidelityChangeFeedProcessorTest.java | 4 +- .../IncrementalChangeFeedProcessorTest.java | 4 +- .../IncrementalChangeFeedProcessorTest.java | 6 +-- 20 files changed, 66 insertions(+), 29 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java index eac18cead8af..aa5004539f92 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java @@ -504,7 +504,7 @@ private List insertDocuments(int documentCount, List partiti partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java index 523430b7fad4..02a59815e730 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java @@ -113,7 +113,7 @@ public void queryDocumentsWithAggregates(Boolean qmEnabled) throws Exception { public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java index 08d3927a162b..9abcee71b131 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java @@ -206,9 +206,10 @@ public void before_BackPressureCrossPartitionTest() { docDefList.add(getDocumentDefinition(i)); } - createdDocuments = bulkInsertBlocking( + createdDocuments = insertAllItemsBlocking( createdCollection, - docDefList); + docDefList, + true); numberOfPartitions = CosmosBridgeInternal.getAsyncDocumentClient(client).readPartitionKeyRanges(getCollectionLink(), (CosmosQueryRequestOptions) null) .flatMap(p -> Flux.fromIterable(p.getResults())).collectList().single().block().size(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java index c133d0e81241..bac1eb1719f4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java @@ -285,7 +285,7 @@ public void before_BackPressureTest() throws Exception { docDefList.add(getDocumentDefinition(i)); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); warmUp(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java index 55a06016b767..9de5984d98ee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java @@ -929,7 +929,7 @@ private void setupReadFeedDocuments(List createdDocuments, CosmosAsy docDefList.add(TestObject.creatNewTestObject()); } - createdDocuments.addAll(bulkInsertBlocking(feedContainer, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedContainer, docDefList, true)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java index ca386623c9a4..c2c6fb4cf5ff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java @@ -325,8 +325,8 @@ public void queryDocumentsWithOrderBy(String query, boolean matchedOrderBy) { public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); - voidBulkInsertBlocking(createdCollection, propertiesDocs); + voidInsertAllItemsBlocking(createdCollection, docs, true); + voidInsertAllItemsBlocking(createdCollection, propertiesDocs, true); } public void generateTestData() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java index f4d86b535a4b..21775691d2ef 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java @@ -187,7 +187,7 @@ private List insertDocuments( .size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java index 6ae065c39cdd..23428c9b7544 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java @@ -145,7 +145,7 @@ public void queryDocuments(Triple, Integer> gro public void bulkInsert() { generateTestData(INSERT_DOCUMENTS_CNT); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData(int documentCnt) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index f59d2e1a9752..9a6427952ca6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -161,7 +161,7 @@ public void before_MultiOrderByQueryTests() throws Exception { } } - voidBulkInsertBlocking(documentCollection, documents); + voidInsertAllItemsBlocking(documentCollection, documents, true); expectCount(documentCollection, documents.size()); waitIfNeededForReplicasToCatchUp(getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java index b3ad601f322f..1e8824d8a0ff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java @@ -305,7 +305,7 @@ private List queryWithContinuationTokens(String query, int p public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 2279faa033dc..821ca11c4898 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -653,7 +653,7 @@ public List bulkInsertDocs(CosmosAsyncContainer cosmosContai result.add(docDefinition); } - return bulkInsertBlocking(cosmosContainer, result); + return insertAllItemsBlocking(cosmosContainer, result, true); } @BeforeMethod(groups = { "query" }) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index 37b8941a265d..94d3895a0177 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -509,7 +509,7 @@ private List prepareCosmosContainer(CosmosAsyncContainer cos docDefList.add(getDocumentDefinition(99)); } - List items = bulkInsertBlocking(cosmosContainer, docDefList); + List items = insertAllItemsBlocking(cosmosContainer, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); return items; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index 80443137a22c..dc3b508f4a0a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -570,7 +570,7 @@ private List createDocumentsWithUndefinedAndNullValues(CosmosAsyncCo } docsToInsert.add(objectNode); } - return bulkInsertBlocking(container, docsToInsert); + return insertAllItemsBlocking(container, docsToInsert, true); } @Test(groups = {"query"}, timeOut = TIMEOUT) @@ -668,7 +668,7 @@ private List insertDocuments(int documentCount, List partiti partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 43b98694dd5c..5fd5100b5855 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -91,7 +91,7 @@ public void before_ReadFeedDocumentsTest() { docDefList.add(getDocumentDefinition()); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index cf1cf698546a..a975079aa354 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -63,7 +63,7 @@ public void before_SinglePartitionReadFeedDocumentsTest() { docDefList.add(getDocumentDefinition()); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 97adc285c015..5ec2b6241653 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -101,6 +101,7 @@ public abstract class TestSuiteBase extends CosmosAsyncClientTest { + private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 5; private static final ObjectMapper objectMapper = new ObjectMapper(); protected static final int TIMEOUT = 40000; @@ -653,14 +654,39 @@ public Flux> bulkInsert(CosmosAsyncConta return Mono.just(response); }); } + + private Flux> insertUsingPointOperations(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) + .build() + ); + + List>> result = new ArrayList<>(documentDefinitionList.size()); + for (T docDef : documentDefinitionList) { + result.add(cosmosContainer.createItem(docDef, options)); + } + + return Flux.merge(Flux.fromIterable(result), DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL); + } @SuppressWarnings("unchecked") - public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList) { + public List insertAllItemsBlocking(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList, + boolean bulkEnabled) { if (documentDefinitionList == null || documentDefinitionList.isEmpty()) { return documentDefinitionList; } + if (!bulkEnabled) { + return insertUsingPointOperations(cosmosContainer, documentDefinitionList) + .publishOn(Schedulers.parallel()) + .map(CosmosItemResponse::getItem) + .collectList() + .block(); + } + Class clazz = (Class) documentDefinitionList.get(0).getClass(); return bulkInsert(cosmosContainer, documentDefinitionList) @@ -671,7 +697,17 @@ public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, .block(); } - public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { + public void voidInsertAllItemsBlocking(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList, + boolean bulkEnabled) { + if (!bulkEnabled) { + insertUsingPointOperations(cosmosContainer, documentDefinitionList) + .publishOn(Schedulers.parallel()) + .then() + .block(); + return; + } + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) .then() diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java index b9d474ccb0a4..f9b4c51535f7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java @@ -211,7 +211,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, true)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index abc7348a14f0..0f28d36c42e8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -2153,7 +2153,7 @@ private void setupReadFeedDocuments(List createdDocuments, C logger.info("Adding the following item to bulk list: {}", item); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } @@ -2164,7 +2164,7 @@ private void createReadFeedDocuments(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index 0c91d6ec5039..fd4166871667 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -2350,7 +2350,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); for (InternalObjectNode current : createdDocuments) { try { logger.info("CREATED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(current)); @@ -2368,7 +2368,7 @@ private void createReadFeedDocuments(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 4e54d81ba417..6525617d94dd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -2236,7 +2236,7 @@ private void setupReadFeedDocuments(List createdDocuments, C docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } @@ -2247,7 +2247,7 @@ private void setupReadFeedDocumentsForAllVersionsAndDeletes(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } From 4e8b099a4de607ee0f7d2f41681a95bfcfc692aa Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 30 Jan 2026 13:52:36 -0800 Subject: [PATCH 10/10] fix test pipeline --- .../com/azure/cosmos/implementation/RxDocumentClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 4c9643a3eafd..083389edcd55 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -510,7 +510,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; });