From 793b17653a7729af2f306e0de5b6d6d4ca20e0d2 Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Fri, 9 Jan 2026 18:44:44 -0800 Subject: [PATCH 1/8] Adding support for nregion commit feature --- .../ConsistencyWriterTest.java | 146 ++++++++++++++++++ .../cosmos/implementation/Constants.java | 1 + .../implementation/DatabaseAccount.java | 17 ++ .../DocumentServiceRequestContext.java | 13 +- .../implementation/GlobalEndpointManager.java | 4 + .../cosmos/implementation/HttpConstants.java | 1 + .../implementation/RxDocumentClientImpl.java | 1 + .../directconnectivity/BarrierType.java | 10 ++ .../directconnectivity/ConsistencyWriter.java | 32 +++- .../directconnectivity/StoreResponse.java | 13 ++ .../directconnectivity/WFConstants.java | 1 + 11 files changed, 230 insertions(+), 9 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index 9ede1c281219..020f257ea36d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -369,6 +369,152 @@ public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDo assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse)).isEqualTo(isGlobalStrongExpected); } + @Test + public void writeAsyncGlobalStrongRequest() { + initializeConsistencyWriter(false); + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + Mockito.doReturn(false).when(timeoutHelper).isElapsed(); + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + Mockito.doReturn(ConsistencyLevel.STRONG).when(serviceConfigReader).getDefaultConsistencyLevel(); + ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); + Mockito.doReturn(true).when(spyWriter).isGlobalStrongRequest(Mockito.any(), Mockito.any()); + Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); + // Mock addressSelector.resolveAddressesAsync to return a valid Mono + AddressInformation addressInformation = Mockito.mock(AddressInformation.class); + Uri primaryUri = Mockito.mock(Uri.class); + Mockito.doReturn(true).when(primaryUri).isPrimary(); + Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); + Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); + List addressList = Collections.singletonList(addressInformation); + Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); + // Mock transportClient.invokeResourceOperationAsync to return a valid Mono + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); + Mono result = spyWriter.writeAsync(request, timeoutHelper, false); + TestSubscriber subscriber = new TestSubscriber<>(); + result.subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertNoErrors(); + subscriber.assertValue(storeResponse); + } + + @Test + public void writeAsyncNRegionCommitRequest() { + initializeConsistencyWriter(false); + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + Mockito.doReturn(false).when(timeoutHelper).isElapsed(); + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN); + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + Mockito.doReturn(ConsistencyLevel.STRONG).when(serviceConfigReader).getDefaultConsistencyLevel(); + ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); + Mockito.doReturn(false).when(spyWriter).isGlobalStrongRequest(Mockito.any(), Mockito.any()); + Mockito.doReturn(true).when(spyWriter).isBarrierRequest(Mockito.any(), Mockito.any()); + Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); + // Mock addressSelector.resolveAddressesAsync to return a valid Mono + AddressInformation addressInformation = Mockito.mock(AddressInformation.class); + Uri primaryUri = Mockito.mock(Uri.class); + Mockito.doReturn(true).when(primaryUri).isPrimary(); + Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); + Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); + List addressList = Collections.singletonList(addressInformation); + Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); + // Mock transportClient.invokeResourceOperationAsync to return a valid Mono + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); + Mono result = spyWriter.writeAsync(request, timeoutHelper, false); + TestSubscriber subscriber = new TestSubscriber<>(); + result.subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertNoErrors(); + subscriber.assertValue(storeResponse); + } + + @Test + public void writeAsyncNoBarrierRequest() { + initializeConsistencyWriter(false); + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + Mockito.doReturn(false).when(timeoutHelper).isElapsed(); + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn("0").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + Mockito.doReturn(ConsistencyLevel.SESSION).when(serviceConfigReader).getDefaultConsistencyLevel(); + ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); + Mockito.doReturn(false).when(spyWriter).isGlobalStrongRequest(Mockito.any(), Mockito.any()); + Mockito.doReturn(false).when(spyWriter).isBarrierRequest(Mockito.any(), Mockito.any()); + Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); + // Mock addressSelector.resolveAddressesAsync to return a valid Mono + AddressInformation addressInformation = Mockito.mock(AddressInformation.class); + Uri primaryUri = Mockito.mock(Uri.class); + Mockito.doReturn(true).when(primaryUri).isPrimary(); + Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); + Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); + List addressList = Collections.singletonList(addressInformation); + Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); + // Mock transportClient.invokeResourceOperationAsync to return a valid Mono + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); + Mono result = spyWriter.writeAsync(request, timeoutHelper, false); + TestSubscriber subscriber = new TestSubscriber<>(); + result.subscribe(subscriber); + subscriber.awaitTerminalEvent(); + subscriber.assertNoErrors(); + subscriber.assertValue(storeResponse); + } + + @Test + public void isBarrierRequest() { + // Setup ConsistencyWriter with useMultipleWriteLocations false + initializeConsistencyWriter(false); + ConsistencyWriter writer = this.consistencyWriter; + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + StoreResponse response = Mockito.mock(StoreResponse.class); + + // 1. Global strong enabled and isGlobalStrongRequest returns true + try (MockedStatic replicatedResourceClientMock = Mockito.mockStatic(ReplicatedResourceClient.class)) { + replicatedResourceClientMock.when(ReplicatedResourceClient::isGlobalStrongEnabled).thenReturn(true); + ConsistencyWriter spyWriter = Mockito.spy(writer); + Mockito.doReturn(true).when(spyWriter).isGlobalStrongRequest(request, response); + boolean result = spyWriter.isBarrierRequest(request, response); + assertThat(result).isTrue(); + } + + // 2. NRegionSynchronousCommitEnabled path + // Setup request.requestContext.getNRegionSynchronousCommitEnabled() to true + request.requestContext.setNRegionSynchronousCommitEnabled(true); + // useMultipleWriteLocations is already false + Mockito.doReturn("123").when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN); + Mockito.doReturn(2L).when(response).getNumberOfReadRegions(); + boolean nRegionResult = writer.isBarrierRequest(request, response); + assertThat(nRegionResult).isTrue(); + + // 3. Negative case: NRegionSynchronousCommitEnabled false + request.requestContext.setNRegionSynchronousCommitEnabled(false); + boolean negativeResult = writer.isBarrierRequest(request, response); + assertThat(negativeResult).isFalse(); + + // 4. Negative case: useMultipleWriteLocations true + initializeConsistencyWriter(true); + writer = this.consistencyWriter; + request.requestContext.setNRegionSynchronousCommitEnabled(true); + boolean negativeResult2 = writer.isBarrierRequest(request, response); + assertThat(negativeResult2).isFalse(); + + // 5. Negative case: GLOBAL_NREGION_COMMITTED_LSN header missing + initializeConsistencyWriter(false); + writer = this.consistencyWriter; + request.requestContext.setNRegionSynchronousCommitEnabled(true); + Mockito.doReturn(null).when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN); + boolean negativeResult3 = writer.isBarrierRequest(request, response); + assertThat(negativeResult3).isFalse(); + + // 6. Negative case: NUMBER_OF_READ_REGIONS header missing or zero + Mockito.doReturn(0L).when(response).getNumberOfReadRegions(); + boolean negativeResult4 = writer.isBarrierRequest(request, response); + assertThat(negativeResult4).isFalse(); + } + + private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { addressSelector = Mockito.mock(AddressSelector.class); sessionContainer = Mockito.mock(ISessionContainer.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java index 3ea927bf9b72..555020ae8b09 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java @@ -205,6 +205,7 @@ public static final class Properties { public static final String THINCLIENT_READABLE_LOCATIONS = "thinClientReadableLocations"; public static final String DATABASE_ACCOUNT_ENDPOINT = "databaseAccountEndpoint"; public static final String ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR = "enablePerPartitionFailoverBehavior"; + public static final String ENABLE_N_REGION_SYNCHRONOUS_COMMIT = "enableNRegionSynchronousCommit"; //Authorization public static final String MASTER_TOKEN = "master"; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java index 52c2f66ac7df..8338b3eb40b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java @@ -300,6 +300,23 @@ public Boolean isPerPartitionFailoverBehaviorEnabled() { return null; } + /** + * Returns true if the account supports N region synchronous commit, + * false if enableNRegionSynchronousCommit evaluates to null or false. + *

