Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.AuthorizationTokenType;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SessionTokenHelper;
Expand All @@ -33,10 +37,12 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -358,6 +364,224 @@ public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDo
assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse)).isEqualTo(isGlobalStrongExpected);
}

@Test(groups = "unit")
public void writeAsyncGlobalStrongRequest() {
runWriteAsyncBarrierableRequestTest(true, true);
}

@Test(groups = "unit")
public void writeAsyncGlobalStrongRequestFailed() {
runWriteAsyncBarrierableRequestTest(true, false);
}

@Test(groups = "unit")
public void writeAsyncNRegionCommitRequest() {
runWriteAsyncBarrierableRequestTest(false, true);
}

@Test(groups = "unit")
public void writeAsyncNRegionCommitRequestFailed() {
runWriteAsyncBarrierableRequestTest(false, false);
}

@Test(groups = "unit")
public void writeAsyncNoBarrierRequest() {
initializeConsistencyWriter(false);
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
Mockito.doReturn(false).when(timeoutHelper).isElapsed();
StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn("0").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS);
Mockito.doReturn(ConsistencyLevel.SESSION).when(serviceConfigReader).getDefaultConsistencyLevel();
ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter);
Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any());
AddressInformation addressInformation = Mockito.mock(AddressInformation.class);
Uri primaryUri = Mockito.mock(Uri.class);
Mockito.doReturn(true).when(primaryUri).isPrimary();
Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString();
Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri();
List<AddressInformation> addressList = Collections.singletonList(addressInformation);
Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean());
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
Mono<StoreResponse> result = spyWriter.writeAsync(request, timeoutHelper, false);
StepVerifier.create(result)
.expectNext(storeResponse)
.expectComplete()
.verify();
}

@Test
public void isBarrierRequest() {
Comment on lines +413 to +414
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test method is missing the TestNG groups attribute that is present on similar test methods in this file. For consistency with the other test methods, this should include @test(groups = "unit").

Copilot uses AI. Check for mistakes.
// Setup ConsistencyWriter with useMultipleWriteLocations false
initializeConsistencyWriter(false);
ConsistencyWriter writer = this.consistencyWriter;
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
StoreResponse response = Mockito.mock(StoreResponse.class);

// 1. Global strong enabled and isGlobalStrongRequest returns true
try (MockedStatic<ReplicatedResourceClient> replicatedResourceClientMock = Mockito.mockStatic(ReplicatedResourceClient.class)) {
replicatedResourceClientMock.when(ReplicatedResourceClient::isGlobalStrongEnabled).thenReturn(true);
ConsistencyWriter spyWriter = Mockito.spy(writer);
Mockito.doReturn(true).when(spyWriter).isGlobalStrongRequest(request, response);
boolean result = spyWriter.isBarrierRequest(request, response);
assertThat(result).isTrue();
}

// 2. NRegionSynchronousCommitEnabled path
// Setup request.requestContext.getNRegionSynchronousCommitEnabled() to true
request.requestContext.setNRegionSynchronousCommitEnabled(true);
// useMultipleWriteLocations is already false
Mockito.doReturn("123").when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN);
Mockito.doReturn(2L).when(response).getNumberOfReadRegions();
boolean nRegionResult = writer.isBarrierRequest(request, response);
assertThat(nRegionResult).isTrue();

// 3. Negative case: NRegionSynchronousCommitEnabled false
request.requestContext.setNRegionSynchronousCommitEnabled(false);
boolean negativeResult = writer.isBarrierRequest(request, response);
assertThat(negativeResult).isFalse();

// 4. Negative case: useMultipleWriteLocations true
initializeConsistencyWriter(true);
writer = this.consistencyWriter;
request.requestContext.setNRegionSynchronousCommitEnabled(true);
boolean negativeResult2 = writer.isBarrierRequest(request, response);
assertThat(negativeResult2).isFalse();

// 5. Negative case: GLOBAL_NREGION_COMMITTED_LSN header missing
initializeConsistencyWriter(false);
writer = this.consistencyWriter;
request.requestContext.setNRegionSynchronousCommitEnabled(true);
Mockito.doReturn(null).when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN);
boolean negativeResult3 = writer.isBarrierRequest(request, response);
assertThat(negativeResult3).isFalse();

