Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -592,8 +591,6 @@ public void databaseAccountToClients() {
cosmosDiagnosticsNode.get("clientCfgs").get("clientEndpoints").get(TestConfigurations.HOST).asInt();

assertThat(updatedClientCount).isEqualTo(clientCount + 1);
} catch (JsonMappingException e) {
throw new RuntimeException(e);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public CosmosAsyncClient initializeClient(CosmosEndToEndOperationLatencyPolicyCo

try {
createdContainer = getSharedMultiPartitionCosmosContainer(client);
truncateCollection(createdContainer);
cleanUpContainer(createdContainer);

createdDocuments.addAll(this.insertDocuments(DEFAULT_NUM_DOCUMENTS, null, createdContainer));

Expand Down Expand Up @@ -504,7 +504,7 @@ private List<TestObject> insertDocuments(int documentCount, List<String> partiti
partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size()))));
}

List<TestObject> documentInserted = bulkInsertBlocking(container, documentsToInsert);
List<TestObject> documentInserted = insertAllItemsBlocking(container, documentsToInsert, true);

waitIfNeededForReplicasToCatchUp(this.getClientBuilder());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosNettyLeakDetectorFactory;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.DocumentClientTest;
Expand All @@ -19,7 +23,10 @@
import com.azure.cosmos.models.CompositePath;
import com.azure.cosmos.models.CompositePathSortOrder;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.IncludedPath;
import com.azure.cosmos.models.IndexingPolicy;
Expand Down Expand Up @@ -173,134 +180,118 @@ public void afterSuite() {

protected static void truncateCollection(DocumentCollection collection) {
logger.info("Truncating DocumentCollection {} ...", collection.getId());
AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build();
try {
List<String> paths = collection.getPartitionKey().getPaths();

try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder()
.key(TestConfigurations.MASTER_KEY)
.endpoint(TestConfigurations.HOST)
.buildAsyncClient()) {
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setMaxDegreeOfParallelism(-1);
QueryFeedOperationState state = new QueryFeedOperationState(
cosmosClient,
"truncateCollection",
collection.getSelfLink(),
collection.getId(),
ResourceType.Document,
OperationType.Query,
null,
options,
new CosmosPagedFluxOptions()
);

ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100);

logger.info("Truncating DocumentCollection {} documents ...", collection.getId());

houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", state, Document.class)
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(doc -> {
RequestOptions requestOptions = new RequestOptions();

if (paths != null && !paths.isEmpty()) {
List<String> pkPath = PathParser.getPathParts(paths.get(0));
Object propertyValue = doc.getObjectByPath(pkPath);
if (propertyValue == null) {
propertyValue = Undefined.value();
}

requestOptions.setPartitionKey(new PartitionKey(propertyValue));
}

return houseKeepingClient.deleteDocument(doc.getSelfLink(), requestOptions);
}).then().block();

logger.info("Truncating DocumentCollection {} triggers ...", collection.getId());

state = new QueryFeedOperationState(
cosmosClient,
"truncateTriggers",
collection.getSelfLink(),
collection.getId(),
ResourceType.Document,
OperationType.Query,
null,
options,
new CosmosPagedFluxOptions()
);
houseKeepingClient.queryTriggers(collection.getSelfLink(), "SELECT * FROM root", state)
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(trigger -> {
RequestOptions requestOptions = new RequestOptions();

// if (paths != null && !paths.isEmpty()) {
// Object propertyValue = trigger.getObjectByPath(PathParser.getPathParts(paths.get(0)));
// requestOptions.partitionKey(new PartitionKey(propertyValue));
// }

return houseKeepingClient.deleteTrigger(trigger.getSelfLink(), requestOptions);
}).then().block();

logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId());

state = new QueryFeedOperationState(
cosmosClient,
"truncateStoredProcs",
collection.getSelfLink(),
collection.getId(),
ResourceType.Document,
OperationType.Query,
null,
options,
new CosmosPagedFluxOptions()
);
houseKeepingClient.queryStoredProcedures(collection.getSelfLink(), "SELECT * FROM root", state)
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(storedProcedure -> {
RequestOptions requestOptions = new RequestOptions();

// if (paths != null && !paths.isEmpty()) {
// Object propertyValue = storedProcedure.getObjectByPath(PathParser.getPathParts(paths.get(0)));
// requestOptions.partitionKey(new PartitionKey(propertyValue));
// }

return houseKeepingClient.deleteStoredProcedure(storedProcedure.getSelfLink(), requestOptions);
}).then().block();

logger.info("Truncating DocumentCollection {} udfs ...", collection.getId());

state = new QueryFeedOperationState(
cosmosClient,
"truncateUserDefinedFunctions",
collection.getSelfLink(),
collection.getId(),
ResourceType.Document,
OperationType.Query,
null,
options,
new CosmosPagedFluxOptions()
);
houseKeepingClient.queryUserDefinedFunctions(collection.getSelfLink(), "SELECT * FROM root", state)
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(udf -> {
RequestOptions requestOptions = new RequestOptions();

// if (paths != null && !paths.isEmpty()) {
// Object propertyValue = udf.getObjectByPath(PathParser.getPathParts(paths.get(0)));
// requestOptions.partitionKey(new PartitionKey(propertyValue));
// }

return houseKeepingClient.deleteUserDefinedFunction(udf.getSelfLink(), requestOptions);
}).then().block();
}
} finally {
houseKeepingClient.close();

try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder()
.key(TestConfigurations.MASTER_KEY)
.endpoint(TestConfigurations.HOST)
.buildAsyncClient()) {

CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setMaxDegreeOfParallelism(-1);

ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(options, 100);

logger.info("Truncating DocumentCollection {} documents ...", collection.getId());

String altLink = collection.getAltLink();
String[] altLinkSegments = altLink.split("/");
// altLink format: dbs/{dbName}/colls/{collName}
String databaseName = altLinkSegments[1];
String containerName = altLinkSegments[3];

CosmosAsyncContainer container = cosmosClient.getDatabase(databaseName).getContainer(containerName);

Flux<CosmosItemOperation> deleteOperations =
container
.queryItems( "SELECT * FROM root", Document.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.map(doc -> {
PartitionKey partitionKey = PartitionKeyHelper.extractPartitionKeyFromDocument(doc, collection.getPartitionKey());
if (partitionKey == null) {
partitionKey = PartitionKey.NONE;
}

return CosmosBulkOperations.getDeleteItemOperation(doc.getId(), partitionKey);
});

CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions();
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getImpl(bulkOptions)
.setCosmosEndToEndLatencyPolicyConfig(
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(65))
.build());

cosmosClient.getDatabase(databaseName)
.getContainer(containerName)
.executeBulkOperations(deleteOperations, bulkOptions)
.flatMap(response -> {
if (response.getException() != null) {
Exception ex = response.getException();
if (ex instanceof CosmosException) {
CosmosException cosmosException = (CosmosException) ex;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND
&& cosmosException.getSubStatusCode() == 0) {
return Mono.empty();
}
}
return Mono.error(ex);
}
if (response.getResponse() != null
&& response.getResponse().getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return Mono.empty();
}
if (response.getResponse() != null
&& !response.getResponse().isSuccessStatusCode()) {
CosmosException bulkException = BridgeInternal.createCosmosException(
response.getResponse().getStatusCode(),
"Bulk delete operation failed with status code " + response.getResponse().getStatusCode());
BridgeInternal.setSubStatusCode(bulkException, response.getResponse().getSubStatusCode());
return Mono.error(bulkException);
}
return Mono.just(response);
})
.blockLast();