+ * If enableNRegionSynchronousCommit property does not exist in account metadata JSON payload, null is returned. + * + * @return true if the account supports N region synchronous commit, false otherwise. + */ + public Boolean isNRegionSynchronousCommitEnabled() { + + if (super.has(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT)) { + return ObjectUtils.defaultIfNull(super.getBoolean(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT), false); + } + + return null; + } + public void setIsPerPartitionFailoverBehaviorEnabled(boolean value) { this.set(Constants.Properties.ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR, value); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 18da5250c458..1df98daf7008 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -38,7 +38,7 @@ public class DocumentServiceRequestContext implements Cloneable { public volatile ISessionToken sessionToken; public volatile long quorumSelectedLSN; public volatile long globalCommittedSelectedLSN; - public volatile StoreResponse globalStrongWriteResponse; + public volatile StoreResponse cachedWriteResponse; public volatile ConsistencyLevel originalRequestConsistencyLevel; public volatile ReadConsistencyStrategy readConsistencyStrategy; public volatile PartitionKeyRange resolvedPartitionKeyRange; @@ -65,6 +65,7 @@ public class DocumentServiceRequestContext implements Cloneable { private volatile long approximateBloomFilterInsertionCount; private final Set sessionTokenEvaluationResults = ConcurrentHashMap.newKeySet(); private volatile List unavailableRegionsForPartition; + private volatile Boolean nRegionSynchronousCommitEnabled; // For cancelled rntbd requests, track the response as OperationCancelledException which later will be used to populate the cosmosDiagnostics public final Map rntbdCancelledRequestMap = new ConcurrentHashMap<>(); @@ -148,7 +149,7 @@ public DocumentServiceRequestContext clone() { context.sessionToken = this.sessionToken; context.quorumSelectedLSN = this.quorumSelectedLSN; context.globalCommittedSelectedLSN = this.globalCommittedSelectedLSN; - context.globalStrongWriteResponse = this.globalStrongWriteResponse; + context.cachedWriteResponse = this.cachedWriteResponse; context.originalRequestConsistencyLevel = this.originalRequestConsistencyLevel; context.readConsistencyStrategy = this.readConsistencyStrategy; context.resolvedPartitionKeyRange = this.resolvedPartitionKeyRange; @@ -266,5 +267,13 @@ public void setPerPartitionAutomaticFailoverInfoHolder(PartitionLevelFailoverInf this.perPartitionFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelFailoverInfo); } } + + public Boolean getNRegionSynchronousCommitEnabled() { + return nRegionSynchronousCommitEnabled; + } + + public void setNRegionSynchronousCommitEnabled(Boolean nRegionSynchronousCommitEnabled) { + this.nRegionSynchronousCommitEnabled = nRegionSynchronousCommitEnabled; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index ec0ceb536615..07a3bbafeb4c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -437,4 +437,8 @@ private List getEffectivePreferredRegions() { public void setPerPartitionAutomaticFailoverConfigModifier(Consumer perPartitionAutomaticFailoverConfigModifier) { this.perPartitionAutomaticFailoverConfigModifier = perPartitionAutomaticFailoverConfigModifier; } + + public Boolean getNRegionSynchronousCommitEnabled() { + return this.latestDatabaseAccount.isNRegionSynchronousCommitEnabled(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 174b155485f8..001f3e17f711 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -481,6 +481,7 @@ public static class SubStatusCodes { public static final int NO_VALID_STORE_RESPONSE = 21009; public static final int SERVER_GENERATED_408 = 21010; public static final int FAILED_TO_PARSE_SERVER_RESPONSE = 21011; + public static final int GLOBAL_NREGION_COMMIT_WRITE_BARRIER_NOT_MET = 21012; } public static class HeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 9b035b28dff6..4f71a2742c30 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -2683,6 +2683,7 @@ private Mono> createDocumentInternal( options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); + request.requestContext.setNRegionSynchronousCommitEnabled(this.globalEndpointManager.getNRegionSynchronousCommitEnabled()); PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( request, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java new file mode 100644 index 000000000000..e87b2328d855 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity; + +public enum BarrierType { + None, + GlobalStrongWrite, + NRegionSynchronousCommit +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 86b3e6cd0346..1a1e04971cf8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -27,6 +27,7 @@ import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.collections.ComparatorUtils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; @@ -174,7 +175,7 @@ Mono writePrivateAsync( request.requestContext.forceRefreshAddressCache = forceRefresh; - if (request.requestContext.globalStrongWriteResponse == null) { + if (request.requestContext.cachedWriteResponse == null) { Mono> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh); AtomicReference primaryURI = new AtomicReference<>(); @@ -315,7 +316,7 @@ Mono writePrivateAsync( } return Mono.just(request); - })).map(req -> req.requestContext.globalStrongWriteResponse); + })).map(req -> req.requestContext.cachedWriteResponse); } } @@ -339,7 +340,7 @@ Mono barrierForGlobalStrong( AtomicReference cosmosExceptionValueHolder) { try { - if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { + if (isBarrierRequest(request, response)) { Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-1L); Utils.ValueHolder globalCommittedLsn = Utils.ValueHolder.initialize(-1L); @@ -351,7 +352,7 @@ Mono barrierForGlobalStrong( throw new GoneException(RMResources.Gone, HttpConstants.SubStatusCodes.SERVER_GENERATED_410); } - request.requestContext.globalStrongWriteResponse = response; + request.requestContext.cachedWriteResponse = response; request.requestContext.globalCommittedSelectedLSN = lsn.v; //if necessary we would have already refreshed cache by now. @@ -384,13 +385,13 @@ Mono barrierForGlobalStrong( HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); } - return Mono.just(request.requestContext.globalStrongWriteResponse); + return Mono.just(request.requestContext.cachedWriteResponse); }); }); } else { - return Mono.just(request.requestContext.globalStrongWriteResponse); + return Mono.just(request.requestContext.cachedWriteResponse); } } else { return Mono.just(response); @@ -402,7 +403,19 @@ Mono barrierForGlobalStrong( } } - private Mono waitForWriteBarrierAsync( + boolean isBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { + if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { + return true; + } + return request.requestContext.getNRegionSynchronousCommitEnabled() && + !this.useMultipleWriteLocations && + StringUtils.isNotEmpty(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN)) && + Long.parseLong(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN)) != -1 && + response.getNumberOfReadRegions() > 0; + } + + // visibility set to package-private for testing + Mono waitForWriteBarrierAsync( RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn, AtomicReference cosmosExceptionValueHolder) { @@ -501,6 +514,11 @@ static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolde lsn.v = Long.parseLong(headerValue); } + // if NRegionCommittedLSN is present, use it as GlobalCommittedLSN + if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN)) != null) { + globalCommittedLsn.v = Long.parseLong(headerValue); + return; + } if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN)) != null) { globalCommittedLsn.v = Long.parseLong(headerValue); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index a3da4b9c487e..fb906a9767c2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -275,6 +275,19 @@ int getSubStatusCode() { return subStatusCode; } + public long getNumberOfReadRegions() { + int numberOfReadRegions = -1; + String numberOfReadRegionsString = this.getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + if (StringUtils.isNotEmpty(numberOfReadRegionsString)) { + try { + return Long.parseLong(numberOfReadRegionsString); + } catch (NumberFormatException e) { + // If value cannot be parsed as Long, return -1. + } + } + return numberOfReadRegions; + } + public Map> getReplicaStatusList() { return this.replicaStatusList; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java index d71dedc2c400..9fea3a09b598 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java @@ -77,5 +77,6 @@ public static class BackendHeaders { public static final String BACKEND_REQUEST_DURATION_MILLISECONDS = "x-ms-request-duration-ms"; public static final String INDEX_UTILIZATION = "x-ms-cosmos-index-utilization"; public static final String QUERY_EXECUTION_INFO = "x-ms-cosmos-query-execution-info"; + public static final String GLOBAL_NREGION_COMMITTED_LSN = "x-ms-cosmos-global-nregion-committed-glsn"; } } From 1351016b0b55ce1883333d8412c8a26a73b75fc7 Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Fri, 16 Jan 2026 16:58:25 -0800 Subject: [PATCH 2/8] Adding nregion header serialization in RNTBD Adding unit tests Refactoring and cleanup --- .../ConsistencyWriterTest.java | 232 ++++++++++++------ .../DocumentServiceRequestContext.java | 14 +- .../cosmos/implementation/HttpConstants.java | 2 +- .../cosmos/implementation/RMResources.java | 1 + .../directconnectivity/BarrierType.java | 6 +- .../directconnectivity/ConsistencyWriter.java | 102 ++++++-- .../directconnectivity/WFConstants.java | 2 +- .../rntbd/RntbdConstants.java | 3 +- .../rntbd/RntbdResponseHeaders.java | 8 + 9 files changed, 265 insertions(+), 105 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index 020f257ea36d..ee87e1ea622b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -5,21 +5,8 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.implementation.ClientSideRequestStatistics; -import com.azure.cosmos.implementation.DiagnosticsClientContext; -import com.azure.cosmos.implementation.FailureValidator; -import com.azure.cosmos.implementation.GoneException; -import com.azure.cosmos.implementation.IAuthorizationTokenProvider; -import com.azure.cosmos.implementation.ISessionContainer; -import com.azure.cosmos.implementation.PartitionIsMigratingException; -import com.azure.cosmos.implementation.PartitionKeyRangeGoneException; -import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; -import com.azure.cosmos.implementation.RequestTimeoutException; -import com.azure.cosmos.implementation.RetryContext; -import com.azure.cosmos.implementation.RxDocumentServiceRequest; -import com.azure.cosmos.implementation.SessionTokenHelper; -import com.azure.cosmos.implementation.StoreResponseBuilder; -import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.*; +import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import io.reactivex.subscribers.TestSubscriber; import org.mockito.MockedStatic; @@ -31,10 +18,12 @@ import reactor.core.publisher.Mono; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -369,66 +358,24 @@ public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDo assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse)).isEqualTo(isGlobalStrongExpected); } - @Test + @Test(groups = "unit") public void writeAsyncGlobalStrongRequest() { - initializeConsistencyWriter(false); - RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); - TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); - Mockito.doReturn(false).when(timeoutHelper).isElapsed(); - StoreResponse storeResponse = Mockito.mock(StoreResponse.class); - Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); - Mockito.doReturn(ConsistencyLevel.STRONG).when(serviceConfigReader).getDefaultConsistencyLevel(); - ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); - Mockito.doReturn(true).when(spyWriter).isGlobalStrongRequest(Mockito.any(), Mockito.any()); - Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); - // Mock addressSelector.resolveAddressesAsync to return a valid Mono - AddressInformation addressInformation = Mockito.mock(AddressInformation.class); - Uri primaryUri = Mockito.mock(Uri.class); - Mockito.doReturn(true).when(primaryUri).isPrimary(); - Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); - Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); - List addressList = Collections.singletonList(addressInformation); - Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); - // Mock transportClient.invokeResourceOperationAsync to return a valid Mono - Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); - Mono result = spyWriter.writeAsync(request, timeoutHelper, false); - TestSubscriber subscriber = new TestSubscriber<>(); - result.subscribe(subscriber); - subscriber.awaitTerminalEvent(); - subscriber.assertNoErrors(); - subscriber.assertValue(storeResponse); + runWriteAsyncBarrierableRequestTest(true, true); } - @Test + @Test(groups = "unit") + public void writeAsyncGlobalStrongRequestFailed() { + runWriteAsyncBarrierableRequestTest(true, false); + } + + @Test(groups = "unit") public void writeAsyncNRegionCommitRequest() { - initializeConsistencyWriter(false); - RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); - TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); - Mockito.doReturn(false).when(timeoutHelper).isElapsed(); - StoreResponse storeResponse = Mockito.mock(StoreResponse.class); - Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN); - Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); - Mockito.doReturn(ConsistencyLevel.STRONG).when(serviceConfigReader).getDefaultConsistencyLevel(); - ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); - Mockito.doReturn(false).when(spyWriter).isGlobalStrongRequest(Mockito.any(), Mockito.any()); - Mockito.doReturn(true).when(spyWriter).isBarrierRequest(Mockito.any(), Mockito.any()); - Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); - // Mock addressSelector.resolveAddressesAsync to return a valid Mono - AddressInformation addressInformation = Mockito.mock(AddressInformation.class); - Uri primaryUri = Mockito.mock(Uri.class); - Mockito.doReturn(true).when(primaryUri).isPrimary(); - Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); - Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); - List addressList = Collections.singletonList(addressInformation); - Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); - // Mock transportClient.invokeResourceOperationAsync to return a valid Mono - Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); - Mono result = spyWriter.writeAsync(request, timeoutHelper, false); - TestSubscriber subscriber = new TestSubscriber<>(); - result.subscribe(subscriber); - subscriber.awaitTerminalEvent(); - subscriber.assertNoErrors(); - subscriber.assertValue(storeResponse); + runWriteAsyncBarrierableRequestTest(false, true); + } + + @Test(groups = "unit") + public void writeAsyncNRegionCommitRequestFailed() { + runWriteAsyncBarrierableRequestTest(false, false); } @Test @@ -441,10 +388,7 @@ public void writeAsyncNoBarrierRequest() { Mockito.doReturn("0").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); Mockito.doReturn(ConsistencyLevel.SESSION).when(serviceConfigReader).getDefaultConsistencyLevel(); ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); - Mockito.doReturn(false).when(spyWriter).isGlobalStrongRequest(Mockito.any(), Mockito.any()); - Mockito.doReturn(false).when(spyWriter).isBarrierRequest(Mockito.any(), Mockito.any()); Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any()); - // Mock addressSelector.resolveAddressesAsync to return a valid Mono AddressInformation addressInformation = Mockito.mock(AddressInformation.class); Uri primaryUri = Mockito.mock(Uri.class); Mockito.doReturn(true).when(primaryUri).isPrimary(); @@ -452,7 +396,6 @@ public void writeAsyncNoBarrierRequest() { Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); List addressList = Collections.singletonList(addressInformation); Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); - // Mock transportClient.invokeResourceOperationAsync to return a valid Mono Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); Mono result = spyWriter.writeAsync(request, timeoutHelper, false); TestSubscriber subscriber = new TestSubscriber<>(); @@ -483,7 +426,7 @@ public void isBarrierRequest() { // Setup request.requestContext.getNRegionSynchronousCommitEnabled() to true request.requestContext.setNRegionSynchronousCommitEnabled(true); // useMultipleWriteLocations is already false - Mockito.doReturn("123").when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN); + Mockito.doReturn("123").when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN); Mockito.doReturn(2L).when(response).getNumberOfReadRegions(); boolean nRegionResult = writer.isBarrierRequest(request, response); assertThat(nRegionResult).isTrue(); @@ -504,7 +447,7 @@ public void isBarrierRequest() { initializeConsistencyWriter(false); writer = this.consistencyWriter; request.requestContext.setNRegionSynchronousCommitEnabled(true); - Mockito.doReturn(null).when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN); + Mockito.doReturn(null).when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN); boolean negativeResult3 = writer.isBarrierRequest(request, response); assertThat(negativeResult3).isFalse(); @@ -514,6 +457,123 @@ public void isBarrierRequest() { assertThat(negativeResult4).isFalse(); } + private void runWriteAsyncBarrierableRequestTest(boolean globalStrong, boolean barrierMet) { + RxDocumentServiceRequest request = setupRequest(!globalStrong); + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + Mockito.doReturn(false).when(timeoutHelper).isElapsed(); + StoreResponse storeResponse = setupStoreResponse(!globalStrong); + List addressList = setupAddressList(); + List storeResults = new ArrayList<>(); + if (barrierMet) { + storeResults.add(getStoreResult(storeResponse, 1L)); + storeResults.add(getStoreResult(storeResponse, 2L)); + } else { + storeResults.add(getStoreResult(storeResponse, 1L)); + } + StoreReader storeReader = setupStoreReader(storeResults); + initializeConsistencyWriterWithStoreReader(false, storeReader); + ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter); + Mockito.doReturn(globalStrong ? ConsistencyLevel.STRONG : ConsistencyLevel.SESSION) + .when(serviceConfigReader).getDefaultConsistencyLevel(); + Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); + Mono result = spyWriter.writeAsync(request, timeoutHelper, false); + TestSubscriber subscriber = new TestSubscriber<>(); + result.subscribe(subscriber); + subscriber.awaitTerminalEvent(); + if (!barrierMet) { + subscriber.assertError(GoneException.class); + FailureValidator failureValidator = FailureValidator.builder() + .instanceOf(GoneException.class) + .statusCode(GONE) + .subStatusCode(globalStrong? SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET) + .build(); + assertThat(subscriber.errorCount()).isEqualTo(1); + failureValidator.validate(subscriber.errors().getFirst()); + } else { + subscriber.assertNoErrors(); + subscriber.assertValue(storeResponse); + } + } + + private RxDocumentServiceRequest setupRequest(boolean nRegionCommit) { + RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); + if (nRegionCommit) { + request.requestContext.setNRegionSynchronousCommitEnabled(true); + } + Mockito.doReturn(ResourceType.Document).when(request).getResourceType(); + Mockito.doReturn(OperationType.Create).when(request).getOperationType(); + Mockito.doReturn("1-MxAPlgMgA=").when(request).getResourceId(); + request.authorizationTokenType = AuthorizationTokenType.PrimaryMasterKey; + return request; + } + + private StoreResponse setupStoreResponse(boolean nRegionCommit) { + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn(1L).when(storeResponse).getNumberOfReadRegions(); + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS); + if (nRegionCommit) { + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN); + } else { + Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN); + } + Mockito.doReturn("2").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.LSN); + return storeResponse; + } + + private List setupAddressList() { + AddressInformation addressInformation = Mockito.mock(AddressInformation.class); + Uri primaryUri = Mockito.mock(Uri.class); + Mockito.doReturn(true).when(primaryUri).isPrimary(); + Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString(); + Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri(); + return Collections.singletonList(addressInformation); + } + + private StoreReader setupStoreReader(List storeResults) { + StoreReader storeReader = Mockito.mock(StoreReader.class); + Mono>[] monos = storeResults.stream() + .map(Collections::singletonList) + .map(Mono::just) + .toArray(Mono[]::new); + Mockito.when(storeReader.readMultipleReplicaAsync( + Mockito.any(), + Mockito.anyBoolean(), + Mockito.anyInt(), + Mockito.anyBoolean(), + Mockito.anyBoolean(), + Mockito.any(), + Mockito.anyBoolean(), + Mockito.anyBoolean())) + .thenReturn(monos.length > 0 ? monos[0] : Mono.empty(), + Arrays.copyOfRange(monos, 1, monos.length)); + return storeReader; + } + + private StoreResult getStoreResult(StoreResponse storeResponse, long globalCommittedLsn) { + return new StoreResult( + storeResponse, + null, + "1", + 1, + 1, + 1.0, + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + 4, + 2, + true, + null, + globalCommittedLsn, + 1, + 1, + null, + 0.3, + 90.0); + } + + + private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { addressSelector = Mockito.mock(AddressSelector.class); @@ -532,6 +592,24 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { null); } + private void initializeConsistencyWriterWithStoreReader(boolean useMultipleWriteLocation, StoreReader reader) { + addressSelector = Mockito.mock(AddressSelector.class); + sessionContainer = Mockito.mock(ISessionContainer.class); + transportClient = Mockito.mock(TransportClient.class); + IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); + serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); + + consistencyWriter = new ConsistencyWriter(clientContext, + addressSelector, + sessionContainer, + transportClient, + authorizationTokenProvider, + serviceConfigReader, + useMultipleWriteLocation, + reader, + null); + } + public static void validateError(Mono single, FailureValidator validator) { TestSubscriber testSubscriber = new TestSubscriber<>(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 1df98daf7008..269b7b9f169d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -8,14 +8,11 @@ import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.CosmosException; import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.implementation.directconnectivity.*; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; -import com.azure.cosmos.implementation.directconnectivity.StoreResponse; -import com.azure.cosmos.implementation.directconnectivity.StoreResult; -import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper; -import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.azure.cosmos.implementation.throughputControl.ThroughputControlRequestContext; @@ -66,6 +63,7 @@ public class DocumentServiceRequestContext implements Cloneable { private final Set sessionTokenEvaluationResults = ConcurrentHashMap.newKeySet(); private volatile List unavailableRegionsForPartition; private volatile Boolean nRegionSynchronousCommitEnabled; + private volatile BarrierType barrierType = BarrierType.NONE; // For cancelled rntbd requests, track the response as OperationCancelledException which later will be used to populate the cosmosDiagnostics public final Map rntbdCancelledRequestMap = new ConcurrentHashMap<>(); @@ -275,5 +273,13 @@ public Boolean getNRegionSynchronousCommitEnabled() { public void setNRegionSynchronousCommitEnabled(Boolean nRegionSynchronousCommitEnabled) { this.nRegionSynchronousCommitEnabled = nRegionSynchronousCommitEnabled; } + + public BarrierType getBarrierType() { + return barrierType; + } + + public void setBarrierType(BarrierType barrierType) { + this.barrierType = barrierType; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 001f3e17f711..5ae5bb50067a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -481,7 +481,7 @@ public static class SubStatusCodes { public static final int NO_VALID_STORE_RESPONSE = 21009; public static final int SERVER_GENERATED_408 = 21010; public static final int FAILED_TO_PARSE_SERVER_RESPONSE = 21011; - public static final int GLOBAL_NREGION_COMMIT_WRITE_BARRIER_NOT_MET = 21012; + public static final int GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET = 21012; } public static class HeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java index 731accfed3a0..3748f8b4f14f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RMResources.java @@ -43,6 +43,7 @@ public class RMResources { public static final String ReadConsistencyStrategyGlobalStrongOnlyAllowedForGlobalStrongAccount = "Value '%s' specified for the header '%s' is invalid. This value requires the account-level default consistency level to be '%s' - but it is '%s'."; public static final String RequestTimeout = "Request timed out. More info: https://aka.ms/cosmosdb-tsg-request-timeout-java"; public static final String GlobalStrongWriteBarrierNotMet = "Global STRONG write barrier has not been met for the request."; + public static final String NRegionSyncCommitBarrierNotMet = "NRegion Commit write barrier has not been met for the request, since the gap between the local LSN and global n-region committed lsn is more than 1"; public static final String InvalidRequestHeaderValue = "INVALID value for request header %s: %s"; public static final String InvalidResourceAddress = "INVALID address for resource %s: %s"; public static final String ReadQuorumNotMet = "READ Quorum size of %d is not met for the request."; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java index e87b2328d855..412b0bb8d298 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierType.java @@ -4,7 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity; public enum BarrierType { - None, - GlobalStrongWrite, - NRegionSynchronousCommit + NONE, + GLOBAL_STRONG_WRITE, + N_REGION_SYNCHRONOUS_COMMIT } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 1a1e04971cf8..cb20ec94f92e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -111,6 +111,39 @@ public ConsistencyWriter( this.sessionRetryOptions = sessionRetryOptions; } + /** + * Constructor for ConsistencyWriter with StoreReader parameter for dependency injection in unit tests + * @param diagnosticsClientContext + * @param addressSelector + * @param sessionContainer + * @param transportClient + * @param authorizationTokenProvider + * @param serviceConfigReader + * @param useMultipleWriteLocations + * @param reader + * @param sessionRetryOptions + */ + public ConsistencyWriter( + DiagnosticsClientContext diagnosticsClientContext, + AddressSelector addressSelector, + ISessionContainer sessionContainer, + TransportClient transportClient, + IAuthorizationTokenProvider authorizationTokenProvider, + GatewayServiceConfigurationReader serviceConfigReader, + boolean useMultipleWriteLocations, + StoreReader reader, + SessionRetryOptions sessionRetryOptions) { + this.diagnosticsClientContext = diagnosticsClientContext; + this.transportClient = transportClient; + this.addressSelector = addressSelector; + this.sessionContainer = sessionContainer; + this.authorizationTokenProvider = authorizationTokenProvider; + this.useMultipleWriteLocations = useMultipleWriteLocations; + this.serviceConfigReader = serviceConfigReader; + this.storeReader = reader; + this.sessionRetryOptions = sessionRetryOptions; + } + public Mono writeAsync( RxDocumentServiceRequest entity, TimeoutHelper timeout, @@ -305,14 +338,13 @@ Mono writePrivateAsync( .flatMap(v -> { if (!v) { - logger.info("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", request.requestContext.globalCommittedSelectedLSN); + logger.info(this.getErrorMessageForBarrierRequest(request)); if (cosmosExceptionValueHolder.get() != null) { return Mono.error(cosmosExceptionValueHolder.get()); } - return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, - HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); + return Mono.error(getGoneExceptionForBarrierRequest(request)); } return Mono.just(request); @@ -373,16 +405,14 @@ Mono barrierForGlobalStrong( return barrierWait.flatMap(res -> { if (!res) { - logger.error("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", - request.requestContext.globalCommittedSelectedLSN); + logger.error(getErrorMessageForBarrierRequest(request)); if (cosmosExceptionValueHolder.get() != null) { return Mono.error(cosmosExceptionValueHolder.get()); } // RxJava1 doesn't allow throwing checked exception - return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, - HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); + return Mono.error(getGoneExceptionForBarrierRequest(request)); } return Mono.just(request.requestContext.cachedWriteResponse); @@ -403,15 +433,53 @@ Mono barrierForGlobalStrong( } } + private String getErrorMessageForBarrierRequest(RxDocumentServiceRequest request) { + if (request.requestContext.getBarrierType() == BarrierType.N_REGION_SYNCHRONOUS_COMMIT) { + return String.format("ConsistencyWriter: Write barrier has not been met for n region synchronous commit request. SelectedGlobalCommittedLsn: %s", + request.requestContext.globalCommittedSelectedLSN); + } + return String.format("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: %s", + request.requestContext.globalCommittedSelectedLSN); + } + + private GoneException getGoneExceptionForBarrierRequest(RxDocumentServiceRequest request){ + if (request.requestContext.getBarrierType() == BarrierType.N_REGION_SYNCHRONOUS_COMMIT) { + return new GoneException(RMResources.NRegionSyncCommitBarrierNotMet, + HttpConstants.SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET); + } + return new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, + HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET); + } + boolean isBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { - if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { - return true; + if (isGlobalStrongBarrierRequest(request, response)) { + request.requestContext.setBarrierType(BarrierType.GLOBAL_STRONG_WRITE); + return true; } - return request.requestContext.getNRegionSynchronousCommitEnabled() && - !this.useMultipleWriteLocations && - StringUtils.isNotEmpty(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN)) && - Long.parseLong(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN)) != -1 && - response.getNumberOfReadRegions() > 0; + if (isNRegionSynchronousCommitBarrierRequest(request, response)) { + request.requestContext.setBarrierType(BarrierType.N_REGION_SYNCHRONOUS_COMMIT); + return true; + } + request.requestContext.setBarrierType(BarrierType.NONE); + return false; + } + + /** + * Checks if the request is a Global Strong Write barrier request. + */ + private boolean isGlobalStrongBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { + return ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response); + } + + /** + * Checks if the request is a NRegion Synchronous Commit barrier request. + */ + private boolean isNRegionSynchronousCommitBarrierRequest(RxDocumentServiceRequest request, StoreResponse response) { + return request.requestContext.getNRegionSynchronousCommitEnabled() + && !this.useMultipleWriteLocations + && StringUtils.isNotEmpty(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN)) + && Long.parseLong(response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN)) != -1 + && response.getNumberOfReadRegions() > 0; } // visibility set to package-private for testing @@ -514,12 +582,10 @@ static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolde lsn.v = Long.parseLong(headerValue); } - // if NRegionCommittedLSN is present, use it as GlobalCommittedLSN - if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_NREGION_COMMITTED_LSN)) != null) { + if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN)) != null) { globalCommittedLsn.v = Long.parseLong(headerValue); - return; } - if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN)) != null) { + else if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN)) != null) { globalCommittedLsn.v = Long.parseLong(headerValue); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java index 9fea3a09b598..cf7d2dd10e89 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/WFConstants.java @@ -77,6 +77,6 @@ public static class BackendHeaders { public static final String BACKEND_REQUEST_DURATION_MILLISECONDS = "x-ms-request-duration-ms"; public static final String INDEX_UTILIZATION = "x-ms-cosmos-index-utilization"; public static final String QUERY_EXECUTION_INFO = "x-ms-cosmos-query-execution-info"; - public static final String GLOBAL_NREGION_COMMITTED_LSN = "x-ms-cosmos-global-nregion-committed-glsn"; + public static final String GLOBAL_N_REGION_COMMITTED_GLSN = "x-ms-cosmos-global-nregion-committed-glsn"; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index 0e9dfa7b86de..2cb0720c33a6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -906,7 +906,8 @@ public enum RntbdResponseHeader implements RntbdHeader { HasTentativeWrites((short) 0x003D, RntbdTokenType.Byte, false), SessionToken((short) 0x003E, RntbdTokenType.String, false), BackendRequestDurationMilliseconds((short) 0X0051, RntbdTokenType.Double, false), - CorrelatedActivityId((short) 0X0052, RntbdTokenType.Guid, false); + CorrelatedActivityId((short) 0X0052, RntbdTokenType.Guid, false), + GlobalNRegionCommittedGLSN((short) 0x0078, RntbdTokenType.LongLong, false); public static final Map map; public static final EnumSet set = EnumSet.allOf(RntbdResponseHeader.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java index 5b769fb4ce43..002d33546530 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java @@ -140,6 +140,8 @@ public class RntbdResponseHeaders extends RntbdTokenStream private final RntbdToken backendRequestDurationMilliseconds; @JsonProperty private final RntbdToken correlatedActivityId; + @JsonProperty + private final RntbdToken globalNRegionCommittedGLSN; // endregion @@ -200,6 +202,7 @@ private RntbdResponseHeaders(ByteBuf in) { this.xpRole = this.get(RntbdResponseHeader.XPRole); this.backendRequestDurationMilliseconds = this.get(RntbdResponseHeader.BackendRequestDurationMilliseconds); this.correlatedActivityId = this.get(RntbdResponseHeader.CorrelatedActivityId); + this.globalNRegionCommittedGLSN = this.get(RntbdResponseHeader.GlobalNRegionCommittedGLSN); } boolean isPayloadPresent() { @@ -304,6 +307,7 @@ public void setValues(final Map headers) { this.mapValue(this.xpRole, BackendHeaders.XP_ROLE, Integer::parseInt, headers); this.mapValue(this.backendRequestDurationMilliseconds, BackendHeaders.BACKEND_REQUEST_DURATION_MILLISECONDS, Double::parseDouble, headers); this.mapValue(this.correlatedActivityId, HttpHeaders.CORRELATED_ACTIVITY_ID, UUID::fromString, headers); + this.mapValue(this.globalNRegionCommittedGLSN, BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN, Long::parseLong, headers); } @Override @@ -503,6 +507,10 @@ private void collectEntries(final BiConsumer toUuidEntry(HttpHeaders.CORRELATED_ACTIVITY_ID, token)); + + collector.accept(this.globalNRegionCommittedGLSN, token -> + toLongEntry(BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN, token) + ); } private void mapValue(final RntbdToken token, final String name, final Function parse, final Map headers) { From c87d4adfce861c671ab7293fac8ac7044d2d1919 Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Tue, 20 Jan 2026 15:03:33 -0800 Subject: [PATCH 3/8] Cleaning up test --- .../ConsistencyWriterTest.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index ee87e1ea622b..4fbc9d528ebb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -5,8 +5,25 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.implementation.*; +import com.azure.cosmos.implementation.AuthorizationTokenType; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.DiagnosticsClientContext; +import com.azure.cosmos.implementation.FailureValidator; +import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes; +import com.azure.cosmos.implementation.IAuthorizationTokenProvider; +import com.azure.cosmos.implementation.ISessionContainer; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.PartitionIsMigratingException; +import com.azure.cosmos.implementation.PartitionKeyRangeGoneException; +import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; +import com.azure.cosmos.implementation.RequestTimeoutException; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RetryContext; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.SessionTokenHelper; +import com.azure.cosmos.implementation.StoreResponseBuilder; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import io.reactivex.subscribers.TestSubscriber; import org.mockito.MockedStatic; From 1a8e02cdbadc6558eeb75d3331e1aa874699fc1c Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Tue, 20 Jan 2026 15:42:59 -0800 Subject: [PATCH 4/8] Test cleanup and refactoring --- .../ConsistencyWriterTest.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index a0d905aa3497..a54673bb11dd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -404,11 +404,10 @@ public void writeAsyncNoBarrierRequest() { Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); Mono result = spyWriter.writeAsync(request, timeoutHelper, false); - TestSubscriber subscriber = new TestSubscriber<>(); - result.subscribe(subscriber); - subscriber.awaitTerminalEvent(); - subscriber.assertNoErrors(); - subscriber.assertValue(storeResponse); + StepVerifier.create(result) + .expectNext(storeResponse) + .expectComplete() + .verify(); } @Test @@ -484,21 +483,23 @@ private void runWriteAsyncBarrierableRequestTest(boolean globalStrong, boolean b Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean()); Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class)); Mono result = spyWriter.writeAsync(request, timeoutHelper, false); - TestSubscriber subscriber = new TestSubscriber<>(); - result.subscribe(subscriber); - subscriber.awaitTerminalEvent(); if (!barrierMet) { - subscriber.assertError(GoneException.class); - FailureValidator failureValidator = FailureValidator.builder() - .instanceOf(GoneException.class) - .statusCode(GONE) - .subStatusCode(globalStrong? SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET) - .build(); - assertThat(subscriber.errorCount()).isEqualTo(1); - failureValidator.validate(subscriber.errors().getFirst()); + StepVerifier.create(result) + .expectErrorSatisfies(error -> { + assertThat(error).isInstanceOf(GoneException.class); + FailureValidator failureValidator = FailureValidator.builder() + .instanceOf(GoneException.class) + .statusCode(GONE) + .subStatusCode(globalStrong ? SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET) + .build(); + failureValidator.validate(error); + }) + .verify(); } else { - subscriber.assertNoErrors(); - subscriber.assertValue(storeResponse); + StepVerifier.create(result) + .expectNext(storeResponse) + .expectComplete() + .verify(); } } From 1b171101c8ffd0ba5fb589d1d9b8e2b22cd1d816 Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Tue, 20 Jan 2026 16:14:34 -0800 Subject: [PATCH 5/8] Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../implementation/DocumentServiceRequestContext.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 269b7b9f169d..9ff6c91395ad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -8,7 +8,13 @@ import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.CosmosException; import com.azure.cosmos.ReadConsistencyStrategy; -import com.azure.cosmos.implementation.directconnectivity.*; +import com.azure.cosmos.implementation.directconnectivity.ISessionToken; +import com.azure.cosmos.implementation.directconnectivity.PartitionKeyRange; +import com.azure.cosmos.implementation.directconnectivity.RequestChargeTracker; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.StoreResult; +import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper; +import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; From e703663a11614d8bd3f7acf7a8eeef614c920e81 Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Tue, 20 Jan 2026 16:15:14 -0800 Subject: [PATCH 6/8] Update code doc Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../directconnectivity/ConsistencyWriter.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index dc09773d74be..b2a2cb358fe4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -112,16 +112,17 @@ public ConsistencyWriter( } /** - * Constructor for ConsistencyWriter with StoreReader parameter for dependency injection in unit tests - * @param diagnosticsClientContext - * @param addressSelector - * @param sessionContainer - * @param transportClient - * @param authorizationTokenProvider - * @param serviceConfigReader - * @param useMultipleWriteLocations - * @param reader - * @param sessionRetryOptions + * Constructor for ConsistencyWriter with StoreReader parameter for dependency injection in unit tests. + * + * @param diagnosticsClientContext the diagnostics client context used to capture diagnostics for requests. + * @param addressSelector the address selector used to resolve physical replica addresses for requests. + * @param sessionContainer the session container used for managing and maintaining session tokens. + * @param transportClient the transport client used to send requests to the backend replicas. + * @param authorizationTokenProvider the authorization token provider used to authenticate requests. + * @param serviceConfigReader the gateway service configuration reader providing service configuration settings. + * @param useMultipleWriteLocations flag indicating whether multiple write locations are enabled for the account. + * @param reader the StoreReader instance to use, injected for testing instead of creating a new StoreReader. + * @param sessionRetryOptions the session retry options used to handle session token mismatch retries. */ public ConsistencyWriter( DiagnosticsClientContext diagnosticsClientContext, From 4c025beb6e4bae9d8b7cb4e76cf0907707cf0d1b Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Tue, 20 Jan 2026 16:15:30 -0800 Subject: [PATCH 7/8] adding unit test group Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../directconnectivity/ConsistencyWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index a54673bb11dd..a27deb774a35 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -384,7 +384,7 @@ public void writeAsyncNRegionCommitRequestFailed() { runWriteAsyncBarrierableRequestTest(false, false); } - @Test + @Test(groups = "unit") public void writeAsyncNoBarrierRequest() { initializeConsistencyWriter(false); RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext); From c9b55ef5a62cfd5356672567175be2a6455475db Mon Sep 17 00:00:00 2001 From: Bhaskar Mallapragada Date: Tue, 20 Jan 2026 17:52:59 -0800 Subject: [PATCH 8/8] Fixing imports issue caused by copilot --- .../cosmos/implementation/DocumentServiceRequestContext.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 9ff6c91395ad..e13d198e97c4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -8,9 +8,7 @@ import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.CosmosException; import com.azure.cosmos.ReadConsistencyStrategy; -import com.azure.cosmos.implementation.directconnectivity.ISessionToken; -import com.azure.cosmos.implementation.directconnectivity.PartitionKeyRange; -import com.azure.cosmos.implementation.directconnectivity.RequestChargeTracker; +import com.azure.cosmos.implementation.directconnectivity.BarrierType; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.StoreResult; import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper;