From 795cf738e00058a938a26e9ce1dd6c630e74c51b Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Wed, 17 Nov 2021 20:37:30 -0800 Subject: [PATCH] Session token optimization and ChangeFeedProcessor bug fixes By default we don't scope the global session token in Gateway when partition key ID is part of the request. For containers with a very large number of partitions and multi-region account setting, this can result in "Request headers too long" errors. The change is to look first at the current request's headers and if the request is scoped to one specific partition, we should also scope the session token to that specific partition. ChangeFeedProcessor related changes: when the renewer task is cancelled, we also need to close the processor task even if we are about to check-point the current state. The cange also ensures that any exceptions are prioritized accordinally. Added more logs to CFP to allow for better monitoring of the current processing. Added test case for the scoped session token and simplified the existing CFP test. --- .../implementation/SessionContainer.java | 7 + .../changefeed/LeaseCheckpointer.java | 3 +- .../changefeed/LeaseStoreManager.java | 3 +- .../changefeed/PartitionCheckpointer.java | 8 + .../implementation/LeaseStoreManagerImpl.java | 43 +++-- .../PartitionCheckpointerImpl.java | 16 +- .../PartitionProcessorFactoryImpl.java | 2 +- .../PartitionProcessorImpl.java | 30 +++- .../PartitionSupervisorImpl.java | 20 ++- .../implementation/SessionContainerTest.java | 11 ++ .../cosmos/rx/ChangeFeedProcessorTest.java | 164 ++++++++---------- 11 files changed, 172 insertions(+), 135 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java index b329bec6200a..51439b6afeea 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java @@ -109,6 +109,13 @@ private ConcurrentHashMap getPartitionKeyRangeIdToTokenMa public String resolveGlobalSessionToken(RxDocumentServiceRequest request) { ConcurrentHashMap partitionKeyRangeIdToTokenMap = this.getPartitionKeyRangeIdToTokenMap(request); if (partitionKeyRangeIdToTokenMap != null) { + String partitionKeyRangeId = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID); + if (StringUtils.isNotEmpty(partitionKeyRangeId) && partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId) != null) { + StringBuilder result = new StringBuilder() + .append(partitionKeyRangeId).append(":").append(partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId).convertToString()); + return result.toString(); + } + return SessionContainer.getCombinedSessionToken(partitionKeyRangeIdToTokenMap); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java index cd16a7da0aae..4c7f9c5f3e2e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java @@ -15,7 +15,8 @@ public interface LeaseCheckpointer { * * @param lease the lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java index f5cba3a4d9e9..509d2fa09785 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java @@ -100,9 +100,10 @@ static LeaseStoreManagerBuilderDefinition builder() { * * @param lease the Lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); /** * @return true if the lease store is initialized. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java index 8b16bae7d896..4662a7881460 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java @@ -16,4 +16,12 @@ public interface PartitionCheckpointer { * @return a deferred operation of this call. */ Mono checkpointPartition(ChangeFeedState continuationState); + + /** + * Sets the cancelation token in case we need to bail out before check-pointing. + * + * @param cancellationToken the cancellation token. + * @return this instance. + */ + PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java index c092f786a3d6..f1f0dcb6029e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java @@ -6,6 +6,8 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.changefeed.CancellationToken; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; @@ -355,7 +357,7 @@ public Mono updateProperties(Lease lease) { } @Override - public Mono checkpoint(Lease lease, String continuationToken) { + public Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken) { if (lease == null) { throw new IllegalArgumentException("lease"); } @@ -364,28 +366,33 @@ public Mono checkpoint(Lease lease, String continuationToken) { throw new IllegalArgumentException("continuationToken must be a non-empty string"); } + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class) .map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse))) - .flatMap( refreshedLease -> this.leaseUpdater.updateLease( - refreshedLease, - lease.getId(), new PartitionKey(lease.getId()), - this.requestOptionsFactory.createItemRequestOptions(lease), - serverLease -> { - if (serverLease.getOwner() == null) { - logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); - throw new LeaseLostException(lease); - } - else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { - logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); - throw new LeaseLostException(lease); - } - serverLease.setContinuationToken(continuationToken); - - return serverLease; - })) + .flatMap( refreshedLease -> { + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + + return this.leaseUpdater.updateLease( + refreshedLease, + lease.getId(), new PartitionKey(lease.getId()), + this.requestOptionsFactory.createItemRequestOptions(lease), + serverLease -> { + if (serverLease.getOwner() == null) { + logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); + throw new LeaseLostException(lease); + } else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); + throw new LeaseLostException(lease); + } + serverLease.setContinuationToken(continuationToken); + + return serverLease; + }); + }) .doOnError(throwable -> { logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'", lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java index b6ef9f4858ad..dce2d084fe22 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java @@ -2,11 +2,14 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.changefeed.implementation; +import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer; import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; @@ -19,6 +22,7 @@ class PartitionCheckpointerImpl implements PartitionCheckpointer { private final Logger logger = LoggerFactory.getLogger(PartitionCheckpointerImpl.class); private final LeaseCheckpointer leaseCheckpointer; private Lease lease; + private CancellationToken cancellationToken; public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease lease) { this.leaseCheckpointer = leaseCheckpointer; @@ -31,13 +35,23 @@ public Mono checkpointPartition(ChangeFeedState continuationState) { checkArgument( continuationState.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation"); + + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseCheckpointer.checkpoint( this.lease, - continuationState.getContinuation().getCurrentContinuationToken().getToken()) + continuationState.getContinuation().getCurrentContinuationToken().getToken(), + cancellationToken) .map(lease1 -> { this.lease = lease1; logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken()); return lease1; }); } + + @Override + public PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken) { + this.cancellationToken = cancellationToken; + return this; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java index 1366651d5743..0c6f82270c40 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java @@ -113,6 +113,6 @@ public PartitionProcessor create(Lease lease, ChangeFeedObserver observer) { .withMaxItemCount(this.changeFeedProcessorOptions.getMaxItemCount()); PartitionCheckpointer checkpointer = new PartitionCheckpointerImpl(this.leaseCheckpointer, lease); - return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer); + return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer, lease); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java index b13dbdaff1be..45e0488554c0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java @@ -4,6 +4,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.CosmosSchedulers; +import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -45,6 +46,7 @@ class PartitionProcessorImpl implements PartitionProcessor { private final ChangeFeedObserver observer; private volatile CosmosChangeFeedRequestOptions options; private final ChangeFeedContextClient documentClient; + private final Lease lease; private volatile RuntimeException resultException; private volatile String lastServerContinuationToken; @@ -53,11 +55,13 @@ class PartitionProcessorImpl implements PartitionProcessor { public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, - PartitionCheckpointer checkpointer) { + PartitionCheckpointer checkpointer, + Lease lease) { this.observer = observer; this.documentClient = documentClient; this.settings = settings; this.checkpointer = checkpointer; + this.lease = lease; ChangeFeedState state = settings.getStartState(); this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state); @@ -66,7 +70,9 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, @Override public Mono run(CancellationToken cancellationToken) { + logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); this.isFirstQueryForChangeFeeds = true; + this.checkpointer.setCancellationToken(cancellationToken); return Flux.just(this) .flatMap( value -> { @@ -109,6 +115,7 @@ public Mono run(CancellationToken cancellationToken) { .getToken(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { + logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()); return this.dispatchChanges(documentFeedResponse, continuationState) .doOnError(throwable -> logger.debug( "Exception was thrown from thread {}", @@ -145,8 +152,8 @@ public Mono run(CancellationToken cancellationToken) { // we know it is a terminal event. CosmosException clientException = (CosmosException) throwable; - logger.warn("CosmosException: FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), clientException); + logger.warn("CosmosException: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException); StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException); switch (docDbError) { @@ -199,15 +206,16 @@ public Mono run(CancellationToken cancellationToken) { } } } else if (throwable instanceof LeaseLostException) { - logger.info("LeaseLoseException with FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId()); + logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner()); this.resultException = (LeaseLostException) throwable; } else if (throwable instanceof TaskCancelledException) { - logger.debug("Task cancelled exception: FeedRange {} from {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), throwable); + logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = (TaskCancelledException) throwable; } else { - logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable); + logger.warn("Unexpected exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = new RuntimeException(throwable); } return Flux.error(throwable); @@ -226,7 +234,11 @@ public Mono run(CancellationToken cancellationToken) { } return Flux.empty(); - }).then(); + }) + .then() + .doFinally( any -> { + logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); + }); } private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java index 31827d6dc0b4..3f65384ca451 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java @@ -78,18 +78,22 @@ private Mono afterRun(ChangeFeedObserverContext context, CancellationToken this.childShutdownCts.cancel(); - if (this.processor.getResultException() != null) { - throw this.processor.getResultException(); - } - - if (this.renewer.getResultException() != null) { - throw this.renewer.getResultException(); - } - closeReason = shutdownToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN; + RuntimeException workerException = this.processor.getResultException(); + + // Priority must be given to any exception from the processor worker unless it is a task being cancelled. + if (workerException == null || workerException instanceof TaskCancelledException) { + if (this.renewer.getResultException() != null) { + workerException = this.renewer.getResultException(); + } + } + + if (workerException != null) { + throw workerException; + } } catch (LeaseLostException llex) { closeReason = ChangeFeedObserverCloseReason.LEASE_LOST; this.resultException = llex; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java index 2edbbf69fe94..e2308513ecd9 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java @@ -68,6 +68,17 @@ public void sessionContainer() throws Exception { sessionToken = sessionContainer.resolvePartitionLocalSessionToken(request, resolvedPKRange.getId()); assertThat(sessionToken.getLSN()).isEqualTo(2); + + String globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_0:1#0#4=90#5=2,range_1:1#1#4=90#5=2,range_4:1#4#4=90#5=2,range_2:1#2#4=90#5=2,range_3:1#3#4=90#5=2"); + + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, "not_found"); + globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_0:1#0#4=90#5=2,range_1:1#1#4=90#5=2,range_4:1#4#4=90#5=2,range_2:1#2#4=90#5=2,range_3:1#3#4=90#5=2"); + + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, "range_1"); + globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_1:1#1#4=90#5=2"); } @Test(groups = "unit") diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index aeb444474363..fccb84e29ae2 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.ChangeFeedProcessorBuilder; import com.azure.cosmos.implementation.AsyncDocumentClient; @@ -15,6 +14,7 @@ import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlParameter; @@ -449,11 +449,10 @@ public void staledLeaseAcquiring() throws InterruptedException { ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); leaseDocument.setOwner("TEMP_OWNER"); CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { + .map(leaseDocument -> { ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; found host {}", leaseDocument.getOwner()); return leaseDocument; }) @@ -511,7 +510,6 @@ public void staledLeaseAcquiring() throws InterruptedException { @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void ownerNullAcquiring() throws InterruptedException { final String ownerFirst = "Owner_First"; - final String ownerSecond = "Owner_Second"; final String leasePrefix = "TEST"; CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); @@ -519,55 +517,34 @@ public void ownerNullAcquiring() throws InterruptedException { try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT); ChangeFeedProcessor changeFeedProcessorFirst = new ChangeFeedProcessorBuilder() .hostName(ownerFirst) .handleChanges(docs -> { ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - }) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setLeasePrefix(leasePrefix) - .setLeaseRenewInterval(Duration.ofSeconds(1)) - .setLeaseAcquireInterval(Duration.ofSeconds(2)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(1)) - ) - .buildChangeFeedProcessor(); - - ChangeFeedProcessor changeFeedProcessorSecond = new ChangeFeedProcessorBuilder() - .hostName(ownerSecond) - .handleChanges((List docs) -> { - ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); for (JsonNode item : docs) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } processItem(item, receivedDocuments); } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); + ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); }) .feedContainer(createdFeedCollection) .leaseContainer(createdLeaseCollection) .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofSeconds(10)) - .setLeaseAcquireInterval(Duration.ofSeconds(5)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix(leasePrefix) - .setMaxItemCount(10) .setStartFromBeginning(true) - .setMaxScaleCount(0) // unlimited + .setLeasePrefix(leasePrefix) + .setLeaseRenewInterval(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseAcquireInterval(Duration.ofMillis(5 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseExpirationInterval(Duration.ofMillis(6 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setFeedPollDelay(Duration.ofSeconds(5)) ) .buildChangeFeedProcessor(); try { - ChangeFeedProcessorTest.log.info("Start creating documents"); + ChangeFeedProcessorTest.log.info("Start more creating documents"); List docDefList = new ArrayList<>(); for (int i = 0; i < FEED_COUNT; i++) { @@ -583,59 +560,53 @@ public void ownerNullAcquiring() throws InterruptedException { }) .then( Mono.just(changeFeedProcessorFirst) - .flatMap( value -> { - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); - - SqlParameter param1 = new SqlParameter(); - param1.setName("@PartitionLeasePrefix"); - param1.setValue(leasePrefix); - SqlParameter param2 = new SqlParameter(); - param2.setName("@Owner"); - param2.setValue(ownerFirst); - - SqlQuerySpec querySpec = new SqlQuerySpec( - "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); - - CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); - - return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() - .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) - .flatMap(doc -> { - ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); - leaseDocument.setOwner(null); - CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); - }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { - ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; setting host to '{}'", leaseDocument.getOwner()); - return leaseDocument; - }) - .last() - .flatMap(leaseDocument -> { - ChangeFeedProcessorTest.log.info("Start creating documents"); - List docDefList1 = new ArrayList<>(); - - for (int i = 0; i < FEED_COUNT; i++) { - docDefList1.add(getDocumentDefinition()); - } - - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) - .last() - .delayElement(Duration.ofMillis(1000)) - .flatMap(cosmosItemResponse -> { - ChangeFeedProcessorTest.log.info("Start second Change feed processor"); - return changeFeedProcessorSecond.start().subscribeOn(Schedulers.elastic()) - .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); - }); - }); - })) + .flatMap( value -> { + ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); + try { + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } catch (InterruptedException ignored) { + } + + ChangeFeedProcessorTest.log.info("QueryItems before Change feed processor processing"); + + SqlParameter param1 = new SqlParameter(); + param1.setName("@PartitionLeasePrefix"); + param1.setValue(leasePrefix); + SqlParameter param2 = new SqlParameter(); + param2.setName("@Owner"); + param2.setValue(ownerFirst); + + SqlQuerySpec querySpec = new SqlQuerySpec( + "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); + + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + + return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() + .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) + .flatMap(doc -> { + ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); + leaseDocument.setOwner(null); + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); + }) + .map(leaseDocument -> { + ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; current Owner is'{}'", leaseDocument.getOwner()); + return leaseDocument; + }) + .last() + .flatMap(leaseDocument -> { + ChangeFeedProcessorTest.log.info("Start creating more documents"); + List docDefList1 = new ArrayList<>(); + + for (int i = 0; i < FEED_COUNT; i++) { + docDefList1.add(getDocumentDefinition()); + } + + return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + .last(); + }); + })) .subscribe(); } catch (Exception ex) { log.error("First change feed processor did not start in the expected time", ex); @@ -643,17 +614,18 @@ public void ownerNullAcquiring() throws InterruptedException { } long remainingWork = 20 * CHANGE_FEED_PROCESSOR_TIMEOUT; - while (remainingWork > 0 && changeFeedProcessorFirst.isStarted() && changeFeedProcessorSecond.isStarted()) { + while (remainingWork > 0 && !changeFeedProcessorFirst.isStarted()) { remainingWork -= 100; Thread.sleep(100); } // Wait for the feed processor to receive and process the documents. - waitToReceiveDocuments(receivedDocuments, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); + waitToReceiveDocuments(receivedDocuments, 30 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); - assertThat(changeFeedProcessorSecond.isStarted()).as("Change Feed Processor instance is running").isTrue(); + assertThat(changeFeedProcessorFirst.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - changeFeedProcessorSecond.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); changeFeedProcessorFirst.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); // Wait for the feed processor to shutdown. @@ -870,7 +842,7 @@ private void waitToReceiveDocuments(Map receivedDocuments, lon Thread.sleep(100); } - assertThat(remainingWork >= 0).as("Failed to receive all the feed documents").isTrue(); + assertThat(remainingWork > 0).as("Failed to receive all the feed documents").isTrue(); } private Consumer> leasesChangeFeedProcessorHandler(LeaseStateMonitor leaseStateMonitor) {