diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index b0d6543dd70c..9875951afac0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -66,7 +66,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -592,8 +591,6 @@ public void databaseAccountToClients() { cosmosDiagnosticsNode.get("clientCfgs").get("clientEndpoints").get(TestConfigurations.HOST).asInt(); assertThat(updatedClientCount).isEqualTo(clientCount + 1); - } catch (JsonMappingException e) { - throw new RuntimeException(e); } catch (JsonProcessingException e) { throw new RuntimeException(e); } finally { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java index 55950284fc00..aa5004539f92 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutValidationTests.java @@ -62,7 +62,7 @@ public CosmosAsyncClient initializeClient(CosmosEndToEndOperationLatencyPolicyCo try { createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer)); @@ -504,7 +504,7 @@ private List insertDocuments(int documentCount, List partiti partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index 5c59341d59c0..e8f7abc509ab 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -2,9 +2,13 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; import com.azure.cosmos.CosmosNettyLeakDetectorFactory; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.DocumentClientTest; @@ -19,7 +23,10 @@ import com.azure.cosmos.models.CompositePath; import com.azure.cosmos.models.CompositePathSortOrder; import com.azure.cosmos.models.CosmosClientTelemetryConfig; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.IncludedPath; import com.azure.cosmos.models.IndexingPolicy; @@ -173,134 +180,118 @@ public void afterSuite() { protected static void truncateCollection(DocumentCollection collection) { logger.info("Truncating DocumentCollection {} ...", collection.getId()); - AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); - try { - List paths = collection.getPartitionKey().getPaths(); - - try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() - .key(TestConfigurations.MASTER_KEY) - .endpoint(TestConfigurations.HOST) - .buildAsyncClient()) { - CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setMaxDegreeOfParallelism(-1); - QueryFeedOperationState state = new QueryFeedOperationState( - cosmosClient, - "truncateCollection", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - - ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100); - - logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); - - houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - RequestOptions requestOptions = new RequestOptions(); - - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.value(); - } - - requestOptions.setPartitionKey(new PartitionKey(propertyValue)); - } - - return houseKeepingClient.deleteDocument(doc.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateTriggers", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryTriggers(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(trigger -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = trigger.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteTrigger(trigger.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateStoredProcs", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryStoredProcedures(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(storedProcedure -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = storedProcedure.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteStoredProcedure(storedProcedure.getSelfLink(), requestOptions); - }).then().block(); - - logger.info("Truncating DocumentCollection {} udfs ...", collection.getId()); - - state = new QueryFeedOperationState( - cosmosClient, - "truncateUserDefinedFunctions", - collection.getSelfLink(), - collection.getId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - houseKeepingClient.queryUserDefinedFunctions(collection.getSelfLink(), "SELECT * FROM root", state) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(udf -> { - RequestOptions requestOptions = new RequestOptions(); - - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = udf.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // } - - return houseKeepingClient.deleteUserDefinedFunction(udf.getSelfLink(), requestOptions); - }).then().block(); - } - } finally { - houseKeepingClient.close(); + + try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder() + .key(TestConfigurations.MASTER_KEY) + .endpoint(TestConfigurations.HOST) + .buildAsyncClient()) { + + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + options.setMaxDegreeOfParallelism(-1); + + ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100); + + logger.info("Truncating DocumentCollection {} documents ...", collection.getId()); + + String altLink = collection.getAltLink(); + String[] altLinkSegments = altLink.split("/"); + // altLink format: dbs/{dbName}/colls/{collName} + String databaseName = altLinkSegments[1]; + String containerName = altLinkSegments[3]; + + CosmosAsyncContainer container = cosmosClient.getDatabase(databaseName).getContainer(containerName); + + Flux deleteOperations = + container + .queryItems( "SELECT * FROM root", Document.class) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(doc, collection.getPartitionKey()); + if (partitionKey == null) { + partitionKey = PartitionKey.NONE; + } + + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosClient.getDatabase(databaseName) + .getContainer(containerName) + .executeBulkOperations(deleteOperations, bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); + } + return Mono.just(response); + }) + .blockLast(); + + logger.info("Truncating DocumentCollection {} triggers ...", collection.getId()); + + container + .getScripts() + .queryTriggers("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(trigger -> container.getScripts().getTrigger(trigger.getId()).delete()).then().block(); + + logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId()); + + container + .getScripts() + .queryStoredProcedures("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(storedProcedure -> { + return container.getScripts().getStoredProcedure(storedProcedure.getId()).delete(); + }) + .then() + .block(); + + logger.info("Truncating DocumentCollection {} udfs ...", collection.getId()); + + container + .getScripts() + .queryUserDefinedFunctions("SELECT * FROM root", new CosmosQueryRequestOptions()) + .byPage() + .publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults())) + .flatMap(udf -> { + RequestOptions requestOptions = new RequestOptions(); + return container.getScripts().getUserDefinedFunction(udf.getId()).delete(); + }) + .then() + .block(); } logger.info("Finished truncating DocumentCollection {}.", collection.getId()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java index e6d58eb148bc..02a59815e730 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java @@ -113,7 +113,7 @@ public void queryDocumentsWithAggregates(Boolean qmEnabled) throws Exception { public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData() { @@ -266,7 +266,7 @@ public void before_AggregateQueryTests() throws Throwable { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); try { - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } catch (Throwable throwable) { throwable = Exceptions.unwrap(throwable); if (!(throwable instanceof CosmosException)) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java index 08d3927a162b..9abcee71b131 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java @@ -206,9 +206,10 @@ public void before_BackPressureCrossPartitionTest() { docDefList.add(getDocumentDefinition(i)); } - createdDocuments = bulkInsertBlocking( + createdDocuments = insertAllItemsBlocking( createdCollection, - docDefList); + docDefList, + true); numberOfPartitions = CosmosBridgeInternal.getAsyncDocumentClient(client).readPartitionKeyRanges(getCollectionLink(), (CosmosQueryRequestOptions) null) .flatMap(p -> Flux.fromIterable(p.getResults())).collectList().single().block().size(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java index c133d0e81241..bac1eb1719f4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/BackPressureTest.java @@ -285,7 +285,7 @@ public void before_BackPressureTest() throws Exception { docDefList.add(getDocumentDefinition(i)); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); warmUp(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java index 55a06016b767..9de5984d98ee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java @@ -929,7 +929,7 @@ private void setupReadFeedDocuments(List createdDocuments, CosmosAsy docDefList.add(TestObject.creatNewTestObject()); } - createdDocuments.addAll(bulkInsertBlocking(feedContainer, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedContainer, docDefList, true)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java index 062c891cf09d..c2c6fb4cf5ff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java @@ -325,8 +325,8 @@ public void queryDocumentsWithOrderBy(String query, boolean matchedOrderBy) { public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); - voidBulkInsertBlocking(createdCollection, propertiesDocs); + voidInsertAllItemsBlocking(createdCollection, docs, true); + voidInsertAllItemsBlocking(createdCollection, propertiesDocs, true); } public void generateTestData() { @@ -379,7 +379,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java index de2b0d9bddd0..21775691d2ef 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/FeedRangeQueryTests.java @@ -161,7 +161,7 @@ private List queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryReques public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments( DEFAULT_NUM_DOCUMENTS_PER_PKEY, @@ -187,7 +187,7 @@ private List insertDocuments( .size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java index 1d3f0adc9b08..23428c9b7544 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GroupByQueryTests.java @@ -145,7 +145,7 @@ public void queryDocuments(Triple, Integer> gro public void bulkInsert() { generateTestData(INSERT_DOCUMENTS_CNT); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData(int documentCnt) { @@ -195,7 +195,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index a5e9e832aa1c..9a6427952ca6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -120,7 +120,7 @@ public void before_MultiOrderByQueryTests() throws Exception { documents = new ArrayList<>(); client = getClientBuilder().buildAsyncClient(); documentCollection = getSharedMultiPartitionCosmosContainerWithCompositeAndSpatialIndexes(client); - truncateCollection(documentCollection); + cleanUpContainer(documentCollection); expectCount(documentCollection, 0); int numberOfDocuments = 4; @@ -161,7 +161,7 @@ public void before_MultiOrderByQueryTests() throws Exception { } } - voidBulkInsertBlocking(documentCollection, documents); + voidInsertAllItemsBlocking(documentCollection, documents, true); expectCount(documentCollection, documents.size()); waitIfNeededForReplicasToCatchUp(getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java index 227c4db66133..1e8824d8a0ff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java @@ -305,7 +305,7 @@ private List queryWithContinuationTokens(String query, int p public void bulkInsert() { generateTestData(); - voidBulkInsertBlocking(createdCollection, docs); + voidInsertAllItemsBlocking(createdCollection, docs, true); } public void generateTestData() { @@ -344,7 +344,7 @@ public void afterClass() { public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index e02330cbf9b6..821ca11c4898 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -644,7 +644,7 @@ public InternalObjectNode createDocument(CosmosAsyncContainer cosmosContainer, M return BridgeInternal.getProperties(cosmosContainer.createItem(docDefinition).block()); } - public List bulkInsert(CosmosAsyncContainer cosmosContainer, List> keyValuePropsList) { + public List bulkInsertDocs(CosmosAsyncContainer cosmosContainer, List> keyValuePropsList) { ArrayList result = new ArrayList(); @@ -653,7 +653,7 @@ public List bulkInsert(CosmosAsyncContainer cosmosContainer, result.add(docDefinition); } - return bulkInsertBlocking(cosmosContainer, result); + return insertAllItemsBlocking(cosmosContainer, result, true); } @BeforeMethod(groups = { "query" }) @@ -672,7 +672,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); String containerName = "roundTripsContainer-" + UUID.randomUUID(); createdDatabase.createContainer(containerName, "/mypk", @@ -740,7 +740,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { props = new HashMap<>(); keyValuePropsList.add(props); - createdDocuments = bulkInsert(createdCollection, keyValuePropsList); + createdDocuments = bulkInsertDocs(createdCollection, keyValuePropsList); for(int i = 0; i < 10; i++) { Map p = new HashMap<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index ff84a3efa2b9..94d3895a0177 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -485,10 +485,10 @@ public void before_ParallelDocumentQueryTest() { private List prepareCosmosContainer(CosmosAsyncContainer cosmosContainer) { try { - truncateCollection(cosmosContainer); + cleanUpContainer(cosmosContainer); } catch (Throwable firstChanceException) { try { - truncateCollection(cosmosContainer); + cleanUpContainer(cosmosContainer); } catch (Throwable lastChanceException) { String message = String.format("container %s truncation failed due to first chance %s followed by last chance %s", cosmosContainer, @@ -509,7 +509,7 @@ private List prepareCosmosContainer(CosmosAsyncContainer cos docDefList.add(getDocumentDefinition(99)); } - List items = bulkInsertBlocking(cosmosContainer, docDefList); + List items = insertAllItemsBlocking(cosmosContainer, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); return items; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index bc44a2824acd..dc3b508f4a0a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -84,7 +84,7 @@ public void beforeClass() throws Exception { client = this.getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); createdContainer = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdContainer); + cleanUpContainer(createdContainer); createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer)); } @@ -570,7 +570,7 @@ private List createDocumentsWithUndefinedAndNullValues(CosmosAsyncCo } docsToInsert.add(objectNode); } - return bulkInsertBlocking(container, docsToInsert); + return insertAllItemsBlocking(container, docsToInsert, true); } @Test(groups = {"query"}, timeOut = TIMEOUT) @@ -668,7 +668,7 @@ private List insertDocuments(int documentCount, List partiti partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); } - List documentInserted = bulkInsertBlocking(container, documentsToInsert); + List documentInserted = insertAllItemsBlocking(container, documentsToInsert, true); waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 66c902239162..5fd5100b5855 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -83,7 +83,7 @@ public void readDocuments_withoutEnableCrossPartitionQuery() { public void before_ReadFeedDocumentsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); List docDefList = new ArrayList<>(); @@ -91,7 +91,7 @@ public void before_ReadFeedDocumentsTest() { docDefList.add(getDocumentDefinition()); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java index 3a28ac6e3c3d..fa99650d2dd8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedStoredProceduresTest.java @@ -56,7 +56,7 @@ public void readStoredProcedures() throws Exception { public void before_ReadFeedStoredProceduresTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdStoredProcedures.add(createStoredProcedures(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java index 555e535b65a6..2c42722406ac 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedTriggersTest.java @@ -54,7 +54,7 @@ public void readTriggers() throws Exception { public void before_ReadFeedTriggersTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { this.createdTriggers.add(this.createTriggers(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java index 2fb5e20495cb..870689f0c8f9 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedUdfsTest.java @@ -58,7 +58,7 @@ public void readUserDefinedFunctions() throws Exception { public void before_ReadFeedUdfsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdUserDefinedFunctions.add(createUserDefinedFunctions(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java index ddb421861ef0..0aebb62bfcee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java @@ -312,7 +312,7 @@ public InternalObjectNode createDocument(CosmosAsyncContainer cosmosContainer, i public void before_SinglePartitionDocumentQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for(int i = 0; i < 5; i++) { createdDocuments.add(createDocument(createdCollection, i)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index fb8de9b4852a..a975079aa354 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -55,7 +55,7 @@ public void readDocuments() { public void before_SinglePartitionReadFeedDocumentsTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); List docDefList = new ArrayList<>(); @@ -63,7 +63,7 @@ public void before_SinglePartitionReadFeedDocumentsTest() { docDefList.add(getDocumentDefinition()); } - createdDocuments = bulkInsertBlocking(createdCollection, docDefList); + createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java index e18cd765c41b..7d87e90fce5c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java @@ -127,7 +127,7 @@ public CosmosStoredProcedureProperties createStoredProc(CosmosAsyncContainer cos public void before_StoredProcedureQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for (int i = 0; i < 5; i++) { createdStoredProcs.add(createStoredProc(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 67e268718bdd..c74be9228545 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -30,7 +30,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; -import com.azure.cosmos.implementation.PathParser; +import com.azure.cosmos.implementation.PartitionKeyHelper; import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.TestConfigurations; @@ -45,8 +45,12 @@ import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.CosmosDatabaseResponse; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosResponse; import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; @@ -97,7 +101,7 @@ public abstract class TestSuiteBase extends CosmosAsyncClientTest { - private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 500; + private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 5; private static final ObjectMapper objectMapper = new ObjectMapper(); private static final CosmosItemRequestOptions DEFAULT_DELETE_ITEM_OPTIONS = new CosmosItemRequestOptions() .setCosmosEndToEndOperationLatencyPolicyConfig( @@ -246,50 +250,12 @@ public void afterSuitEmulatorVNext() { } protected static void cleanUpContainer(CosmosAsyncContainer cosmosContainer) { - CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); - String cosmosContainerId = cosmosContainerProperties.getId(); - logger.info("Truncating collection {} ...", cosmosContainerId); - List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); - CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setCosmosEndToEndOperationLatencyPolicyConfig( - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) - .build() - ); - options.setMaxDegreeOfParallelism(-1); - int maxItemCount = 100; - - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - - PartitionKey partitionKey = null; - - Object propertyValue = null; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - partitionKey = PartitionKey.NONE; - } else { - partitionKey = new PartitionKey(propertyValue); - } - } else { - partitionKey = new PartitionKey(null); - } - - return cosmosContainer.deleteItem(doc.getId(), partitionKey, DEFAULT_DELETE_ITEM_OPTIONS); - }).then().block(); - } - - protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) { try { int i = 0; while (i < 100) { try { - truncateCollectionInternal(cosmosContainer); + cleanUpContainerInternal(cosmosContainer); return; } catch (CosmosException exception) { if (exception.getStatusCode() != HttpConstants.StatusCodes.TOO_MANY_REQUESTS @@ -331,11 +297,10 @@ protected static void expectCount(CosmosAsyncContainer cosmosContainer, int expe assertThat(counts.get(0)).isEqualTo(expectedCount); } - private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContainer) { + private static void cleanUpContainerInternal(CosmosAsyncContainer cosmosContainer) { CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); String cosmosContainerId = cosmosContainerProperties.getId(); logger.info("Truncating collection {} ...", cosmosContainerId); - List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); options.setCosmosEndToEndOperationLatencyPolicyConfig( new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) @@ -346,29 +311,61 @@ private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContai logger.info("Truncating collection {} documents ...", cosmosContainer.getId()); - cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) - .byPage(maxItemCount) - .publishOn(Schedulers.parallel()) - .flatMap(page -> Flux.fromIterable(page.getResults())) - .flatMap(doc -> { - - PartitionKey partitionKey = null; - - Object propertyValue = null; - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { + Flux deleteOperations = + cosmosContainer.queryItems("SELECT * FROM root", options, InternalObjectNode.class) + .byPage(maxItemCount) + .publishOn(Schedulers.parallel()) + .flatMap(page -> Flux.fromIterable(page.getResults())) + .map(doc -> { + PartitionKey partitionKey = + PartitionKeyHelper.extractPartitionKeyFromDocument( + doc, + cosmosContainerProperties.getPartitionKeyDefinition()); + + if (partitionKey == null) { partitionKey = PartitionKey.NONE; - } else { - partitionKey = new PartitionKey(propertyValue); } - } else { - partitionKey = new PartitionKey(null); - } - return cosmosContainer.deleteItem(doc.getId(), partitionKey, DEFAULT_DELETE_ITEM_OPTIONS); - }).then().block(); + return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey); + }); + + CosmosBulkExecutionOptions truncateBulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(truncateBulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + cosmosContainer + .executeBulkOperations(deleteOperations, truncateBulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return Mono.empty(); + } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode()) { + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk delete operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); + } + return Mono.just(response); + }) + .blockLast(); expectCount(cosmosContainer, 0); @@ -379,13 +376,6 @@ private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContai .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .flatMap(trigger -> { - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = trigger.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // Object propertyValue = getTrigger.getObjectByPath(PathParser.getPathParts(getPaths.get(0))); - // requestOptions.getPartitionKey(new PartitionKey(propertyValue)); - // } - return cosmosContainer.getScripts().getTrigger(trigger.getId()).delete(); }).then().block(); @@ -396,14 +386,6 @@ private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContai .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .flatMap(storedProcedure -> { - - // if (getPaths != null && !getPaths.isEmpty()) { - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = storedProcedure.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // requestOptions.getPartitionKey(new PartitionKey(propertyValue)); - // } - return cosmosContainer.getScripts().getStoredProcedure(storedProcedure.getId()).delete(new CosmosStoredProcedureRequestOptions()); }).then().block(); @@ -414,14 +396,6 @@ private static void truncateCollectionInternal(CosmosAsyncContainer cosmosContai .publishOn(Schedulers.parallel()) .flatMap(page -> Flux.fromIterable(page.getResults())) .flatMap(udf -> { - - // if (getPaths != null && !getPaths.isEmpty()) { - // if (paths != null && !paths.isEmpty()) { - // Object propertyValue = udf.getObjectByPath(PathParser.getPathParts(paths.get(0))); - // requestOptions.partitionKey(new PartitionKey(propertyValue)); - // requestOptions.getPartitionKey(new PartitionKey(propertyValue)); - // } - return cosmosContainer.getScripts().getUserDefinedFunction(udf.getId()).delete(); }).then().block(); @@ -612,34 +586,111 @@ public static InternalObjectNode createDocument(CosmosAsyncContainer cosmosConta return BridgeInternal.getProperties(cosmosContainer.createItem(item, options).block()); } - public Flux> bulkInsert(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList, - int concurrencyLevel) { + public Flux> bulkInsert(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList) { + + CosmosContainerProperties cosmosContainerProperties = cosmosContainer.read().block().getProperties(); + PartitionKeyDefinition pkDef = cosmosContainerProperties.getPartitionKeyDefinition(); + + List operations = new ArrayList<>(documentDefinitionList.size()); + for (T docDef : documentDefinitionList) { + InternalObjectNode internalNode = InternalObjectNode.fromObjectToInternalObjectNode(docDef); + PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(internalNode, pkDef); + if (partitionKey == null) { + partitionKey = PartitionKey.NONE; + } + operations.add(CosmosBulkOperations.getCreateItemOperation(docDef, partitionKey)); + } + CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getImpl(bulkOptions) + .setCosmosEndToEndLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65)) + .build()); + + return cosmosContainer + .executeBulkOperations(Flux.fromIterable(operations), bulkOptions) + .flatMap(response -> { + if (response.getException() != null) { + Exception ex = response.getException(); + if (ex instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) ex; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.CONFLICT + && cosmosException.getSubStatusCode() == 0) { + return Mono.empty(); + } + } + return Mono.error(ex); + } + if (response.getResponse() != null + && !response.getResponse().isSuccessStatusCode() + && response.getResponse().getStatusCode() != HttpConstants.StatusCodes.CONFLICT) { + CosmosException bulkException = BridgeInternal.createCosmosException( + response.getResponse().getStatusCode(), + "Bulk insert operation failed with status code " + response.getResponse().getStatusCode()); + BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode()); + return Mono.error(bulkException); + } + return Mono.just(response); + }); + } + + private Flux> insertUsingPointOperations(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList) { CosmosItemRequestOptions options = new CosmosItemRequestOptions() .setCosmosEndToEndOperationLatencyPolicyConfig( new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofHours(1)) .build() ); - List>> result = - new ArrayList<>(documentDefinitionList.size()); + + List>> result = new ArrayList<>(documentDefinitionList.size()); for (T docDef : documentDefinitionList) { result.add(cosmosContainer.createItem(docDef, options)); } - return Flux.merge(Flux.fromIterable(result), concurrencyLevel); + return Flux.merge(Flux.fromIterable(result), DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL); } - public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList) { - return bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) + + @SuppressWarnings("unchecked") + public List insertAllItemsBlocking(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList, + boolean bulkEnabled) { + if (documentDefinitionList == null || documentDefinitionList.isEmpty()) { + return documentDefinitionList; + } + + if (!bulkEnabled) { + return insertUsingPointOperations(cosmosContainer, documentDefinitionList) + .publishOn(Schedulers.parallel()) + .map(CosmosItemResponse::getItem) + .collectList() + .block(); + } + + Class clazz = (Class) documentDefinitionList.get(0).getClass(); + + return bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) - .map(itemResponse -> itemResponse.getItem()) + .filter(response -> response.getResponse() != null) + .map(response -> response.getResponse().getItem(clazz)) .collectList() .block(); } - public void voidBulkInsertBlocking(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { - bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) + public void voidInsertAllItemsBlocking(CosmosAsyncContainer cosmosContainer, + List documentDefinitionList, + boolean bulkEnabled) { + if (!bulkEnabled) { + insertUsingPointOperations(cosmosContainer, documentDefinitionList) + .publishOn(Schedulers.parallel()) + .then() + .block(); + return; + } + + bulkInsert(cosmosContainer, documentDefinitionList) .publishOn(Schedulers.parallel()) .then() .block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java index 1fad912332e9..41562c68cb80 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java @@ -248,7 +248,7 @@ public void afterClass() { public void before_TopQueryTests() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedSinglePartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); bulkInsert(client); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java index 3b850c42ff18..8866cd5f7c0b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java @@ -137,7 +137,7 @@ public CosmosTriggerProperties createTrigger(CosmosAsyncContainer cosmosContaine public void before_TriggerQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); createdTriggers.clear(); for(int i = 0; i < 5; i++) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java index d2a4870a1c10..1db1bedf1845 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TriggerUpsertReplaceTest.java @@ -73,7 +73,7 @@ public void replaceTrigger() throws Exception { public void before_TriggerUpsertReplaceTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "fast" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java index 67775af2557e..74229a7ae529 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java @@ -137,7 +137,7 @@ public CosmosUserDefinedFunctionProperties createUserDefinedFunction(CosmosAsync public void before_UserDefinedFunctionQueryTest() throws Exception { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); for(int i = 0; i < 5; i++) { createdUDF.add(createUserDefinedFunction(createdCollection)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java index c9d19969c8dc..c432fe871487 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionUpsertReplaceTest.java @@ -73,7 +73,7 @@ public void replaceUserDefinedFunction() throws Exception { public void before_UserDefinedFunctionUpsertReplaceTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "fast" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java index 68e68c50731d..9e7b8af99a5f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java @@ -84,7 +84,7 @@ private void createLargeDocument() { public void before_VeryLargeDocumentQueryTest() { client = getClientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); - truncateCollection(createdCollection); + cleanUpContainer(createdCollection); } @AfterClass(groups = { "query" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java index b9d474ccb0a4..f9b4c51535f7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/ChangeFeedProcessorMigrationTests.java @@ -211,7 +211,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, true)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index fa0116f3dbad..0f28d36c42e8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -1263,7 +1263,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT).blockLast(); + bulkInsert(createdFeedCollection, docDefList).blockLast(); // Wait for the feed processor to receive and process the documents. Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); @@ -1337,7 +1337,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { logger.info("Start first Change feed processor"); @@ -1389,7 +1389,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) @@ -2153,7 +2153,7 @@ private void setupReadFeedDocuments(List createdDocuments, C logger.info("Adding the following item to bulk list: {}", item); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } @@ -2164,7 +2164,7 @@ private void createReadFeedDocuments(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index a543309ef37a..fd4166871667 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -987,7 +987,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList) .last() .delayElement(Duration.ofMillis(1000)) .flatMap(cosmosItemResponse -> { @@ -1074,7 +1074,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { logger.info("Start first Change feed processor"); @@ -1126,7 +1126,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) @@ -2350,7 +2350,7 @@ private void setupReadFeedDocuments( docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); for (InternalObjectNode current : createdDocuments) { try { logger.info("CREATED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(current)); @@ -2368,7 +2368,7 @@ private void createReadFeedDocuments(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index ad70d2997423..6525617d94dd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -1004,7 +1004,7 @@ public void staledLeaseAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList) .last() .delayElement(Duration.ofMillis(1000)) .flatMap(cosmosItemResponse -> { @@ -1090,7 +1090,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList.add(getDocumentDefinition()); } - bulkInsert(createdFeedCollection, docDefList, FEED_COUNT) + bulkInsert(createdFeedCollection, docDefList) .last() .flatMap(cosmosItemResponse -> { log.info("Start first Change feed processor"); @@ -1143,7 +1143,7 @@ public void ownerNullAcquiring() throws InterruptedException { docDefList1.add(getDocumentDefinition()); } - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + return bulkInsert(createdFeedCollection, docDefList1) .last(); }); })) @@ -2236,7 +2236,7 @@ private void setupReadFeedDocuments(List createdDocuments, C docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } @@ -2247,7 +2247,7 @@ private void setupReadFeedDocumentsForAllVersionsAndDeletes(List createdDocuments, docDefList.add(getDocumentDefinition()); } - createdDocuments.addAll(bulkInsertBlocking(feedCollection, docDefList)); + createdDocuments.addAll(insertAllItemsBlocking(feedCollection, docDefList, false)); waitIfNeededForReplicasToCatchUp(getClientBuilder()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 4c9643a3eafd..083389edcd55 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -510,7 +510,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, if (value == null) { return 1; } - + return value + 1; });