diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index 7cb9320e17fe..a27deb774a35 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -5,16 +5,20 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.AuthorizationTokenType; import com.azure.cosmos.implementation.ClientSideRequestStatistics; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.GoneException; +import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes; import com.azure.cosmos.implementation.IAuthorizationTokenProvider; import com.azure.cosmos.implementation.ISessionContainer; +import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; import com.azure.cosmos.implementation.PartitionKeyRangeGoneException; import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; import com.azure.cosmos.implementation.RequestTimeoutException; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RetryContext; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.SessionTokenHelper; @@ -33,10 +37,12 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -358,6 +364,224 @@ public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDo assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse)).isEqualTo(isGlobalStrongExpected); } + @Test(groups = "unit") + public void writeAsyncGlobalStrongRequest() { + runWriteAsyncBarrierableRequestTest(true, true); + } + + @Test(groups = "unit") + public void writeAsyncGlobalStrongRequestFailed() { + runWriteAsyncBarrierableRequestTest(true, false); + } + + @Test(groups = "unit") + public void writeAsyncNRegionCommitRequest() { + runWriteAsyncBarrierableRequestTest(false, true); + } + + @Test(groups = "unit") + public void writeAsyncNRegionCommitRequestFailed() { + runWriteAsyncBarrierableRequestTest(false, false); + } + + @Test(groups = "unit") + public void writeAsyncNoBarrierRequest() { + initializeConsistencyWriter(false); + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + Mockito.doReturn(false).when(timeoutHelper).isElapsed(); + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn("0").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + Mockito.doReturn(ConsistencyLevel.SESSION).when(serviceConfigReader).getDefaultConsistencyLevel(); + ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); + Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); + AddressInformation addressInformation = Mockito.mock(AddressInformation.class); + Uri primaryUri = Mockito.mock(Uri.class); + Mockito.doReturn(true).when(primaryUri).isPrimary(); + Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); + Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); + List addressList = Collections.singletonList(addressInformation); + Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); + Mono result = spyWriter.writeAsync(request, timeoutHelper, false); + StepVerifier.create(result) + .expectNext(storeResponse) + .expectComplete() + .verify(); + } + + @Test + public void isBarrierRequest() { + // Setup ConsistencyWriter with useMultipleWriteLocations false + initializeConsistencyWriter(false); + ConsistencyWriter writer = this.consistencyWriter; + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + StoreResponse response = Mockito.mock(StoreResponse.class); + + // 1. Global strong enabled and isGlobalStrongRequest returns true + try (MockedStatic replicatedResourceClientMock = Mockito.mockStatic(ReplicatedResourceClient.class)) { + replicatedResourceClientMock.when(ReplicatedResourceClient::isGlobalStrongEnabled).thenReturn(true); + ConsistencyWriter spyWriter = Mockito.spy(writer); + Mockito.doReturn(true).when(spyWriter).isGlobalStrongRequest(request, response); + boolean result = spyWriter.isBarrierRequest(request, response); + assertThat(result).isTrue(); + } + + // 2. NRegionSynchronousCommitEnabled path + // Setup request.requestContext.getNRegionSynchronousCommitEnabled() to true + request.requestContext.setNRegionSynchronousCommitEnabled(true); + // useMultipleWriteLocations is already false + Mockito.doReturn("123").when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN); + Mockito.doReturn(2L).when(response).getNumberOfReadRegions(); + boolean nRegionResult = writer.isBarrierRequest(request, response); + assertThat(nRegionResult).isTrue(); + + // 3. Negative case: NRegionSynchronousCommitEnabled false + request.requestContext.setNRegionSynchronousCommitEnabled(false); + boolean negativeResult = writer.isBarrierRequest(request, response); + assertThat(negativeResult).isFalse(); + + // 4. Negative case: useMultipleWriteLocations true + initializeConsistencyWriter(true); + writer = this.consistencyWriter; + request.requestContext.setNRegionSynchronousCommitEnabled(true); + boolean negativeResult2 = writer.isBarrierRequest(request, response); + assertThat(negativeResult2).isFalse(); + + // 5. Negative case: GLOBAL_NREGION_COMMITTED_LSN header missing + initializeConsistencyWriter(false); + writer = this.consistencyWriter; + request.requestContext.setNRegionSynchronousCommitEnabled(true); + Mockito.doReturn(null).when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN); + boolean negativeResult3 = writer.isBarrierRequest(request, response); + assertThat(negativeResult3).isFalse(); + + // 6. Negative case: NUMBER_OF_READ_REGIONS header missing or zero + Mockito.doReturn(0L).when(response).getNumberOfReadRegions(); + boolean negativeResult4 = writer.isBarrierRequest(request, response); + assertThat(negativeResult4).isFalse(); + } + + private void runWriteAsyncBarrierableRequestTest(boolean globalStrong, boolean barrierMet) { + RxDocumentServiceRequest request = setupRequest(!globalStrong); + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + Mockito.doReturn(false).when(timeoutHelper).isElapsed(); + StoreResponse storeResponse = setupStoreResponse(!globalStrong); + List addressList = setupAddressList(); + List storeResults = new ArrayList<>(); + if (barrierMet) { + storeResults.add(getStoreResult(storeResponse, 1L)); + storeResults.add(getStoreResult(storeResponse, 2L)); + } else { + storeResults.add(getStoreResult(storeResponse, 1L)); + } + StoreReader storeReader = setupStoreReader(storeResults); + initializeConsistencyWriterWithStoreReader(false, storeReader); + ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); + Mockito.doReturn(globalStrong ? ConsistencyLevel.STRONG : ConsistencyLevel.SESSION) + .when(serviceConfigReader).getDefaultConsistencyLevel(); + Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); + Mono result = spyWriter.writeAsync(request, timeoutHelper, false); + if (!barrierMet) { + StepVerifier.create(result) + .expectErrorSatisfies(error -> { + assertThat(error).isInstanceOf(GoneException.class); + FailureValidator failureValidator = FailureValidator.builder() + .instanceOf(GoneException.class) + .statusCode(GONE) + .subStatusCode(globalStrong ? SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET) + .build(); + failureValidator.validate(error); + }) + .verify(); + } else { + StepVerifier.create(result) + .expectNext(storeResponse) + .expectComplete() + .verify(); + } + } + + private RxDocumentServiceRequest setupRequest(boolean nRegionCommit) { + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + if (nRegionCommit) { + request.requestContext.setNRegionSynchronousCommitEnabled(true); + } + Mockito.doReturn(ResourceType.Document).when(request).getResourceType(); + Mockito.doReturn(OperationType.Create).when(request).getOperationType(); + Mockito.doReturn("1-MxAPlgMgA=").when(request).getResourceId(); + request.authorizationTokenType = AuthorizationTokenType.PrimaryMasterKey; + return request; + } + + private StoreResponse setupStoreResponse(boolean nRegionCommit) { + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn(1L).when(storeResponse).getNumberOfReadRegions(); + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + if (nRegionCommit) { + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN); + } else { + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN); + } + Mockito.doReturn("2").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.LSN); + return storeResponse; + } + + private List setupAddressList() { + AddressInformation addressInformation = Mockito.mock(AddressInformation.class); + Uri primaryUri = Mockito.mock(Uri.class); + Mockito.doReturn(true).when(primaryUri).isPrimary(); + Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); + Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); + return Collections.singletonList(addressInformation); + } + + private StoreReader setupStoreReader(List storeResults) { + StoreReader storeReader = Mockito.mock(StoreReader.class); + Mono>[] monos = storeResults.stream() + .map(Collections::singletonList) + .map(Mono::just) + .toArray(Mono[]::new); + Mockito.when(storeReader.readMultipleReplicaAsync( + Mockito.any(), + Mockito.anyBoolean(), + Mockito.anyInt(), + Mockito.anyBoolean(), + Mockito.anyBoolean(), + Mockito.any(), + Mockito.anyBoolean(), + Mockito.anyBoolean())) + .thenReturn(monos.length > 0 ? monos[0] : Mono.empty(), + Arrays.copyOfRange(monos, 1, monos.length)); + return storeReader; + } + + private StoreResult getStoreResult(StoreResponse storeResponse, long globalCommittedLsn) { + return new StoreResult( + storeResponse, + null, + "1", + 1, + 1, + 1.0, + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + 4, + 2, + true, + null, + globalCommittedLsn, + 1, + 1, + null, + 0.3, + 90.0); + } + + + + private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { addressSelector = Mockito.mock(AddressSelector.class); sessionContainer = Mockito.mock(ISessionContainer.class); @@ -375,6 +599,24 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { null); } + private void initializeConsistencyWriterWithStoreReader(boolean useMultipleWriteLocation, StoreReader reader) { + addressSelector = Mockito.mock(AddressSelector.class); + sessionContainer = Mockito.mock(ISessionContainer.class); + transportClient = Mockito.mock(TransportClient.class); + IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); + serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); + + consistencyWriter = new ConsistencyWriter(clientContext, + addressSelector, + sessionContainer, + transportClient, + authorizationTokenProvider, + serviceConfigReader, + useMultipleWriteLocation, + reader, + null); + } + public static void validateError(Mono single, FailureValidator validator) { TestSubscriber testSubscriber = TestSubscriber.create(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java index 3ea927bf9b72..555020ae8b09 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java @@ -205,6 +205,7 @@ public static final class Properties { public static final String THINCLIENT_READABLE_LOCATIONS = "thinClientReadableLocations"; public static final String DATABASE_ACCOUNT_ENDPOINT = "databaseAccountEndpoint"; public static final String ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR = "enablePerPartitionFailoverBehavior"; + public static final String ENABLE_N_REGION_SYNCHRONOUS_COMMIT = "enableNRegionSynchronousCommit"; //Authorization public static final String MASTER_TOKEN = "master"; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java index 52c2f66ac7df..8338b3eb40b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java @@ -300,6 +300,23 @@ public Boolean isPerPartitionFailoverBehaviorEnabled() { return null; } + /** + * Returns true if the account supports N region synchronous commit, + * false if enableNRegionSynchronousCommit evaluates to null or false. + *

+ * If enableNRegionSynchronousCommit property does not exist in account metadata JSON payload, null is returned. + * + * @return true if the account supports N region synchronous commit, false otherwise. + */ + public Boolean isNRegionSynchronousCommitEnabled() { + + if (super.has(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT)) { + return ObjectUtils.defaultIfNull(super.getBoolean(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT), false); + } + + return null; + } + public void setIsPerPartitionFailoverBehaviorEnabled(boolean value) { this.set(Constants.Properties.ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR, value); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 18da5250c458..e13d198e97c4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -8,14 +8,15 @@ import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.CosmosException; import com.azure.cosmos.ReadConsistencyStrategy; -import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo; -import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder; -import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; -import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; +import com.azure.cosmos.implementation.directconnectivity.BarrierType; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.StoreResult; import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper; import com.azure.cosmos.implementation.directconnectivity.Uri; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.azure.cosmos.implementation.throughputControl.ThroughputControlRequestContext; @@ -38,7 +39,7 @@ public class DocumentServiceRequestContext implements Cloneable { public volatile ISessionToken sessionToken; public volatile long quorumSelectedLSN; public volatile long globalCommittedSelectedLSN; - public volatile StoreResponse globalStrongWriteResponse; + public volatile StoreResponse cachedWriteResponse; public volatile ConsistencyLevel originalRequestConsistencyLevel; public volatile ReadConsistencyStrategy readConsistencyStrategy; public volatile PartitionKeyRange resolvedPartitionKeyRange; @@ -65,6 +66,8 @@ public class DocumentServiceRequestContext implements Cloneable { private volatile long approximateBloomFilterInsertionCount; private final Set sessionTokenEvaluationResults = ConcurrentHashMap.newKeySet(); private volatile List unavailableRegionsForPartition; + private volatile Boolean nRegionSynchronousCommitEnabled; + private volatile BarrierType barrierType = BarrierType.NONE; // For cancelled rntbd requests, track the response as OperationCancelledException which later will be used to populate the cosmosDiagnostics public final Map rntbdCancelledRequestMap = new ConcurrentHashMap<>(); @@ -148,7 +151,7 @@ public DocumentServiceRequestContext clone() { context.sessionToken = this.sessionToken; context.quorumSelectedLSN = this.quorumSelectedLSN; context.globalCommittedSelectedLSN = this.globalCommittedSelectedLSN; - context.globalStrongWriteResponse = this.globalStrongWriteResponse; + context.cachedWriteResponse = this.cachedWriteResponse; context.originalRequestConsistencyLevel = this.originalRequestConsistencyLevel; context.readConsistencyStrategy = this.readConsistencyStrategy; context.resolvedPartitionKeyRange = this.resolvedPartitionKeyRange; @@ -266,5 +269,21 @@ public void setPerPartitionAutomaticFailoverInfoHolder(PartitionLevelFailoverInf this.perPartitionFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelFailoverInfo); } } + + public Boolean getNRegionSynchronousCommitEnabled() { + return nRegionSynchronousCommitEnabled; + } + + public void setNRegionSynchronousCommitEnabled(Boolean nRegionSynchronousCommitEnabled) { + this.nRegionSynchronousCommitEnabled = nRegionSynchronousCommitEnabled; + } + + public BarrierType getBarrierType() { + return barrierType; + } + + public void setBarrierType(BarrierType barrierType) { + this.barrierType = barrierType; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index ec0ceb536615..07a3bbafeb4c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -437,4 +437,8 @@ private List getEffectivePreferredRegions() { public void setPerPartitionAutomaticFailoverConfigModifier(Consumer perPartitionAutomaticFailoverConfigModifier) { this.perPartitionAutomaticFailoverConfigModifier = perPartitionAutomaticFailoverConfigModifier; } + + public Boolean getNRegionSynchronousCommitEnabled() { + return this.latestDatabaseAccount.isNRegionSynchronousCommitEnabled(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 174b155485f8..5ae5bb50067a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -481,6 +481,7 @@ public static class SubStatusCodes { public static final int NO_VALID_STORE_RESPONSE = 21009; public static final int SERVER_GENERATED_408 = 21010; public static final int FAILED_TO_PARSE_SERVER_RESPONSE = 21011; + public static final int GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET = 21012; } public static class HeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java index 731accfed3a0..3748f8b4f14f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java @@ -43,6 +43,7 @@ public class RMResources { public static final String ReadConsistencyStrategyGlobalStrongOnlyAllowedForGlobalStrongAccount = "Value '%s' specified for the header '%s' is invalid. This value requires the account-level default consistency level to be '%s' - but it is '%s'."; public static final String RequestTimeout = "Request timed out. More info: https://aka.ms/cosmosdb-tsg-request-timeout-java"; public static final String GlobalStrongWriteBarrierNotMet = "Global STRONG write barrier has not been met for the request."; + public static final String NRegionSyncCommitBarrierNotMet = "NRegion Commit write barrier has not been met for the request, since the gap between the local LSN and global n-region committed lsn is more than 1"; public static final String InvalidRequestHeaderValue = "INVALID value for request header %s: %s"; public static final String InvalidResourceAddress = "INVALID address for resource %s: %s"; public static final String ReadQuorumNotMet = "READ Quorum size of %d is not met for the request."; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 025e6c96e657..a98bd371c2ba 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -2689,6 +2689,7 @@ private Mono> createDocumentInternal( options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); + request.requestContext.setNRegionSynchronousCommitEnabled(this.globalEndpointManager.getNRegionSynchronousCommitEnabled()); PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( request, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java new file mode 100644 index 000000000000..412b0bb8d298 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity; + +public enum BarrierType { + NONE, + GLOBAL_STRONG_WRITE, + N_REGION_SYNCHRONOUS_COMMIT +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index a80c0548ce09..b2a2cb358fe4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -27,6 +27,7 @@ import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.collections.ComparatorUtils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; @@ -110,6 +111,40 @@ public ConsistencyWriter( this.sessionRetryOptions = sessionRetryOptions; } + /** + * Constructor for ConsistencyWriter with StoreReader parameter for dependency injection in unit tests. + * + * @param diagnosticsClientContext the diagnostics client context used to capture diagnostics for requests. + * @param addressSelector the address selector used to resolve physical replica addresses for requests. + * @param sessionContainer the session container used for managing and maintaining session tokens. + * @param transportClient the transport client used to send requests to the backend replicas. + * @param authorizationTokenProvider the authorization token provider used to authenticate requests. + * @param serviceConfigReader the gateway service configuration reader providing service configuration settings. + * @param useMultipleWriteLocations flag indicating whether multiple write locations are enabled for the account. + * @param reader the StoreReader instance to use, injected for testing instead of creating a new StoreReader. + * @param sessionRetryOptions the session retry options used to handle session token mismatch retries. + */ + public ConsistencyWriter( + DiagnosticsClientContext diagnosticsClientContext, + AddressSelector addressSelector, + ISessionContainer sessionContainer, + TransportClient transportClient, + IAuthorizationTokenProvider authorizationTokenProvider, + GatewayServiceConfigurationReader serviceConfigReader, + boolean useMultipleWriteLocations, + StoreReader reader, + SessionRetryOptions sessionRetryOptions) { + this.diagnosticsClientContext = diagnosticsClientContext; + this.transportClient = transportClient; + this.addressSelector = addressSelector; + this.sessionContainer = sessionContainer; + this.authorizationTokenProvider = authorizationTokenProvider; + this.useMultipleWriteLocations = useMultipleWriteLocations; + this.serviceConfigReader = serviceConfigReader; + this.storeReader = reader; + this.sessionRetryOptions = sessionRetryOptions; + } + public Mono writeAsync( RxDocumentServiceRequest entity, TimeoutHelper timeout, @@ -174,7 +209,7 @@ Mono writePrivateAsync( request.requestContext.forceRefreshAddressCache = forceRefresh; - if (request.requestContext.globalStrongWriteResponse == null) { + if (request.requestContext.cachedWriteResponse == null) { Mono> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh); AtomicReference primaryURI = new AtomicReference<>(); @@ -304,18 +339,17 @@ Mono writePrivateAsync( .flatMap(v -> { if (!v) { - logger.info("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", request.requestContext.globalCommittedSelectedLSN); + logger.info(this.getErrorMessageForBarrierRequest(request)); if (cosmosExceptionValueHolder.get() != null) { return Mono.error(cosmosExceptionValueHolder.get()); } - return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, - HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); + return Mono.error(getGoneExceptionForBarrierRequest(request)); } return Mono.just(request); - })).map(req -> req.requestContext.globalStrongWriteResponse); + })).map(req -> req.requestContext.cachedWriteResponse); } } @@ -339,7 +373,7 @@ Mono barrierForGlobalStrong( AtomicReference cosmosExceptionValueHolder) { try { - if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { + if (isBarrierRequest(request, response)) { Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-1L); Utils.ValueHolder globalCommittedLsn = Utils.ValueHolder.initialize(-1L); @@ -351,7 +385,7 @@ Mono barrierForGlobalStrong( throw new GoneException(RMResources.Gone, HttpConstants.SubStatusCodes.SERVER_GENERATED_410); } - request.requestContext.globalStrongWriteResponse = response; + request.requestContext.cachedWriteResponse = response; request.requestContext.globalCommittedSelectedLSN = lsn.v; //if necessary we would have already refreshed cache by now. @@ -372,25 +406,23 @@ Mono barrierForGlobalStrong( return barrierWait.flatMap(res -> { if (!res) { - logger.error("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", - request.requestContext.globalCommittedSelectedLSN); + logger.error(getErrorMessageForBarrierRequest(request)); if (cosmosExceptionValueHolder.get() != null) { return Mono.error(cosmosExceptionValueHolder.get()); } // Reactor doesn't allow throwing checked exception - return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, - HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); + return Mono.error(getGoneExceptionForBarrierRequest(request)); } - return Mono.just(request.requestContext.globalStrongWriteResponse); + return Mono.just(request.requestContext.cachedWriteResponse); }); }); } else { - return Mono.just(request.requestContext.globalStrongWriteResponse); + return Mono.just(request.requestContext.cachedWriteResponse); } } else { return Mono.just(response); @@ -402,7 +434,57 @@ Mono barrierForGlobalStrong( } } - private Mono waitForWriteBarrierAsync( + private String getErrorMessageForBarrierRequest(RxDocumentServiceRequest request) { + if (request.requestContext.getBarrierType() == BarrierType.N_REGION_SYNCHRONOUS_COMMIT) { + return String.format("ConsistencyWriter: Write barrier has not been met for n region synchronous commit request. SelectedGlobalCommittedLsn: %s", + request.requestContext.globalCommittedSelectedLSN); + } + return String.format("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: %s", + request.requestContext.globalCommittedSelectedLSN); + } + + private GoneException getGoneExceptionForBarrierRequest(RxDocumentServiceRequest request){ + if (request.requestContext.getBarrierType() == BarrierType.N_REGION_SYNCHRONOUS_COMMIT) { + return new GoneException(RMResources.NRegionSyncCommitBarrierNotMet, + HttpConstants.SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET); + } + return new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, + HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET); + } + + boolean isBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { + if (isGlobalStrongBarrierRequest(request, response)) { + request.requestContext.setBarrierType(BarrierType.GLOBAL_STRONG_WRITE); + return true; + } + if (isNRegionSynchronousCommitBarrierRequest(request, response)) { + request.requestContext.setBarrierType(BarrierType.N_REGION_SYNCHRONOUS_COMMIT); + return true; + } + request.requestContext.setBarrierType(BarrierType.NONE); + return false; + } + + /** + * Checks if the request is a Global Strong Write barrier request. + */ + private boolean isGlobalStrongBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { + return ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response); + } + + /** + * Checks if the request is a NRegion Synchronous Commit barrier request. + */ + private boolean isNRegionSynchronousCommitBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { + return request.requestContext.getNRegionSynchronousCommitEnabled() + && !this.useMultipleWriteLocations + && StringUtils.isNotEmpty(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN)) + && Long.parseLong(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN)) != -1 + && response.getNumberOfReadRegions() > 0; + } + + // visibility set to package-private for testing + Mono waitForWriteBarrierAsync( RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn, AtomicReference cosmosExceptionValueHolder) { @@ -504,6 +586,9 @@ static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolde if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN)) != null) { globalCommittedLsn.v = Long.parseLong(headerValue); } + else if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN)) != null) { + globalCommittedLsn.v = Long.parseLong(headerValue); + } } private Mono isBarrierMeetPossibleInPresenceOfAvoidQuorumSelectionException( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index a3da4b9c487e..fb906a9767c2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -275,6 +275,19 @@ int getSubStatusCode() { return subStatusCode; } + public long getNumberOfReadRegions() { + int numberOfReadRegions = -1; + String numberOfReadRegionsString = this.getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + if (StringUtils.isNotEmpty(numberOfReadRegionsString)) { + try { + return Long.parseLong(numberOfReadRegionsString); + } catch (NumberFormatException e) { + // If value cannot be parsed as Long, return -1. + } + } + return numberOfReadRegions; + } + public Map> getReplicaStatusList() { return this.replicaStatusList; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java index d71dedc2c400..cf7d2dd10e89 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java @@ -77,5 +77,6 @@ public static class BackendHeaders { public static final String BACKEND_REQUEST_DURATION_MILLISECONDS = "x-ms-request-duration-ms"; public static final String INDEX_UTILIZATION = "x-ms-cosmos-index-utilization"; public static final String QUERY_EXECUTION_INFO = "x-ms-cosmos-query-execution-info"; + public static final String GLOBAL_N_REGION_COMMITTED_GLSN = "x-ms-cosmos-global-nregion-committed-glsn"; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index 0e9dfa7b86de..2cb0720c33a6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -906,7 +906,8 @@ public enum RntbdResponseHeader implements RntbdHeader { HasTentativeWrites((short) 0x003D, RntbdTokenType.Byte, false), SessionToken((short) 0x003E, RntbdTokenType.String, false), BackendRequestDurationMilliseconds((short) 0X0051, RntbdTokenType.Double, false), - CorrelatedActivityId((short) 0X0052, RntbdTokenType.Guid, false); + CorrelatedActivityId((short) 0X0052, RntbdTokenType.Guid, false), + GlobalNRegionCommittedGLSN((short) 0x0078, RntbdTokenType.LongLong, false); public static final Map map; public static final EnumSet set = EnumSet.allOf(RntbdResponseHeader.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java index 5b769fb4ce43..002d33546530 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java @@ -140,6 +140,8 @@ public class RntbdResponseHeaders extends RntbdTokenStream private final RntbdToken backendRequestDurationMilliseconds; @JsonProperty private final RntbdToken correlatedActivityId; + @JsonProperty + private final RntbdToken globalNRegionCommittedGLSN; // endregion @@ -200,6 +202,7 @@ private RntbdResponseHeaders(ByteBuf in) { this.xpRole = this.get(RntbdResponseHeader.XPRole); this.backendRequestDurationMilliseconds = this.get(RntbdResponseHeader.BackendRequestDurationMilliseconds); this.correlatedActivityId = this.get(RntbdResponseHeader.CorrelatedActivityId); + this.globalNRegionCommittedGLSN = this.get(RntbdResponseHeader.GlobalNRegionCommittedGLSN); } boolean isPayloadPresent() { @@ -304,6 +307,7 @@ public void setValues(final Map headers) { this.mapValue(this.xpRole, BackendHeaders.XP_ROLE, Integer::parseInt, headers); this.mapValue(this.backendRequestDurationMilliseconds, BackendHeaders.BACKEND_REQUEST_DURATION_MILLISECONDS, Double::parseDouble, headers); this.mapValue(this.correlatedActivityId, HttpHeaders.CORRELATED_ACTIVITY_ID, UUID::fromString, headers); + this.mapValue(this.globalNRegionCommittedGLSN, BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN, Long::parseLong, headers); } @Override @@ -503,6 +507,10 @@ private void collectEntries(final BiConsumer toUuidEntry(HttpHeaders.CORRELATED_ACTIVITY_ID, token)); + + collector.accept(this.globalNRegionCommittedGLSN, token -> + toLongEntry(BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN, token) + ); } private void mapValue(final RntbdToken token, final String name, final Function parse, final Map headers) {