// 6. Negative case: NUMBER_OF_READ_REGIONS header missing or zero
Mockito.doReturn(0L).when(response).getNumberOfReadRegions();
boolean negativeResult4 = writer.isBarrierRequest(request, response);
assertThat(negativeResult4).isFalse();
}

private void runWriteAsyncBarrierableRequestTest(boolean globalStrong, boolean barrierMet) {
RxDocumentServiceRequest request = setupRequest(!globalStrong);
TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
Mockito.doReturn(false).when(timeoutHelper).isElapsed();
StoreResponse storeResponse = setupStoreResponse(!globalStrong);
List<AddressInformation> addressList = setupAddressList();
List<StoreResult> storeResults = new ArrayList<>();
if (barrierMet) {
storeResults.add(getStoreResult(storeResponse, 1L));
storeResults.add(getStoreResult(storeResponse, 2L));
} else {
storeResults.add(getStoreResult(storeResponse, 1L));
}
StoreReader storeReader = setupStoreReader(storeResults);
initializeConsistencyWriterWithStoreReader(false, storeReader);
ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter);
Mockito.doReturn(globalStrong ? ConsistencyLevel.STRONG : ConsistencyLevel.SESSION)
.when(serviceConfigReader).getDefaultConsistencyLevel();
Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean());
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
Mono<StoreResponse> result = spyWriter.writeAsync(request, timeoutHelper, false);
if (!barrierMet) {
StepVerifier.create(result)
.expectErrorSatisfies(error -> {
assertThat(error).isInstanceOf(GoneException.class);
FailureValidator failureValidator = FailureValidator.builder()
.instanceOf(GoneException.class)
.statusCode(GONE)
.subStatusCode(globalStrong ? SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET)
.build();
failureValidator.validate(error);
})
.verify();
} else {
StepVerifier.create(result)
.expectNext(storeResponse)
.expectComplete()
.verify();
}
}

private RxDocumentServiceRequest setupRequest(boolean nRegionCommit) {
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
if (nRegionCommit) {
request.requestContext.setNRegionSynchronousCommitEnabled(true);
}
Mockito.doReturn(ResourceType.Document).when(request).getResourceType();
Mockito.doReturn(OperationType.Create).when(request).getOperationType();
Mockito.doReturn("1-MxAPlgMgA=").when(request).getResourceId();
request.authorizationTokenType = AuthorizationTokenType.PrimaryMasterKey;
return request;
}

private StoreResponse setupStoreResponse(boolean nRegionCommit) {
StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn(1L).when(storeResponse).getNumberOfReadRegions();
Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS);
if (nRegionCommit) {
Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN);
} else {
Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN);
}
Mockito.doReturn("2").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.LSN);
return storeResponse;
}

private List<AddressInformation> setupAddressList() {
AddressInformation addressInformation = Mockito.mock(AddressInformation.class);
Uri primaryUri = Mockito.mock(Uri.class);
Mockito.doReturn(true).when(primaryUri).isPrimary();
Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString();
Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri();
return Collections.singletonList(addressInformation);
}

private StoreReader setupStoreReader(List<StoreResult> storeResults) {
StoreReader storeReader = Mockito.mock(StoreReader.class);
Mono<List<StoreResult>>[] monos = storeResults.stream()
.map(Collections::singletonList)
.map(Mono::just)
.toArray(Mono[]::new);
Mockito.when(storeReader.readMultipleReplicaAsync(
Mockito.any(),
Mockito.anyBoolean(),
Mockito.anyInt(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.any(),
Mockito.anyBoolean(),
Mockito.anyBoolean()))
.thenReturn(monos.length > 0 ? monos[0] : Mono.empty(),
Arrays.copyOfRange(monos, 1, monos.length));
return storeReader;
}

private StoreResult getStoreResult(StoreResponse storeResponse, long globalCommittedLsn) {
return new StoreResult(
storeResponse,
null,
"1",
1,
1,
1.0,
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
4,
2,
true,
null,
globalCommittedLsn,
1,
1,
null,
0.3,
90.0);
}




private void initializeConsistencyWriter(boolean useMultipleWriteLocation) {
addressSelector = Mockito.mock(AddressSelector.class);
sessionContainer = Mockito.mock(ISessionContainer.class);
Expand All @@ -375,6 +599,24 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) {
null);
}

private void initializeConsistencyWriterWithStoreReader(boolean useMultipleWriteLocation, StoreReader reader) {
addressSelector = Mockito.mock(AddressSelector.class);
sessionContainer = Mockito.mock(ISessionContainer.class);
transportClient = Mockito.mock(TransportClient.class);
IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class);
serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class);