logger.info("Truncating DocumentCollection {} triggers ...", collection.getId());

container
.getScripts()
.queryTriggers("SELECT * FROM root", new CosmosQueryRequestOptions())
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(trigger -> container.getScripts().getTrigger(trigger.getId()).delete()).then().block();

logger.info("Truncating DocumentCollection {} storedProcedures ...", collection.getId());

container
.getScripts()
.queryStoredProcedures("SELECT * FROM root", new CosmosQueryRequestOptions())
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(storedProcedure -> {
return container.getScripts().getStoredProcedure(storedProcedure.getId()).delete();
})
.then()
.block();

logger.info("Truncating DocumentCollection {} udfs ...", collection.getId());

container
.getScripts()
.queryUserDefinedFunctions("SELECT * FROM root", new CosmosQueryRequestOptions())
.byPage()
.publishOn(Schedulers.parallel()).flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(udf -> {
RequestOptions requestOptions = new RequestOptions();
return container.getScripts().getUserDefinedFunction(udf.getId()).delete();
})
.then()
.block();
}

logger.info("Finished truncating DocumentCollection {}.", collection.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void queryDocumentsWithAggregates(Boolean qmEnabled) throws Exception {

public void bulkInsert() {
generateTestData();
voidBulkInsertBlocking(createdCollection, docs);
voidInsertAllItemsBlocking(createdCollection, docs, true);
}

public void generateTestData() {
Expand Down Expand Up @@ -266,7 +266,7 @@ public void before_AggregateQueryTests() throws Throwable {
client = this.getClientBuilder().buildAsyncClient();
createdCollection = getSharedMultiPartitionCosmosContainer(client);
try {
truncateCollection(createdCollection);
cleanUpContainer(createdCollection);
} catch (Throwable throwable) {
throwable = Exceptions.unwrap(throwable);
if (!(throwable instanceof CosmosException)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ public void before_BackPressureCrossPartitionTest() {
docDefList.add(getDocumentDefinition(i));
}

createdDocuments = bulkInsertBlocking(
createdDocuments = insertAllItemsBlocking(
createdCollection,
docDefList);
docDefList,
true);

numberOfPartitions = CosmosBridgeInternal.getAsyncDocumentClient(client).readPartitionKeyRanges(getCollectionLink(), (CosmosQueryRequestOptions) null)
.flatMap(p -> Flux.fromIterable(p.getResults())).collectList().single().block().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void before_BackPressureTest() throws Exception {
docDefList.add(getDocumentDefinition(i));
}

createdDocuments = bulkInsertBlocking(createdCollection, docDefList);
createdDocuments = insertAllItemsBlocking(createdCollection, docDefList, true);

waitIfNeededForReplicasToCatchUp(getClientBuilder());
warmUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ private void setupReadFeedDocuments(List<TestObject> createdDocuments, CosmosAsy
docDefList.add(TestObject.creatNewTestObject());
}

createdDocuments.addAll(bulkInsertBlocking(feedContainer, docDefList));
createdDocuments.addAll(insertAllItemsBlocking(feedContainer, docDefList, true));
waitIfNeededForReplicasToCatchUp(getClientBuilder());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ public void queryDocumentsWithOrderBy(String query, boolean matchedOrderBy) {

public void bulkInsert() {
generateTestData();
voidBulkInsertBlocking(createdCollection, docs);
voidBulkInsertBlocking(createdCollection, propertiesDocs);
voidInsertAllItemsBlocking(createdCollection, docs, true);
voidInsertAllItemsBlocking(createdCollection, propertiesDocs, true);
}

public void generateTestData() {
Expand Down Expand Up @@ -379,7 +379,7 @@ public void afterClass() {
public void beforeClass() throws Exception {
client = this.getClientBuilder().buildAsyncClient();
createdCollection = getSharedMultiPartitionCosmosContainer(client);
truncateCollection(createdCollection);
cleanUpContainer(createdCollection);

bulkInsert();

Expand Down
Loading
Loading