consistencyWriter = new ConsistencyWriter(clientContext,
addressSelector,
sessionContainer,
transportClient,
authorizationTokenProvider,
serviceConfigReader,
useMultipleWriteLocation,
reader,
null);
}

public static <T> void validateError(Mono<T> single,
FailureValidator validator) {
TestSubscriber<T> testSubscriber = TestSubscriber.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public static final class Properties {
public static final String THINCLIENT_READABLE_LOCATIONS = "thinClientReadableLocations";
public static final String DATABASE_ACCOUNT_ENDPOINT = "databaseAccountEndpoint";
public static final String ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR = "enablePerPartitionFailoverBehavior";
public static final String ENABLE_N_REGION_SYNCHRONOUS_COMMIT = "enableNRegionSynchronousCommit";

//Authorization
public static final String MASTER_TOKEN = "master";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,23 @@ public Boolean isPerPartitionFailoverBehaviorEnabled() {
return null;
}

/**
* Returns true if the account supports N region synchronous commit,
* false if enableNRegionSynchronousCommit evaluates to null or false.
* <p>
* If enableNRegionSynchronousCommit property does not exist in account metadata JSON payload, null is returned.
*
* @return true if the account supports N region synchronous commit, false otherwise.
*/
public Boolean isNRegionSynchronousCommitEnabled() {

if (super.has(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT)) {
return ObjectUtils.defaultIfNull(super.getBoolean(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT), false);
}

return null;
}

public void setIsPerPartitionFailoverBehaviorEnabled(boolean value) {
this.set(Constants.Properties.ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext;
import com.azure.cosmos.implementation.directconnectivity.BarrierType;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlRequestContext;
Expand All @@ -38,7 +39,7 @@ public class DocumentServiceRequestContext implements Cloneable {
public volatile ISessionToken sessionToken;
public volatile long quorumSelectedLSN;
public volatile long globalCommittedSelectedLSN;
public volatile StoreResponse globalStrongWriteResponse;
public volatile StoreResponse cachedWriteResponse;
public volatile ConsistencyLevel originalRequestConsistencyLevel;
public volatile ReadConsistencyStrategy readConsistencyStrategy;
public volatile PartitionKeyRange resolvedPartitionKeyRange;
Expand All @@ -65,6 +66,8 @@ public class DocumentServiceRequestContext implements Cloneable {
private volatile long approximateBloomFilterInsertionCount;
private final Set<String> sessionTokenEvaluationResults = ConcurrentHashMap.newKeySet();
private volatile List<String> unavailableRegionsForPartition;
private volatile Boolean nRegionSynchronousCommitEnabled;
private volatile BarrierType barrierType = BarrierType.NONE;

// For cancelled rntbd requests, track the response as OperationCancelledException which later will be used to populate the cosmosDiagnostics
public final Map<String, CosmosException> rntbdCancelledRequestMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -148,7 +151,7 @@ public DocumentServiceRequestContext clone() {
context.sessionToken = this.sessionToken;
context.quorumSelectedLSN = this.quorumSelectedLSN;
context.globalCommittedSelectedLSN = this.globalCommittedSelectedLSN;
context.globalStrongWriteResponse = this.globalStrongWriteResponse;
context.cachedWriteResponse = this.cachedWriteResponse;
context.originalRequestConsistencyLevel = this.originalRequestConsistencyLevel;
context.readConsistencyStrategy = this.readConsistencyStrategy;
context.resolvedPartitionKeyRange = this.resolvedPartitionKeyRange;
Expand Down Expand Up @@ -266,5 +269,21 @@ public void setPerPartitionAutomaticFailoverInfoHolder(PartitionLevelFailoverInf
this.perPartitionFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelFailoverInfo);
}
}

public Boolean getNRegionSynchronousCommitEnabled() {
return nRegionSynchronousCommitEnabled;
}

public void setNRegionSynchronousCommitEnabled(Boolean nRegionSynchronousCommitEnabled) {
this.nRegionSynchronousCommitEnabled = nRegionSynchronousCommitEnabled;
}

public BarrierType getBarrierType() {
return barrierType;
}

public void setBarrierType(BarrierType barrierType) {
this.barrierType = barrierType;
}
}

Loading
Loading