diff --git a/sdk/cosmos/azure-cosmos-tests/THINCLIENT_TEST_MATRIX.md b/sdk/cosmos/azure-cosmos-tests/THINCLIENT_TEST_MATRIX.md new file mode 100644 index 000000000000..80d8f7af40a5 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/THINCLIENT_TEST_MATRIX.md @@ -0,0 +1,212 @@ +# Thin Client E2E Test Matrix — Gateway V2 QueryPlan Support + +--- + +## 1. Query Tests (`ThinClientQueryE2ETest`) — 80 tests + +### Filtering (WHERE clause) +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testSelectAll` | `SELECT * FROM c` | Full scan | +| `testWhereEquality` | `SELECT * FROM c WHERE c.category = 'electronics'` | Equality filter | +| `testWhereEqualityParameterized` | `SELECT * FROM c WHERE c.category = @cat` | Parameterized query | +| `testWhereRangeGreaterThan` | `SELECT * FROM c WHERE c.age > 30` | Range (>) | +| `testWhereRangeLessThanOrEqual` | `SELECT * FROM c WHERE c.price <= 25.00` | Range (<=) | +| `testWhereRangeBetween` | `SELECT * FROM c WHERE c.age >= 18 AND c.age <= 40` | Range (between) | +| `testWhereIn` | `SELECT * FROM c WHERE c.category IN ('electronics', 'toys')` | IN operator | +| `testWhereCompoundAndOr` | `SELECT * FROM c WHERE c.status = 'active' AND (...)` | Compound AND/OR | +| `testWhereNotEqual` | `SELECT * FROM c WHERE c.status != 'inactive'` | Not equal | +| `testWhereBooleanField` | `SELECT * FROM c WHERE c.isActive = true` | Boolean filter | +| `testWhereIsDefined` | `SELECT * FROM c WHERE IS_DEFINED(c.address)` | IS_DEFINED | +| `testWhereStartsWith` | `SELECT * FROM c WHERE STARTSWITH(c.category, 'elec')` | STARTSWITH | +| `testWhereContains` | `SELECT * FROM c WHERE CONTAINS(c.category, 'ook')` | CONTAINS | +| `testWhereArrayContains` | `SELECT * FROM c WHERE ARRAY_CONTAINS(c.scores, 50)` | ARRAY_CONTAINS | +| `testWhereNestedProperty` | `SELECT * FROM c WHERE c.address.city = 'Seattle'` | Nested property | +| `testBetween` | `SELECT * FROM c WHERE c.age BETWEEN 18 AND 40` | BETWEEN keyword | + +### Projection +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testSelectSpecificFields` | `SELECT c.id, c.category, c.price FROM c` | Field projection | +| `testSelectComputedAlias` | `SELECT c.id, c.price * 1.1 AS taxedPrice FROM c` | Computed alias | +| `testSelectValueObject` | `SELECT VALUE { name: c.category, loc: c.address.city } FROM c` | VALUE with JSON object | +| `testSelectValueScalar` | `SELECT VALUE c.category FROM c` | VALUE scalar | + +### ORDER BY +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testOrderByAsc` | `SELECT * FROM c ORDER BY c.age` | ORDER BY ASC | +| `testOrderByDesc` | `SELECT * FROM c ORDER BY c.price DESC` | ORDER BY DESC | + +### DISTINCT +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testDistinctValue` | `SELECT DISTINCT VALUE c.category FROM c` | DISTINCT VALUE (string) | +| `testDistinctValueBoolean` | `SELECT DISTINCT VALUE c.isActive FROM c` | DISTINCT VALUE (boolean) | + +### TOP +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testTop` | `SELECT TOP 3 * FROM c` | TOP | +| `testTopWithOrderBy` | `SELECT TOP 5 * FROM c ORDER BY c.price DESC` | TOP + ORDER BY | + +### Aggregates +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testCount` | `SELECT VALUE COUNT(1) FROM c` | COUNT | +| `testSum` | `SELECT VALUE SUM(c.price) FROM c` | SUM | +| `testAvg` | `SELECT VALUE AVG(c.age) FROM c` | AVG | +| `testMin` | `SELECT VALUE MIN(c.price) FROM c` | MIN | +| `testMax` | `SELECT VALUE MAX(c.age) FROM c` | MAX | + +### GROUP BY +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testGroupByCount` | `SELECT c.category, COUNT(1) as cnt FROM c GROUP BY c.category` | GROUP BY + COUNT | +| `testGroupBySumAvg` | `SELECT c.category, SUM(c.price) as total, AVG(c.price) as avg FROM c GROUP BY c.category` | GROUP BY + SUM + AVG | + +### OFFSET / LIMIT +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testOffsetLimit` | `SELECT * FROM c ORDER BY c.idx OFFSET 3 LIMIT 4` | OFFSET + LIMIT | + +### JOIN (self-join on arrays) +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testJoinScoresArray` | `SELECT c.id, s AS score FROM c JOIN s IN c.scores` | JOIN (int array) | +| `testJoinWithFilter` | `SELECT c.id, s AS score FROM c JOIN s IN c.scores WHERE s >= 50` | JOIN + WHERE | +| `testJoinTagsArray` | `SELECT c.id, t AS tag FROM c JOIN t IN c.tags` | JOIN (string array) | + +### EXISTS subquery +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testExistsSubquery` | `SELECT * FROM c WHERE EXISTS (SELECT VALUE s FROM s IN c.scores WHERE s > 60)` | EXISTS | +| `testExistsSubqueryWithStringMatch` | `SELECT * FROM c WHERE EXISTS (SELECT VALUE t FROM t IN c.tags WHERE t = 'on-sale')` | EXISTS + string match | +| `testExistsAliasInProjection` | `SELECT c.id, EXISTS (...) AS hasHighScore FROM c` | EXISTS in projection | + +### LIKE +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testLikePrefix` | `SELECT * FROM c WHERE c.category LIKE 'elec%'` | LIKE prefix | +| `testLikeSuffix` | `SELECT * FROM c WHERE c.category LIKE '%ing'` | LIKE suffix | +| `testLikeContains` | `SELECT * FROM c WHERE c.category LIKE '%ook%'` | LIKE contains | + +### String Functions +| Test | SQL | Function | +|------|-----|----------| +| `testStringConcat` | `SELECT CONCAT(c.category, '-', c.status) AS label FROM c` | CONCAT | +| `testStringEndsWith` | `SELECT * FROM c WHERE ENDSWITH(c.category, 'ics')` | ENDSWITH | +| `testStringLower` | `SELECT LOWER(c.category) AS lowerCat FROM c` | LOWER | +| `testStringUpper` | `SELECT UPPER(c.status) AS upperStatus FROM c` | UPPER | +| `testStringLength` | `SELECT c.category, LENGTH(c.category) AS len FROM c` | LENGTH | +| `testStringSubstring` | `SELECT SUBSTRING(c.category, 0, 4) AS prefix FROM c` | SUBSTRING | +| `testStringReplace` | `SELECT REPLACE(c.category, 'o', '0') AS replaced FROM c` | REPLACE | +| `testStringIndexOf` | `SELECT INDEX_OF(c.category, 'o') AS pos FROM c` | INDEX_OF | +| `testStringLeft` | `SELECT LEFT(c.category, 3) AS l FROM c` | LEFT | +| `testStringReverse` | `SELECT REVERSE(c.category) AS rev FROM c` | REVERSE | +| `testStringTrim` | `SELECT TRIM(c.status) AS trimmed FROM c` | TRIM | +| `testRegexMatch` | `SELECT * FROM c WHERE RegexMatch(c.category, '^elec.*')` | RegexMatch | + +### Type Checking Functions +| Test | SQL | Function | +|------|-----|----------| +| `testIsArray` | `SELECT c.id, IS_ARRAY(c.scores) AS isArr FROM c` | IS_ARRAY | +| `testIsBool` | `SELECT c.id, IS_BOOL(c.isActive) AS isBool FROM c` | IS_BOOL | +| `testIsNull` | `SELECT * FROM c WHERE IS_NULL(c.nonExistentField)` | IS_NULL | +| `testIsNumber` | `SELECT c.id, IS_NUMBER(c.age) AS isNum FROM c` | IS_NUMBER | +| `testIsString` | `SELECT c.id, IS_STRING(c.category) AS isStr FROM c` | IS_STRING | +| `testIsObject` | `SELECT c.id, IS_OBJECT(c.address) AS isObj FROM c` | IS_OBJECT | + +### Math Functions +| Test | SQL | Function | +|------|-----|----------| +| `testMathAbs` | `SELECT ABS(c.age - 30) AS diff FROM c` | ABS | +| `testMathCeilingFloor` | `SELECT CEILING(c.price) AS ceil, FLOOR(c.price) AS flr FROM c` | CEILING, FLOOR | +| `testMathRound` | `SELECT ROUND(c.price) AS rounded FROM c` | ROUND | +| `testMathPower` | `SELECT POWER(c.age, 2) AS ageSq FROM c` | POWER | +| `testMathSqrt` | `SELECT SQRT(c.price) AS sqrtPrice FROM c` | SQRT | + +### Array Functions +| Test | SQL | Function | +|------|-----|----------| +| `testArrayLength` | `SELECT c.id, ARRAY_LENGTH(c.scores) AS len FROM c` | ARRAY_LENGTH | +| `testArraySlice` | `SELECT c.id, ARRAY_SLICE(c.tags, 0, 1) AS firstTag FROM c` | ARRAY_SLICE | + +### Conditional Functions +| Test | SQL | Function | +|------|-----|----------| +| `testIif` | `SELECT c.id, IIF(c.age >= 18, 'adult', 'minor') AS ageGroup FROM c` | IIF | + +### Date/Time Functions +| Test | SQL | Function | +|------|-----|----------| +| `testGetCurrentDateTime` | `SELECT VALUE GetCurrentDateTime()` | GetCurrentDateTime | + +### Cross-Partition +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testCrossPartitionSelectAll` | `SELECT * FROM c ORDER BY c.idx` | Cross-partition (no PK filter) | +| `testCrossPartitionWhereFilter` | `SELECT * FROM c WHERE c.category = 'electronics' ORDER BY c.idx` | Cross-partition + filter | + +### Multi-Range (creates dedicated container with multiple PKs) +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testMultiRangePartitionKeyInClause` | `SELECT * FROM c WHERE c.mypk IN (pk1, pk3, pk5)` | Multi-range IN | +| `testMultiRangePartitionKeyOrClause` | `SELECT * FROM c WHERE c.mypk = 'pk-or-1' OR c.mypk = 'pk-or-3'` | Multi-range OR | +| `testMultiRangeManyPartitionKeys` | `SELECT * FROM c WHERE c.mypk IN (pk1..pk10)` | Multi-range (10 PKs) | + +### Continuation Token +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testContinuationTokenDraining` | `SELECT * FROM c` (page size 2) | Pagination / continuation tokens | + +### Error Handling +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testInvalidQueryReturnsBadRequest` | `SELEC * FORM c` (invalid) | 400 BadRequest validation | + +### Vector Search (requires `EnableNoSQLVectorSearch` capability) +| Test | SQL | Query Feature | +|------|-----|---------------| +| `testVectorSearchGatewayVsThinClient` | `SELECT TOP 5 c.id, VectorDistance(c.embedding, [...]) AS score FROM c ORDER BY VectorDistance(...)` | VectorDistance + FLAT index | +| `testFullTextSearchGatewayVsThinClient` | `SELECT TOP 10 * FROM c WHERE FullTextContains(c.text, 'mountain')` | FullTextContains | +| `testHybridSearchGatewayVsThinClient` | `SELECT TOP 3 * FROM c ORDER BY RANK RRF(VectorDistance(...), FullTextScore(...))` | Hybrid RRF (vector + full-text) | + +--- + +## 2. Point Operations (`ThinClientPointOperationE2ETest`) — 3 tests + +| Test | Operation | Coverage | +|------|-----------|----------| +| `testThinClientDocumentPointOperations` | Create, Read, Replace, Upsert, Patch, Delete | Full CRUD + Patch lifecycle | +| `testThinClientBulk` | Bulk create + bulk read | Bulk operations | +| `testThinClientBatch` | Transactional batch (create + read) | CosmosBatch | + +--- + +## 3. Change Feed (`ThinClientChangeFeedE2ETest`) — 3 tests + +| Test | FeedRange | Coverage | +|------|-----------|----------| +| `testThinClientIncrementalChangeFeed` | `FeedRange.forLogicalPartition(pk)` | Incremental change feed (via batch insert) | +| `testThinClientChangeFeedFullRange` | `FeedRange.forFullRange()` | Cross-partition change feed | +| `testThinClientChangeFeedPartitionKey` | `FeedRange.forLogicalPartition(pk)` | Single-PK feed with exact count + PK validation | + +--- + +## 4. Stored Procedures (`ThinClientStoredProcedureE2ETest`) — 3 tests + +| Test | Operation | Coverage | +|------|-----------|----------| +| `testThinClientStoredProcedure` | Create + execute sproc | Sproc creates a document, verifies execution | +| `testStoredProcedureExecutionWithoutPartitionKeyThrows` | Execute without PK | Validates 400 error | +| `testThinClientStoredProcedureWithPartitionKeyNone` | Execute with `PartitionKey.NONE` | Non-partitioned sproc execution | + +--- + +## Test Infrastructure + +- **Test data**: 10 diverse documents seeded per partition (categories, prices, ages, nested objects, arrays, tags, booleans) +- **Shared container**: `/mypk` partition key, reused across query tests +- **Comparison method**: Direct TCP vs Thin Client (HTTP/2 → Proxy), assert identical results +- **Endpoint validation**: Every test asserts thin client used `:10250` endpoint, gateway used `:443` diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml index 6d019db1dabb..b42e6ed08ed3 100644 --- a/sdk/cosmos/azure-cosmos-tests/pom.xml +++ b/sdk/cosmos/azure-cosmos-tests/pom.xml @@ -848,6 +848,9 @@ Licensed under the MIT License. true + ${cosmos.use.aad.auth} + true + true 1 256 paranoid diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientE2ETest.java deleted file mode 100644 index 5589d147783b..000000000000 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientE2ETest.java +++ /dev/null @@ -1,378 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.azure.cosmos.implementation; - -import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.CosmosAsyncClient; -import com.azure.cosmos.CosmosAsyncContainer; -import com.azure.cosmos.CosmosClientBuilder; -import com.azure.cosmos.CosmosDiagnostics; -import com.azure.cosmos.CosmosDiagnosticsContext; -import com.azure.cosmos.CosmosDiagnosticsRequestInfo; -import com.azure.cosmos.FlakyTestRetryAnalyzer; -import com.azure.cosmos.models.CosmosBatch; -import com.azure.cosmos.models.CosmosBatchResponse; -import com.azure.cosmos.models.CosmosBulkItemResponse; -import com.azure.cosmos.models.CosmosBulkOperationResponse; -import com.azure.cosmos.models.CosmosBulkOperations; -import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; -import com.azure.cosmos.models.CosmosQueryRequestOptions; -import com.azure.cosmos.models.FeedRange; -import com.azure.cosmos.models.PartitionKey; -import com.azure.cosmos.models.SqlQuerySpec; -import com.azure.cosmos.models.SqlParameter; -import com.azure.cosmos.models.FeedResponse; -import com.azure.cosmos.models.CosmosContainerProperties; -import com.azure.cosmos.models.ThroughputProperties; -import com.azure.cosmos.models.CosmosItemResponse; -import com.azure.cosmos.models.CosmosItemRequestOptions; -import com.azure.cosmos.models.CosmosPatchOperations; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.Test; -import reactor.core.publisher.Flux; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.UUID; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.assertj.core.api.Fail.fail; - -// End to end sanity tests for basic thin client functionality. -public class ThinClientE2ETest { - private static final Logger logger = LoggerFactory.getLogger(ThinClientE2ETest.class); - private static final String thinClientEndpointIndicator = ":10250/"; - - @Test(groups = {"thinclient"}, retryAnalyzer = FlakyTestRetryAnalyzer.class) - public void testThinClientQuery() { - CosmosAsyncClient client = null; - try { - // If running locally, uncomment these lines - // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); - // System.setProperty("COSMOS.HTTP2_ENABLED", "true"); - - client = new CosmosClientBuilder() - .endpoint(TestConfigurations.HOST) - .key(TestConfigurations.MASTER_KEY) - .gatewayMode() - .consistencyLevel(ConsistencyLevel.SESSION) - .buildAsyncClient(); - - CosmosAsyncContainer container = client.getDatabase("db1").getContainer("c2"); - String idName = "id"; - String partitionKeyName = "partitionKey"; - ObjectMapper mapper = new ObjectMapper(); - ObjectNode doc = mapper.createObjectNode(); - String idValue = UUID.randomUUID().toString(); - doc.put(idName, idValue); - doc.put(partitionKeyName, idValue); - - container.createItem(doc, new PartitionKey(idValue), null).block(); - - String query = "select * from c WHERE c." + partitionKeyName + "=@id"; - SqlQuerySpec querySpec = new SqlQuerySpec(query); - querySpec.setParameters(Arrays.asList(new SqlParameter("@id", idValue))); - CosmosQueryRequestOptions requestOptions = - new CosmosQueryRequestOptions().setPartitionKey(new PartitionKey(idValue)); - FeedResponse response = container - .queryItems(querySpec, requestOptions, ObjectNode.class) - .byPage() - .blockFirst(); - - ObjectNode docFromResponse = response.getResults().get(0); - assertThat(docFromResponse.get(partitionKeyName).textValue()).isEqualTo(idValue); - assertThat(docFromResponse.get(idName).textValue()).isEqualTo(idValue); - assertThinClientEndpointUsed(response.getCosmosDiagnostics()); - - } finally { - if (client != null) { - client.close(); - } - } - } - - @Test(groups = {"thinclient"}, retryAnalyzer = FlakyTestRetryAnalyzer.class) - public void testThinClientBulk() { - CosmosAsyncClient client = null; - try { - // If running locally, uncomment these lines - // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); - // System.setProperty("COSMOS.HTTP2_ENABLED", "true"); - - client = new CosmosClientBuilder() - .endpoint(TestConfigurations.HOST) - .key(TestConfigurations.MASTER_KEY) - .gatewayMode() - .consistencyLevel(ConsistencyLevel.EVENTUAL) - .buildAsyncClient(); - - CosmosAsyncContainer container = client.getDatabase("db1").getContainer("c2"); - String idName = "id"; - String partitionKeyName = "partitionKey"; - ObjectMapper mapper = new ObjectMapper(); - ObjectNode doc = mapper.createObjectNode(); - String idValue = UUID.randomUUID().toString(); - doc.put(idName, idValue); - doc.put(partitionKeyName, idValue); - - Flux> responsesFlux = container.executeBulkOperations(Flux.just( - CosmosBulkOperations.getCreateItemOperation(doc, new PartitionKey(idValue)) - )); - - List> responses = responsesFlux.collectList().block(); - - assertThat(responses.size()).isEqualTo(1); - assertThat(responses.get(0).getException()).isNull(); - CosmosBulkItemResponse bulkResponse = responses.get(0).getResponse(); - assertThat(bulkResponse.isSuccessStatusCode()).isEqualTo(true); - assertThinClientEndpointUsed(bulkResponse.getCosmosDiagnostics()); - } finally { - if (client != null) { - client.close(); - } - } - } - - @Test(groups = {"thinclient"}, retryAnalyzer = FlakyTestRetryAnalyzer.class) - public void testThinClientBatch() { - CosmosAsyncClient client = null; - try { - // If running locally, uncomment these lines - // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); - // System.setProperty("COSMOS.HTTP2_ENABLED", "true"); - - client = new CosmosClientBuilder() - .endpoint(TestConfigurations.HOST) - .key(TestConfigurations.MASTER_KEY) - .gatewayMode() - .consistencyLevel(ConsistencyLevel.SESSION) - .buildAsyncClient(); - - CosmosAsyncContainer container = client.getDatabase("db1").getContainer("c2"); - String idName = "id"; - String partitionKeyName = "partitionKey"; - ObjectMapper mapper = new ObjectMapper(); - String pkValue = UUID.randomUUID().toString(); - ObjectNode doc1 = mapper.createObjectNode(); - String idValue1 = UUID.randomUUID().toString(); - doc1.put(idName, idValue1); - doc1.put(partitionKeyName, pkValue); - - ObjectNode doc2 = mapper.createObjectNode(); - String idValue2 = UUID.randomUUID().toString(); - doc2.put(idName, idValue2); - doc2.put(partitionKeyName, pkValue); - - CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(pkValue)); - batch.createItemOperation(doc1); - batch.createItemOperation(doc2); - - CosmosBatchResponse response = container - .executeCosmosBatch(batch) - .block(); - - assertThat(response.getStatusCode()).isEqualTo(200); - assertThinClientEndpointUsed(response.getDiagnostics()); - } finally { - if (client != null) { - client.close(); - } - } - } - - @Test(groups = {"thinclient"}, retryAnalyzer = FlakyTestRetryAnalyzer.class) - public void testThinClientIncrementalChangeFeed() { - CosmosAsyncClient client = null; - try { - // If running locally, uncomment these lines -// System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); -// System.setProperty("COSMOS.HTTP2_ENABLED", "true"); - - client = new CosmosClientBuilder() - .endpoint(TestConfigurations.HOST) - .key(TestConfigurations.MASTER_KEY) - .gatewayMode() - .consistencyLevel(ConsistencyLevel.SESSION) - .buildAsyncClient(); - - CosmosAsyncContainer container = client.getDatabase("db1").getContainer("c2"); - String idName = "id"; - String partitionKeyName = "partitionKey"; - ObjectMapper mapper = new ObjectMapper(); - String pkValue = UUID.randomUUID().toString(); - ObjectNode doc1 = mapper.createObjectNode(); - String idValue1 = UUID.randomUUID().toString(); - doc1.put(idName, idValue1); - doc1.put(partitionKeyName, pkValue); - - ObjectNode doc2 = mapper.createObjectNode(); - String idValue2 = UUID.randomUUID().toString(); - doc2.put(idName, idValue2); - doc2.put(partitionKeyName, pkValue); - - CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(pkValue)); - batch.createItemOperation(doc1); - batch.createItemOperation(doc2); - - CosmosBatchResponse response = container - .executeCosmosBatch(batch) - .block(); - - FeedResponse changeFeedResponse = container - .queryChangeFeed(CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange()), ObjectNode.class) - .byPage() - .blockFirst(); - - assertThat(changeFeedResponse).isNotNull(); - assertThat(changeFeedResponse.getResults()).isNotNull(); - assertThat(changeFeedResponse.getResults().size()).isGreaterThanOrEqualTo(1); - assertThinClientEndpointUsed(changeFeedResponse.getCosmosDiagnostics()); - } finally { - if (client != null) { - client.close(); - } - } - } - - private static void assertThinClientEndpointUsed(CosmosDiagnostics diagnostics) { - assertThat(diagnostics).isNotNull(); - - CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); - assertThat(ctx).isNotNull(); - - Collection requests = ctx.getRequestInfo(); - assertThat(requests).isNotNull(); - assertThat(requests.size()).isPositive(); - - for (CosmosDiagnosticsRequestInfo requestInfo : requests) { - logger.info( - "Endpoint: {}, RequestType: {}, Partition: {}/{}, ActivityId: {}", - requestInfo.getEndpoint(), - requestInfo.getRequestType(), - requestInfo.getPartitionId(), - requestInfo.getPartitionKeyRangeId(), - requestInfo.getActivityId()); - if (requestInfo.getEndpoint().contains(thinClientEndpointIndicator)) { - return; - } - } - - fail("No request targeting thin client proxy endpoint."); - } - - - @Test(groups = {"thinclient"}, retryAnalyzer = FlakyTestRetryAnalyzer.class) - public void testThinClientDocumentPointOperations() { - CosmosAsyncClient client = null; - try { - // if running locally, uncomment these lines - // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); - // System.setProperty("COSMOS.HTTP2_ENABLED", "true"); - - client = new CosmosClientBuilder() - .endpoint(TestConfigurations.HOST) - .key(TestConfigurations.MASTER_KEY) - .gatewayMode() - .consistencyLevel(ConsistencyLevel.SESSION) - .buildAsyncClient(); - - String idName = "id"; - String partitionKeyName = "partitionKey"; - - client.createDatabaseIfNotExists("db1").block(); - - CosmosContainerProperties containerDef = - new CosmosContainerProperties("c2", "/" + partitionKeyName); - ThroughputProperties ruCfg = ThroughputProperties.createManualThroughput(35_000); - - client.getDatabase("db1").createContainerIfNotExists(containerDef, ruCfg).block(); - - CosmosAsyncContainer container = client.getDatabase("db1").getContainer("c2"); - - ObjectMapper mapper = new ObjectMapper(); - ObjectNode doc = mapper.createObjectNode(); - String idValue = UUID.randomUUID().toString(); - doc.put(idName, idValue); - doc.put(partitionKeyName, idValue); - - // create - CosmosItemResponse createResponse = container.createItem(doc).block(); - assertThat(createResponse.getStatusCode()).isEqualTo(201); - assertThat(createResponse.getRequestCharge()).isGreaterThan(0.0); - assertThinClientEndpointUsed(createResponse.getDiagnostics()); - - // read - CosmosItemResponse readResponse = container.readItem(idValue, new PartitionKey(idValue), ObjectNode.class).block(); - assertThat(readResponse.getStatusCode()).isEqualTo(200); - assertThat(readResponse.getRequestCharge()).isGreaterThan(0.0); - assertThinClientEndpointUsed(readResponse.getDiagnostics()); - - ObjectNode doc2 = mapper.createObjectNode(); - String idValue2 = UUID.randomUUID().toString(); - doc2.put(idName, idValue2); - doc2.put(partitionKeyName, idValue); - - // replace - CosmosItemResponse replaceResponse = container.replaceItem(doc2, idValue, new PartitionKey(idValue)).block(); - assertThat(replaceResponse.getStatusCode()).isEqualTo(200); - assertThat(replaceResponse.getRequestCharge()).isGreaterThan(0.0); - assertThinClientEndpointUsed(replaceResponse.getDiagnostics()); - - CosmosItemResponse readAfterReplaceResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); - assertThat(readAfterReplaceResponse.getStatusCode()).isEqualTo(200); - ObjectNode replacedItemFromRead = readAfterReplaceResponse.getItem(); - assertThat(replacedItemFromRead.get(idName).asText()).isEqualTo(idValue2); - assertThat(replacedItemFromRead.get(partitionKeyName).asText()).isEqualTo(idValue); - assertThinClientEndpointUsed(readAfterReplaceResponse.getDiagnostics()); - - ObjectNode doc3 = mapper.createObjectNode(); - doc3.put(idName, idValue2); - doc3.put(partitionKeyName, idValue); - doc3.put("newField", "newValue"); - - // upsert - CosmosItemResponse upsertResponse = container.upsertItem(doc3, new PartitionKey(idValue), new CosmosItemRequestOptions()).block(); - assertThat(upsertResponse.getStatusCode()).isEqualTo(200); - assertThat(upsertResponse.getRequestCharge()).isGreaterThan(0.0); - assertThinClientEndpointUsed(upsertResponse.getDiagnostics()); - - CosmosItemResponse readAfterUpsertResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); - ObjectNode upsertedItemFromRead = readAfterUpsertResponse.getItem(); - assertThat(upsertedItemFromRead.get(idName).asText()).isEqualTo(idValue2); - assertThat(upsertedItemFromRead.get(partitionKeyName).asText()).isEqualTo(idValue); - assertThat(upsertedItemFromRead.get("newField").asText()).isEqualTo("newValue"); - assertThinClientEndpointUsed(readAfterUpsertResponse.getDiagnostics()); - - // patch - CosmosPatchOperations patchOperations = CosmosPatchOperations.create(); - patchOperations.add("/anotherNewField", "anotherNewValue"); - patchOperations.replace("/newField", "patchedNewField"); - CosmosItemResponse patchResponse = container.patchItem(idValue2, new PartitionKey(idValue), patchOperations, ObjectNode.class).block(); - assertThat(patchResponse.getStatusCode()).isEqualTo(200); - assertThat(patchResponse.getRequestCharge()).isGreaterThan(0.0); - assertThinClientEndpointUsed(patchResponse.getDiagnostics()); - - CosmosItemResponse readAfterPatchResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); - ObjectNode patchedItemFromRead = readAfterPatchResponse.getItem(); - assertThat(patchedItemFromRead.get(idName).asText()).isEqualTo(idValue2); - assertThat(patchedItemFromRead.get(partitionKeyName).asText()).isEqualTo(idValue); - assertThat(patchedItemFromRead.get("newField").asText()).isEqualTo("patchedNewField"); - assertThat(patchedItemFromRead.get("anotherNewField").asText()).isEqualTo("anotherNewValue"); - assertThinClientEndpointUsed(readAfterPatchResponse.getDiagnostics()); - - // delete - CosmosItemResponse deleteResponse = container.deleteItem(idValue2, new PartitionKey(idValue)).block(); - assertThat(deleteResponse.getStatusCode()).isEqualTo(204); - assertThat(deleteResponse.getRequestCharge()).isGreaterThan(0.0); - assertThinClientEndpointUsed(deleteResponse.getDiagnostics()); - } finally { - if (client != null) { - client.close(); - } - } - } -} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/PartitionKeyInternalTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/PartitionKeyInternalTest.java index 427d8c763a7d..b565658f1533 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/PartitionKeyInternalTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/PartitionKeyInternalTest.java @@ -14,12 +14,18 @@ import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import com.azure.cosmos.implementation.routing.PartitionKeyInternalUtils; +import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import com.azure.cosmos.implementation.guava25.collect.Lists; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ArrayNode; + import java.util.ArrayList; +import java.util.List; import java.util.function.BiFunction; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -27,6 +33,8 @@ public class PartitionKeyInternalTest { + private static final ObjectMapper MAPPER = new ObjectMapper(); + /** * Tests serialization of empty partition key. */ @@ -473,4 +481,117 @@ private static void verifyEffectivePartitionKeyEncoding(String buffer, int lengt PartitionKeyInternal pk = PartitionKeyInternalUtils.createPartitionKeyInternal(buffer.substring(0, length)); assertThat(PartitionKeyInternalHelper.getEffectivePartitionKeyString(pk, pkDefinition)).isEqualTo(expectedValue); } + + // ==================== convertToSortedEpkRanges Unit Tests ==================== + + private static PartitionKeyDefinition singleHashPkDef() { + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(ImmutableList.of("/pk")); + pkDef.setVersion(PartitionKeyDefinitionVersion.V2); + pkDef.setKind(PartitionKind.HASH); + return pkDef; + } + + @Test(groups = "unit") + public void convertToSortedEpkRangesSingleValueRange() { + // Single PK value range: min=["testValue"], max=["testValue"] + // This is what ServiceInterop returns for WHERE pk = 'testValue' + ObjectNode json = MAPPER.createObjectNode(); + ArrayNode ranges = json.putArray("queryRanges"); + ObjectNode range = ranges.addObject(); + range.putArray("min").add("testValue"); + range.putArray("max").add("testValue"); + range.put("isMinInclusive", true); + range.put("isMaxInclusive", true); + + List> result = PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + + assertThat(result.size()).isEqualTo(1); + // EPK for a string value is a non-empty hex hash + assertThat(result.get(0).getMin()).isNotNull(); + assertThat(result.get(0).getMin().length()).isGreaterThan(0); + assertThat(result.get(0).getMin()).isEqualTo(result.get(0).getMax()); + assertThat(result.get(0).isMinInclusive()).isTrue(); + assertThat(result.get(0).isMaxInclusive()).isTrue(); + } + + @Test(groups = "unit") + public void convertToSortedEpkRangesMultipleRangesSorted() { + // Multiple ranges that need sorting after EPK conversion + ObjectNode json = MAPPER.createObjectNode(); + ArrayNode ranges = json.putArray("queryRanges"); + + // Add two ranges with different PK values — EPK hash order may differ from insertion order + ObjectNode range1 = ranges.addObject(); + range1.putArray("min").add("zzzValue"); + range1.putArray("max").add("zzzValue"); + range1.put("isMinInclusive", true); + range1.put("isMaxInclusive", true); + + ObjectNode range2 = ranges.addObject(); + range2.putArray("min").add("aaaValue"); + range2.putArray("max").add("aaaValue"); + range2.put("isMinInclusive", true); + range2.put("isMaxInclusive", true); + + List> result = PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + + assertThat(result.size()).isEqualTo(2); + // Verify sorted by min EPK (ascending) + assertThat(result.get(0).getMin().compareTo(result.get(1).getMin())).isLessThanOrEqualTo(0); + } + + @Test(groups = "unit", expectedExceptions = IllegalStateException.class) + public void convertToSortedEpkRangesMissingQueryRangesThrows() { + // Missing queryRanges property entirely + ObjectNode json = MAPPER.createObjectNode(); + json.put("queryInfo", "someValue"); + + PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + } + + @Test(groups = "unit", expectedExceptions = IllegalStateException.class) + public void convertToSortedEpkRangesNonArrayQueryRangesThrows() { + // queryRanges is a string instead of array + ObjectNode json = MAPPER.createObjectNode(); + json.put("queryRanges", "notAnArray"); + + PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + } + + @Test(groups = "unit", expectedExceptions = IllegalStateException.class) + public void convertToSortedEpkRangesNonObjectElementThrows() { + // queryRanges array contains a string instead of object + ObjectNode json = MAPPER.createObjectNode(); + json.putArray("queryRanges").add("notAnObject"); + + PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + } + + @Test(groups = "unit", expectedExceptions = IllegalStateException.class) + public void convertToSortedEpkRangesNullMinBoundaryThrows() { + // Range with null min boundary + ObjectNode json = MAPPER.createObjectNode(); + ArrayNode ranges = json.putArray("queryRanges"); + ObjectNode range = ranges.addObject(); + range.putNull("min"); + range.putArray("max").add("value"); + range.put("isMinInclusive", true); + range.put("isMaxInclusive", false); + + PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + } + + @Test(groups = "unit", expectedExceptions = IllegalStateException.class) + public void convertToSortedEpkRangesMissingInclusiveFieldsThrows() { + // Range missing isMinInclusive and isMaxInclusive + ObjectNode json = MAPPER.createObjectNode(); + ArrayNode ranges = json.putArray("queryRanges"); + ObjectNode range = ranges.addObject(); + range.putArray("min").add("value"); + range.putArray("max").add("value"); + // intentionally no isMinInclusive or isMaxInclusive + + PartitionKeyInternalHelper.convertToSortedEpkRanges("queryRanges", json, singleHashPkDef()); + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/QueryPlanRetrieverSupportedFeaturesTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/QueryPlanRetrieverSupportedFeaturesTest.java new file mode 100644 index 000000000000..8a90efa89d76 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/QueryPlanRetrieverSupportedFeaturesTest.java @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import org.testng.annotations.Test; + +import java.lang.reflect.Field; + +import static org.assertj.core.api.Assertions.assertThat; + +public class QueryPlanRetrieverSupportedFeaturesTest { + + @Test(groups = {"unit"}) + public void supportedFeaturesIncludeOptimizedCountIfFlag() throws Exception { + String supportedFeatures = getQueryPlanRetrieverString("SUPPORTED_QUERY_FEATURES"); + + assertThat(supportedFeatures) + .contains(QueryFeature.CountIf.name()) + .doesNotContain("HybridSearchSkipOrderByRewrite") + .doesNotContain("ListAndSetAggregate"); + } + + private static String getQueryPlanRetrieverString(String fieldName) throws Exception { + Class queryPlanRetrieverClass = + Class.forName("com.azure.cosmos.implementation.query.QueryPlanRetriever"); + Field field = queryPlanRetrieverClass.getDeclaredField(fieldName); + field.setAccessible(true); + return (String) field.get(null); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 66b6e6314ad0..ea9058ecd20d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -180,6 +180,7 @@ protected static void executeWithRetry(Runnable action, int maxRetries, String c protected static final ImmutableList protocols; protected static final AzureKeyCredential credential; + protected static final boolean useAadAuth; protected int subscriberValidationTimeout = TIMEOUT; @@ -267,12 +268,21 @@ protected static CosmosAsyncContainer getSharedSinglePartitionCosmosContainer(Co objectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); credential = new AzureKeyCredential(TestConfigurations.MASTER_KEY); + useAadAuth = Boolean.parseBoolean(System.getProperty("COSMOS.USE_AAD_AUTH", "false")) + || Boolean.parseBoolean(System.getenv("COSMOS_USE_AAD_AUTH")); } private static ImmutableList immutableListOrNull(List list) { return list != null ? ImmutableList.copyOf(list) : null; } + protected static CosmosClientBuilder applyCredential(CosmosClientBuilder builder) { + if (useAadAuth) { + return builder.credential(new com.azure.identity.DefaultAzureCredentialBuilder().build()); + } + return builder.credential(credential); + } + private static class DatabaseManagerImpl implements CosmosDatabaseForTest.DatabaseManager { public static DatabaseManagerImpl getInstance(CosmosAsyncClient client) { return new DatabaseManagerImpl(client); @@ -1320,6 +1330,22 @@ public static Object[][] clientBuildersWithGatewayAndHttp2() { }; } + /** + * Data provider for thin client tests. Returns a gateway + HTTP/2 builder. + * Tests using this provider should enable thin client mode in their @BeforeClass + * by calling {@code System.setProperty("COSMOS.THINCLIENT_ENABLED", "true")} and + * clean up in @AfterClass with {@code System.clearProperty("COSMOS.THINCLIENT_ENABLED")}. + * + *

This provider can be adopted by existing test classes (e.g., query, stored procedure tests) + * to gradually add thin client coverage using the same test logic.

+ */ + @DataProvider + public static Object[][] clientBuildersWithThinClient() { + return new Object[][]{ + {createGatewayRxDocumentClient(TestConfigurations.HOST, null, true, null, true, true, true)}, + }; + } + @DataProvider public static Object[][] clientBuildersWithSessionConsistency() { return new Object[][]{ @@ -1646,12 +1672,11 @@ static protected CosmosClientBuilder createGatewayHouseKeepingDocumentClient(boo ThrottlingRetryOptions options = new ThrottlingRetryOptions(); options.setMaxRetryWaitTime(Duration.ofSeconds(SUITE_SETUP_TIMEOUT)); GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig(); - return new CosmosClientBuilder().endpoint(TestConfigurations.HOST) - .credential(credential) + return applyCredential(new CosmosClientBuilder().endpoint(TestConfigurations.HOST) .gatewayMode(gatewayConnectionConfig) .throttlingRetryOptions(options) .contentResponseOnWriteEnabled(contentResponseOnWriteEnabled) - .consistencyLevel(ConsistencyLevel.SESSION); + .consistencyLevel(ConsistencyLevel.SESSION)); } static protected CosmosClientBuilder createGatewayRxDocumentClient( @@ -1690,13 +1715,12 @@ static protected CosmosClientBuilder createGatewayRxDocumentClient( gatewayConnectionConfig.setHttp2ConnectionConfig(http2ConnectionConfig); } - CosmosClientBuilder builder = new CosmosClientBuilder().endpoint(endpoint) - .credential(credential) + CosmosClientBuilder builder = applyCredential(new CosmosClientBuilder().endpoint(endpoint) .gatewayMode(gatewayConnectionConfig) .multipleWriteRegionsEnabled(multiMasterEnabled) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(contentResponseOnWriteEnabled) - .consistencyLevel(consistencyLevel); + .consistencyLevel(consistencyLevel)); ImplementationBridgeHelpers .CosmosClientBuilderHelper .getCosmosClientBuilderAccessor() @@ -1719,11 +1743,12 @@ static protected CosmosClientBuilder createDirectRxDocumentClient(ConsistencyLev List preferredRegions, boolean contentResponseOnWriteEnabled, boolean retryOnThrottledRequests) { - CosmosClientBuilder builder = new CosmosClientBuilder().endpoint(TestConfigurations.HOST) - .credential(credential) - .directMode(DirectConnectionConfig.getDefaultConfig()) - .contentResponseOnWriteEnabled(contentResponseOnWriteEnabled) - .consistencyLevel(consistencyLevel); + CosmosClientBuilder builder = applyCredential( + new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .directMode(DirectConnectionConfig.getDefaultConfig()) + .contentResponseOnWriteEnabled(contentResponseOnWriteEnabled) + .consistencyLevel(consistencyLevel)); if (preferredRegions != null) { builder.preferredRegions(preferredRegions); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientChangeFeedE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientChangeFeedE2ETest.java new file mode 100644 index 000000000000..8a24adb46121 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientChangeFeedE2ETest.java @@ -0,0 +1,120 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Thin client E2E tests for change feed operations. + * Container is truncated in {@code @BeforeClass} — no per-test cleanup needed. + */ +public class ThinClientChangeFeedE2ETest extends ThinClientTestBase { + + @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") + public ThinClientChangeFeedE2ETest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientIncrementalChangeFeed() { + String pkValue = UUID.randomUUID().toString(); + ObjectNode doc1 = createTestDocument(UUID.randomUUID().toString(), pkValue); + ObjectNode doc2 = createTestDocument(UUID.randomUUID().toString(), pkValue); + + CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(pkValue)); + batch.createItemOperation(doc1); + batch.createItemOperation(doc2); + container.executeCosmosBatch(batch).block(); + + // Scope change feed to the specific logical partition to avoid + // consuming changes from other tests or partitions. + CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(pkValue))); + + List changeFeedResults = new ArrayList<>(); + List allDiag = new ArrayList<>(); + for (FeedResponse page : container.queryChangeFeed(options, ObjectNode.class).byPage().toIterable()) { + changeFeedResults.addAll(page.getResults()); + allDiag.add(page.getCosmosDiagnostics()); + if (page.getResults().isEmpty()) { + break; + } + } + + assertThat(changeFeedResults.size()).isGreaterThanOrEqualTo(2); + for (CosmosDiagnostics d : allDiag) { + assertThinClientEndpointUsed(d); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientChangeFeedFullRange() { + // Insert docs across two different partition keys so the full-range feed spans multiple partitions. + String pk1 = "cfFullRange1_" + UUID.randomUUID().toString().substring(0, 8); + String pk2 = "cfFullRange2_" + UUID.randomUUID().toString().substring(0, 8); + container.createItem(createTestDocument(UUID.randomUUID().toString(), pk1)).block(); + container.createItem(createTestDocument(UUID.randomUUID().toString(), pk2)).block(); + + CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forFullRange()); + + List changeFeedResults = new ArrayList<>(); + List allDiag = new ArrayList<>(); + for (FeedResponse page : container.queryChangeFeed(options, ObjectNode.class).byPage().toIterable()) { + changeFeedResults.addAll(page.getResults()); + allDiag.add(page.getCosmosDiagnostics()); + if (page.getResults().isEmpty()) { + break; + } + } + + assertThat(changeFeedResults.size()).isGreaterThanOrEqualTo(2); + for (CosmosDiagnostics d : allDiag) { + assertThinClientEndpointUsed(d); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientChangeFeedPartitionKey() { + String pkValue = "cfPk_" + UUID.randomUUID().toString().substring(0, 8); + container.createItem(createTestDocument(UUID.randomUUID().toString(), pkValue)).block(); + container.createItem(createTestDocument(UUID.randomUUID().toString(), pkValue)).block(); + + CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(pkValue))); + + List changeFeedResults = new ArrayList<>(); + List allDiag = new ArrayList<>(); + for (FeedResponse page : container.queryChangeFeed(options, ObjectNode.class).byPage().toIterable()) { + changeFeedResults.addAll(page.getResults()); + allDiag.add(page.getCosmosDiagnostics()); + if (page.getResults().isEmpty()) { + break; + } + } + + // Should only see the 2 docs from this partition key + assertThat(changeFeedResults.size()).isEqualTo(2); + for (ObjectNode result : changeFeedResults) { + assertThat(result.get(PARTITION_KEY_FIELD).asText()).isEqualTo(pkValue); + } + for (CosmosDiagnostics d : allDiag) { + assertThinClientEndpointUsed(d); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientPointOperationE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientPointOperationE2ETest.java new file mode 100644 index 000000000000..1233f72dacfb --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientPointOperationE2ETest.java @@ -0,0 +1,120 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchResponse; +import com.azure.cosmos.models.CosmosBulkItemResponse; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Thin client E2E tests for point operations: Create, Read, Replace, Upsert, Patch, Delete, Bulk, Batch. + * Container is truncated in {@code @BeforeClass} — no per-test cleanup needed. + */ +public class ThinClientPointOperationE2ETest extends ThinClientTestBase { + + @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") + public ThinClientPointOperationE2ETest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientDocumentPointOperations() { + String idValue = UUID.randomUUID().toString(); + ObjectNode doc = createTestDocument(idValue, idValue); + + // create + CosmosItemResponse createResponse = container.createItem(doc).block(); + assertThat(createResponse.getStatusCode()).isEqualTo(201); + assertThat(createResponse.getRequestCharge()).isGreaterThan(0.0); + assertThinClientEndpointUsed(createResponse.getDiagnostics()); + + // read + CosmosItemResponse readResponse = container.readItem(idValue, new PartitionKey(idValue), ObjectNode.class).block(); + assertThat(readResponse.getStatusCode()).isEqualTo(200); + assertThinClientEndpointUsed(readResponse.getDiagnostics()); + + String idValue2 = UUID.randomUUID().toString(); + ObjectNode doc2 = createTestDocument(idValue2, idValue); + + // replace + CosmosItemResponse replaceResponse = container.replaceItem(doc2, idValue, new PartitionKey(idValue)).block(); + assertThat(replaceResponse.getStatusCode()).isEqualTo(200); + assertThinClientEndpointUsed(replaceResponse.getDiagnostics()); + + // upsert + ObjectNode doc3 = createTestDocument(idValue2, idValue); + doc3.put("newField", "newValue"); + CosmosItemResponse upsertResponse = container.upsertItem(doc3, new PartitionKey(idValue), new CosmosItemRequestOptions()).block(); + assertThat(upsertResponse.getStatusCode()).isEqualTo(200); + assertThinClientEndpointUsed(upsertResponse.getDiagnostics()); + + CosmosItemResponse readAfterUpsertResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); + assertThat(readAfterUpsertResponse.getItem().get("newField").asText()).isEqualTo("newValue"); + + // patch + CosmosPatchOperations patchOperations = CosmosPatchOperations.create(); + patchOperations.add("/anotherNewField", "anotherNewValue"); + patchOperations.replace("/newField", "patchedNewField"); + CosmosItemResponse patchResponse = container.patchItem(idValue2, new PartitionKey(idValue), patchOperations, ObjectNode.class).block(); + assertThat(patchResponse.getStatusCode()).isEqualTo(200); + assertThinClientEndpointUsed(patchResponse.getDiagnostics()); + + CosmosItemResponse readAfterPatchResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); + assertThat(readAfterPatchResponse.getItem().get("newField").asText()).isEqualTo("patchedNewField"); + assertThat(readAfterPatchResponse.getItem().get("anotherNewField").asText()).isEqualTo("anotherNewValue"); + + // delete + CosmosItemResponse deleteResponse = container.deleteItem(idValue2, new PartitionKey(idValue)).block(); + assertThat(deleteResponse.getStatusCode()).isEqualTo(204); + assertThinClientEndpointUsed(deleteResponse.getDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientBulk() { + String idValue = UUID.randomUUID().toString(); + ObjectNode doc = createTestDocument(idValue, idValue); + + Flux> responsesFlux = container.executeBulkOperations(Flux.just( + CosmosBulkOperations.getCreateItemOperation(doc, new PartitionKey(idValue)) + )); + + List> responses = responsesFlux.collectList().block(); + assertThat(responses.size()).isEqualTo(1); + CosmosBulkItemResponse bulkResponse = responses.get(0).getResponse(); + assertThat(bulkResponse.isSuccessStatusCode()).isEqualTo(true); + assertThinClientEndpointUsed(bulkResponse.getCosmosDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientBatch() { + String pkValue = UUID.randomUUID().toString(); + String idValue1 = UUID.randomUUID().toString(); + String idValue2 = UUID.randomUUID().toString(); + ObjectNode doc1 = createTestDocument(idValue1, pkValue); + ObjectNode doc2 = createTestDocument(idValue2, pkValue); + + CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(pkValue)); + batch.createItemOperation(doc1); + batch.createItemOperation(doc2); + + CosmosBatchResponse response = container.executeCosmosBatch(batch).block(); + assertThat(response.getStatusCode()).isEqualTo(200); + assertThinClientEndpointUsed(response.getDiagnostics()); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientQueryE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientQueryE2ETest.java new file mode 100644 index 000000000000..ffa1246ef368 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientQueryE2ETest.java @@ -0,0 +1,1209 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosFullTextIndex; +import com.azure.cosmos.models.CosmosFullTextPath; +import com.azure.cosmos.models.CosmosFullTextPolicy; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosVectorDataType; +import com.azure.cosmos.models.CosmosVectorDistanceFunction; +import com.azure.cosmos.models.CosmosVectorEmbedding; +import com.azure.cosmos.models.CosmosVectorEmbeddingPolicy; +import com.azure.cosmos.models.CosmosVectorIndexSpec; +import com.azure.cosmos.models.CosmosVectorIndexType; +import com.azure.cosmos.models.ExcludedPath; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.IncludedPath; +import com.azure.cosmos.models.IndexingMode; +import com.azure.cosmos.models.IndexingPolicy; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.PartitionKeyDefinition; +import com.azure.cosmos.models.SqlParameter; +import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.directconnectivity.Protocol; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static com.azure.cosmos.rx.ThinClientTestBase.assertThinClientEndpointUsed; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Fail.fail; + +/** + * Thin client query E2E tests comparing Direct TCP (baseline) vs Gateway V2 (thin client). + *

+ * Each query is executed through both connection modes and results are compared: + *

    + *
  • Direct TCP — baseline, runs against backend partition replicas.
  • + *
  • Gateway V2 (thin client) — system under test, routes through proxy (:10250), + * proxy returns raw PartitionKeyInternal arrays, SDK converts to EPK client-side.
  • + *
+ * Assertions: + *
    + *
  1. Gateway V2 requests routed through the :10250 thin client endpoint.
  2. + *
  3. Result set sizes match between Direct and Gateway V2.
  4. + *
  5. Result set contents match (document IDs in order for ordered queries, set equality for unordered).
  6. + *
+ */ +public class ThinClientQueryE2ETest extends TestSuiteBase { + + private CosmosAsyncClient directClient; // Baseline: Direct TCP + private CosmosAsyncClient thinClient; // SUT: Gateway V2 (thin client) + private CosmosAsyncContainer directContainer; + private CosmosAsyncContainer thinClientContainer; + + private final List seededDocs = new ArrayList<>(); + private final String commonPk = "tc-query-" + UUID.randomUUID().toString().substring(0, 8); + + // Use constants and helpers from ThinClientTestBase to avoid duplication. + private static final String ID_FIELD = ThinClientTestBase.ID_FIELD; + private static final String PK_FIELD = ThinClientTestBase.PARTITION_KEY_FIELD; + private static final ObjectMapper OBJECT_MAPPER = ThinClientTestBase.OBJECT_MAPPER; + + @BeforeClass(groups = {"thinclient"}, timeOut = SETUP_TIMEOUT * 2) + public void before_ThinClientQueryE2ETest() { + try { + // 1. Direct TCP client (baseline) + CosmosClientBuilder directBuilder = createDirectRxDocumentClient(ConsistencyLevel.SESSION, Protocol.TCP, false, null, true, true); + this.directClient = directBuilder.buildAsyncClient(); + this.directContainer = getSharedMultiPartitionCosmosContainer(this.directClient); + + // 2. Gateway V2 thin client (system under test) + ThinClientTestBase.enableThinClientForTest(); + CosmosClientBuilder thinBuilder = createGatewayRxDocumentClient( + TestConfigurations.HOST, null, true, null, true, true, true); + this.thinClient = thinBuilder.buildAsyncClient(); + this.thinClientContainer = this.thinClient.getDatabase( + directContainer.getDatabase().getId()).getContainer(directContainer.getId()); + + // 3. Clean up shared container to prevent cross-test-class pollution + cleanUpContainer(this.directContainer); + + // 4. Seed diverse test data for broad query coverage + seedTestData(); + } catch (Exception e) { + // Clean up any clients that were successfully created before the failure + if (this.thinClient != null) { this.thinClient.close(); this.thinClient = null; } + if (this.directClient != null) { this.directClient.close(); this.directClient = null; } + throw e; + } + } + + /** + * Seeds 10 documents into the shared container with the following schema: + *
+     * {
+     *   "id":        "tcdoc-{i}-{uuid}",  // unique document ID
+     *   "mypk":      "{commonPk}",         // partition key — same for all seeded docs
+     *   "category":  string,               // one of: electronics, books, clothing, toys
+     *   "status":    string,               // "active" or "inactive"
+     *   "age":       int,                  // range: 8–61
+     *   "price":     double,               // range: 7.50–549.99
+     *   "idx":       int,                  // sequential index 0–9
+     *   "isActive":  boolean,              // derived from status == "active"
+     *   "address":   { "city": string, "zip": int },  // nested object
+     *   "scores":    [int, int],           // two-element int array: [i*10, i*10+5]
+     *   "tags":      [string, ...]         // variable-length string array
+     * }
+     * 
+ */ + private void seedTestData() { + String[] categories = {"electronics", "books", "clothing", "electronics", "books", + "clothing", "electronics", "toys", "toys", "books"}; + String[] statuses = {"active", "inactive", "active", "active", "inactive", + "active", "inactive", "active", "active", "active"}; + int[] ages = {25, 30, 17, 42, 55, 19, 38, 12, 8, 61}; + double[] prices = {99.99, 14.50, 45.00, 299.99, 9.99, 25.00, 549.99, 19.99, 7.50, 22.00}; + + for (int i = 0; i < 10; i++) { + String docId = "tcdoc-" + i + "-" + UUID.randomUUID().toString().substring(0, 8); + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, docId); + doc.put(PK_FIELD, commonPk); + doc.put("category", categories[i]); + doc.put("status", statuses[i]); + doc.put("age", ages[i]); + doc.put("price", prices[i]); + doc.put("idx", i); + doc.put("isActive", statuses[i].equals("active")); + + ObjectNode address = OBJECT_MAPPER.createObjectNode(); + address.put("city", i % 2 == 0 ? "Seattle" : "Portland"); + address.put("zip", 98100 + i); + doc.set("address", address); + + doc.putArray("scores").add(i * 10).add(i * 10 + 5); + + // Tags array for JOIN/EXISTS tests — varies per doc + ArrayNode tags = doc.putArray("tags"); + tags.add(categories[i]); // first tag matches category + if (i % 2 == 0) tags.add("on-sale"); + if (i % 3 == 0) tags.add("featured"); + + seededDocs.add(doc); + } + + bulkInsert(directContainer, seededDocs).blockLast(); + } + + @AfterClass(groups = {"thinclient"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + if (directContainer != null && !seededDocs.isEmpty()) { + try { + List deleteOps = seededDocs.stream() + .map(doc -> CosmosBulkOperations.getDeleteItemOperation( + doc.get(ID_FIELD).asText(), new PartitionKey(commonPk))) + .collect(Collectors.toList()); + directContainer.executeBulkOperations(Flux.fromIterable(deleteOps)).blockLast(); + } catch (Exception e) { + logger.warn("Bulk delete of seeded docs failed: {}", e.getMessage()); + } + } + ThinClientTestBase.clearThinClientForTest(); + if (this.thinClient != null) { this.thinClient.close(); } + if (this.directClient != null) { this.directClient.close(); } + } + + // ==================== Equality & Filter Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testSelectAll() { + assertDirectAndThinClientMatch("SELECT * FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereEquality() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.category = 'electronics'"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereEqualityParameterized() { + SqlQuerySpec qs = new SqlQuerySpec("SELECT * FROM c WHERE c.category = @cat"); + qs.setParameters(Arrays.asList(new SqlParameter("@cat", "books"))); + assertDirectAndThinClientMatch(qs, partitionedOptions()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereRangeGreaterThan() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.age > 30"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereRangeLessThanOrEqual() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.price <= 25.00"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereRangeBetween() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.age >= 18 AND c.age <= 40"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereIn() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.category IN ('electronics', 'toys')"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereCompoundAndOr() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.status = 'active' AND (c.category = 'electronics' OR c.category = 'books')"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereNotEqual() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.status != 'inactive'"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereBooleanField() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.isActive = true"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereIsDefined() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE IS_DEFINED(c.address)"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereStartsWith() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE STARTSWITH(c.category, 'elec')"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereContains() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE CONTAINS(c.category, 'ook')"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereArrayContains() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE ARRAY_CONTAINS(c.scores, 50)"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testWhereNestedProperty() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.address.city = 'Seattle'"); + } + + // ==================== Projection Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testSelectSpecificFields() { + String query = "SELECT c.id, c.category, c.price FROM c"; + QueryResult gwResult = drainQuery(directContainer, query, partitionedOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(thinClientContainer, query, partitionedOptions(), ObjectNode.class); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("Count mismatch: " + query).isEqualTo(gwResult.results.size()); + for (int i = 0; i < gwResult.results.size(); i++) { + assertThat(tcResult.results.get(i).get("category").asText()).isEqualTo(gwResult.results.get(i).get("category").asText()); + assertThat(tcResult.results.get(i).get("price").asDouble()).isEqualTo(gwResult.results.get(i).get("price").asDouble()); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testSelectComputedAlias() { + String query = "SELECT c.id, c.price * 1.1 AS taxedPrice FROM c"; + QueryResult gwResult = drainQuery(directContainer, query, partitionedOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(thinClientContainer, query, partitionedOptions(), ObjectNode.class); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("Count mismatch: " + query).isEqualTo(gwResult.results.size()); + } + + // ==================== ORDER BY Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testOrderByAsc() { + assertDirectAndThinClientMatch("SELECT * FROM c ORDER BY c.age"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testOrderByDesc() { + assertDirectAndThinClientMatch("SELECT * FROM c ORDER BY c.price DESC"); + } + + // ==================== DISTINCT Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testDistinctValue() { + assertScalarDirectAndThinClientMatch("SELECT DISTINCT VALUE c.category FROM c", String.class); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testDistinctValueBoolean() { + assertScalarDirectAndThinClientMatch("SELECT DISTINCT VALUE c.isActive FROM c", Boolean.class); + } + + // ==================== TOP Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testTop() { + assertDirectAndThinClientMatch("SELECT TOP 3 * FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testTopWithOrderBy() { + assertDirectAndThinClientMatch("SELECT TOP 5 * FROM c ORDER BY c.price DESC"); + } + + // ==================== Aggregate Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testCount() { + assertScalarDirectAndThinClientMatch("SELECT VALUE COUNT(1) FROM c", Integer.class); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testSum() { + assertScalarDirectAndThinClientMatch("SELECT VALUE SUM(c.price) FROM c", Double.class); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testAvg() { + assertScalarDirectAndThinClientMatch("SELECT VALUE AVG(c.age) FROM c", Double.class); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMin() { + assertScalarDirectAndThinClientMatch("SELECT VALUE MIN(c.price) FROM c", Double.class); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMax() { + assertScalarDirectAndThinClientMatch("SELECT VALUE MAX(c.age) FROM c", Integer.class); + } + + // ==================== GROUP BY Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testGroupByCount() { + assertGroupByDirectAndThinClientMatch("SELECT c.category, COUNT(1) as cnt FROM c GROUP BY c.category", "category"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testGroupBySumAvg() { + assertGroupByDirectAndThinClientMatch("SELECT c.category, SUM(c.price) as total, AVG(c.price) as avg FROM c GROUP BY c.category", "category"); + } + + // ==================== OFFSET / LIMIT Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testOffsetLimit() { + assertDirectAndThinClientMatch("SELECT * FROM c ORDER BY c.idx OFFSET 3 LIMIT 4"); + } + + // ==================== JOIN Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testJoinScoresArray() { + // Self-join on scores array — produces one row per array element + assertDirectAndThinClientMatch("SELECT c.id, s AS score FROM c JOIN s IN c.scores"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testJoinWithFilter() { + // Self-join with WHERE filter on the joined element + assertDirectAndThinClientMatch("SELECT c.id, s AS score FROM c JOIN s IN c.scores WHERE s >= 50"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testJoinTagsArray() { + // Self-join on tags string array + assertDirectAndThinClientMatch("SELECT c.id, t AS tag FROM c JOIN t IN c.tags"); + } + + // ==================== EXISTS Subquery Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testExistsSubquery() { + // Docs pattern: use EXISTS to check if any array element matches + assertDirectAndThinClientMatch( + "SELECT * FROM c WHERE EXISTS (SELECT VALUE s FROM s IN c.scores WHERE s > 60)"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testExistsSubqueryWithStringMatch() { + // EXISTS on tags array with string match + assertDirectAndThinClientMatch( + "SELECT * FROM c WHERE EXISTS (SELECT VALUE t FROM t IN c.tags WHERE t = 'on-sale')"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testExistsAliasInProjection() { + // EXISTS aliased in SELECT — returns boolean column + assertDirectAndThinClientMatch( + "SELECT c.id, EXISTS (SELECT VALUE s FROM s IN c.scores WHERE s > 60) AS hasHighScore FROM c"); + } + + // ==================== LIKE Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testLikePrefix() { + // LIKE with prefix pattern + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.category LIKE 'elec%'"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testLikeSuffix() { + // LIKE with suffix pattern + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.category LIKE '%ing'"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testLikeContains() { + // LIKE with contains pattern (substring match via wildcards) + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.category LIKE '%ook%'"); + } + + // ==================== BETWEEN Keyword ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testBetween() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.age BETWEEN 18 AND 40"); + } + + // ==================== String Function Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringConcat() { + assertDirectAndThinClientMatch("SELECT CONCAT(c.category, '-', c.status) AS label FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringEndsWith() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE ENDSWITH(c.category, 'ics')"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringLower() { + assertDirectAndThinClientMatch("SELECT LOWER(c.category) AS lowerCat FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringUpper() { + assertDirectAndThinClientMatch("SELECT UPPER(c.status) AS upperStatus FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringLength() { + assertDirectAndThinClientMatch("SELECT c.category, LENGTH(c.category) AS len FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringSubstring() { + assertDirectAndThinClientMatch("SELECT SUBSTRING(c.category, 0, 4) AS prefix FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringReplace() { + assertDirectAndThinClientMatch("SELECT REPLACE(c.category, 'o', '0') AS replaced FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringIndexOf() { + assertDirectAndThinClientMatch("SELECT INDEX_OF(c.category, 'o') AS pos FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringLeft() { + assertDirectAndThinClientMatch("SELECT LEFT(c.category, 3) AS l FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringReverse() { + assertDirectAndThinClientMatch("SELECT REVERSE(c.category) AS rev FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStringTrim() { + assertDirectAndThinClientMatch("SELECT TRIM(c.status) AS trimmed FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testRegexMatch() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE RegexMatch(c.category, '^elec.*')"); + } + + // ==================== Type Checking Function Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIsArray() { + assertDirectAndThinClientMatch("SELECT c.id, IS_ARRAY(c.scores) AS isArr FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIsBool() { + assertDirectAndThinClientMatch("SELECT c.id, IS_BOOL(c.isActive) AS isBool FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIsNull() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE IS_NULL(c.nonExistentField)"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIsNumber() { + assertDirectAndThinClientMatch("SELECT c.id, IS_NUMBER(c.age) AS isNum FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIsString() { + assertDirectAndThinClientMatch("SELECT c.id, IS_STRING(c.category) AS isStr FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIsObject() { + assertDirectAndThinClientMatch("SELECT c.id, IS_OBJECT(c.address) AS isObj FROM c"); + } + + // ==================== Math Function Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMathAbs() { + assertDirectAndThinClientMatch("SELECT ABS(c.age - 30) AS diff FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMathCeilingFloor() { + assertDirectAndThinClientMatch("SELECT CEILING(c.price) AS ceil, FLOOR(c.price) AS flr FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMathRound() { + assertDirectAndThinClientMatch("SELECT ROUND(c.price) AS rounded FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMathPower() { + assertDirectAndThinClientMatch("SELECT POWER(c.age, 2) AS ageSq FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testMathSqrt() { + assertDirectAndThinClientMatch("SELECT SQRT(c.price) AS sqrtPrice FROM c"); + } + + // ==================== Array Function Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testArrayLength() { + assertDirectAndThinClientMatch("SELECT c.id, ARRAY_LENGTH(c.scores) AS len FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testArraySlice() { + assertDirectAndThinClientMatch("SELECT c.id, ARRAY_SLICE(c.tags, 0, 1) AS firstTag FROM c"); + } + + // ==================== Conditional Function Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testIif() { + assertDirectAndThinClientMatch("SELECT c.id, IIF(c.age >= 18, 'adult', 'minor') AS ageGroup FROM c"); + } + + // ==================== Date/Time Function Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testGetCurrentDateTime() { + // Only assert both paths return a non-empty ISO 8601 string — exact values + // will differ because gateway and proxy execute at slightly different times. + QueryResult gwResult = drainQuery(directContainer, + "SELECT VALUE GetCurrentDateTime()", partitionedOptions(), String.class); + QueryResult tcResult = drainQuery(thinClientContainer, + "SELECT VALUE GetCurrentDateTime()", partitionedOptions(), String.class); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + assertThat(gwResult.results.size()).isEqualTo(1); + assertThat(tcResult.results.size()).isEqualTo(1); + assertThat(gwResult.results.get(0)).matches("\\d{4}-\\d{2}-\\d{2}T.*Z"); + assertThat(tcResult.results.get(0)).matches("\\d{4}-\\d{2}-\\d{2}T.*Z"); + } + + // ==================== SELECT VALUE / Nested Projection Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testSelectValueObject() { + assertDirectAndThinClientMatch( + "SELECT VALUE { name: c.category, loc: c.address.city } FROM c"); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testSelectValueScalar() { + assertScalarDirectAndThinClientMatch("SELECT VALUE c.category FROM c", String.class); + } + + // ==================== Cross-Partition Tests ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testCrossPartitionSelectAll() { + assertDirectAndThinClientMatch("SELECT * FROM c ORDER BY c.idx", new CosmosQueryRequestOptions()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testCrossPartitionWhereFilter() { + assertDirectAndThinClientMatch("SELECT * FROM c WHERE c.category = 'electronics' ORDER BY c.idx", + new CosmosQueryRequestOptions()); + } + + // ==================== Multi-EPK-Range Tests (Sort Validation) ==================== + // These tests use a dedicated 24,000 RU/s container (3 physical partitions) to ensure + // documents with different partition keys land on different physical partitions. + // After PartitionKeyInternal → EPK hash conversion, the sort in + // parseQueryRangesForThinClient() ensures RoutingMapProviderHelper.getOverlappingRanges() + // doesn't throw IllegalArgumentException for unsorted ranges. + + /** + * Helper: creates a 24K RU container, runs the test, deletes the container. + */ + private void runMultiRangeTest(String[] pkValues, String queryTemplate, int expectedCount) { + String containerId = "multiRange_" + UUID.randomUUID().toString().substring(0, 8); + CosmosAsyncDatabase db = directClient.getDatabase(directContainer.getDatabase().getId()); + CosmosAsyncContainer directTestContainer = db.getContainer(containerId); + + try { + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(Collections.singletonList("/" + PK_FIELD)); + CosmosContainerProperties props = new CosmosContainerProperties(containerId, pkDef); + db.createContainer(props, ThroughputProperties.createManualThroughput(24000)).block(); + + CosmosAsyncContainer tcContainer = thinClient.getDatabase(db.getId()).getContainer(containerId); + + for (int i = 0; i < pkValues.length; i++) { + String docId = "mr-" + i + "-" + UUID.randomUUID().toString().substring(0, 8); + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, docId); + doc.put(PK_FIELD, pkValues[i]); + doc.put("idx", i); + doc.put("val", i * 100); + directTestContainer.createItem(doc, new PartitionKey(pkValues[i]), null).block(); + } + + String query = queryTemplate; + + QueryResult directResult = drainQuery(directTestContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(tcContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("Multi-range count mismatch for: " + query).isEqualTo(directResult.results.size()); + assertThat(tcResult.results.size()).isEqualTo(expectedCount); + + List directIds = directResult.results.stream().map(d -> d.get(ID_FIELD).asText()).sorted().collect(Collectors.toList()); + List tcIds = tcResult.results.stream().map(d -> d.get(ID_FIELD).asText()).sorted().collect(Collectors.toList()); + assertThat(tcIds).isEqualTo(directIds); + + } finally { + safeDeleteContainer(directTestContainer); + } + } + + /** + * Test: IN clause on partition key with 3 values → 3 disjoint EPK ranges across 3 physical partitions. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 3) + public void testMultiRangePartitionKeyInClause() { + String[] pkValues = {"pk-alpha", "pk-beta", "pk-gamma", "pk-delta", "pk-epsilon"}; + runMultiRangeTest(pkValues, + "SELECT * FROM c WHERE c.mypk IN ('pk-alpha', 'pk-gamma', 'pk-epsilon')", + 3); + } + + /** + * Test: OR on partition key values → 2 disjoint EPK ranges. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 3) + public void testMultiRangePartitionKeyOrClause() { + String[] pkValues = {"pk-or-1", "pk-or-2", "pk-or-3"}; + runMultiRangeTest(pkValues, + "SELECT * FROM c WHERE c.mypk = 'pk-or-1' OR c.mypk = 'pk-or-3'", + 2); + } + + /** + * Test: IN clause with 10 PK values → 10 disjoint EPK ranges, stress test for sort correctness. + * Uses UUID-based PK values to maximize EPK hash spread. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 3) + public void testMultiRangeManyPartitionKeys() { + String[] pkValues = new String[10]; + for (int i = 0; i < 10; i++) { + pkValues[i] = "pk-many-" + UUID.randomUUID().toString(); + } + + // Build IN clause dynamically from the random PK values + StringBuilder sb = new StringBuilder("SELECT * FROM c WHERE c.mypk IN ("); + for (int i = 0; i < pkValues.length; i++) { + if (i > 0) sb.append(", "); + sb.append("'").append(pkValues[i]).append("'"); + } + sb.append(")"); + runMultiRangeTest(pkValues, sb.toString(), 10); + } + + // ==================== Continuation Token Draining ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testContinuationTokenDraining() { + // Drain gateway fully for expected count + QueryResult gwResult = drainQuery(directContainer, "SELECT * FROM c", partitionedOptions(), ObjectNode.class); + + // Drain thin client with small page size to force multiple continuations + List tcAll = new ArrayList<>(); + List tcDiag = new ArrayList<>(); + String continuationToken = null; + int pageCount = 0; + int maxIterations = 100; + do { + Iterable> pages = thinClientContainer + .queryItems("SELECT * FROM c", partitionedOptions(), ObjectNode.class) + .byPage(continuationToken, 3) // small page size + .toIterable(); + for (FeedResponse page : pages) { + tcAll.addAll(page.getResults()); + tcDiag.add(page.getCosmosDiagnostics()); + continuationToken = page.getContinuationToken(); + pageCount++; + } + } while (continuationToken != null && --maxIterations > 0); + + for (CosmosDiagnostics d : tcDiag) { assertThinClientEndpointUsed(d); } + assertThat(pageCount).as("Should have multiple pages with page size 3").isGreaterThan(1); + assertThat(tcAll.size()).as("Continuation draining count mismatch").isEqualTo(gwResult.results.size()); + } + + // ==================== Invalid Query ==================== + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testInvalidQueryReturnsBadRequest() { + try { + thinClientContainer.queryItems("SELEC * FORM c", new CosmosQueryRequestOptions(), ObjectNode.class) + .byPage().blockFirst(); + fail("Expected exception for invalid query"); + } catch (CosmosException e) { + assertThat(e.getStatusCode() == 400 || e.getStatusCode() == 0) + .as("Invalid query should return 400 Bad Request or a thin-client transport rejection, got " + + e.getStatusCode()) + .isTrue(); + if (e.getStatusCode() == 0) { + assertThinClientEndpointUsed(e.getDiagnostics()); + } + logger.info("Expected error for invalid query: {} (status {})", e.getMessage(), e.getStatusCode()); + } + } + + // ==================== Vector Search ==================== + + /** + * Creates a vector-enabled container, runs VectorDistance query through both + * Direct and thin client, compares results. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 2) + public void testVectorSearchGatewayVsThinClient() { + String vectorContainerId = "vecCompare_" + UUID.randomUUID().toString().substring(0, 8); + CosmosAsyncDatabase db = directClient.getDatabase(directContainer.getDatabase().getId()); + CosmosAsyncContainer directVecContainer = db.getContainer(vectorContainerId); + + try { + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(Collections.singletonList("/" + PK_FIELD)); + + CosmosContainerProperties props = new CosmosContainerProperties(vectorContainerId, pkDef); + + CosmosVectorEmbeddingPolicy policy = new CosmosVectorEmbeddingPolicy(); + CosmosVectorEmbedding emb = new CosmosVectorEmbedding(); + emb.setPath("/embedding"); + emb.setDataType(CosmosVectorDataType.FLOAT32); + emb.setEmbeddingDimensions(3); + emb.setDistanceFunction(CosmosVectorDistanceFunction.COSINE); + policy.setCosmosVectorEmbeddings(Collections.singletonList(emb)); + props.setVectorEmbeddingPolicy(policy); + + IndexingPolicy idxPolicy = new IndexingPolicy(); + idxPolicy.setIndexingMode(IndexingMode.CONSISTENT); + idxPolicy.setIncludedPaths(Collections.singletonList(new IncludedPath("/*"))); + idxPolicy.setExcludedPaths(Arrays.asList(new ExcludedPath("/embedding/*"), new ExcludedPath("/\"_etag\"/?"))); + CosmosVectorIndexSpec vecIdx = new CosmosVectorIndexSpec(); + vecIdx.setPath("/embedding"); + vecIdx.setType(CosmosVectorIndexType.FLAT.toString()); + idxPolicy.setVectorIndexes(Collections.singletonList(vecIdx)); + props.setIndexingPolicy(idxPolicy); + + db.createContainer(props).block(); + CosmosAsyncContainer tcVecContainer = thinClient.getDatabase(db.getId()).getContainer(vectorContainerId); + + double[][] embeddings = { + {1.0, 0.0, 0.0}, {0.0, 1.0, 0.0}, {0.0, 0.0, 1.0}, + {1.0, 1.0, 0.0}, {0.9, 0.1, 0.0}, + }; + + String vecPk = UUID.randomUUID().toString(); + List docIds = new ArrayList<>(); + for (int i = 0; i < embeddings.length; i++) { + String docId = "vec_" + i + "_" + UUID.randomUUID().toString().substring(0, 8); + docIds.add(docId); + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, docId); + doc.put(PK_FIELD, vecPk); + doc.put("text", "document " + i); + ArrayNode arr = doc.putArray("embedding"); + for (double v : embeddings[i]) { arr.add(v); } + directVecContainer.createItem(doc, new PartitionKey(vecPk), null).block(); + } + + String query = "SELECT TOP 5 c.id, c.text, VectorDistance(c.embedding, [1.0, 0.0, 0.0]) AS score " + + "FROM c ORDER BY VectorDistance(c.embedding, [1.0, 0.0, 0.0])"; + + QueryResult directResult = drainQuery(directVecContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(tcVecContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).isEqualTo(directResult.results.size()); + assertThat(tcResult.results.size()).isEqualTo(5); + + for (int i = 0; i < directResult.results.size(); i++) { + assertThat(tcResult.results.get(i).get("id").asText()).isEqualTo(directResult.results.get(i).get("id").asText()); + } + + assertThat(tcResult.results.get(0).get("id").asText()).isEqualTo(docIds.get(0)); + assertThat(tcResult.results.get(0).get("score").asDouble()).isGreaterThan(0.99); + + // Euclidean variant — validates ORDER BY score semantics with a different distance function + String euclideanQuery = "SELECT TOP 5 c.id, VectorDistance(c.embedding, [1.0, 0.0, 0.0], false, {'distanceFunction':'euclidean'}) AS score " + + "FROM c ORDER BY VectorDistance(c.embedding, [1.0, 0.0, 0.0], false, {'distanceFunction':'euclidean'})"; + + QueryResult directEuclidean = drainQuery(directVecContainer, euclideanQuery, new CosmosQueryRequestOptions(), ObjectNode.class); + QueryResult tcEuclidean = drainQuery(tcVecContainer, euclideanQuery, new CosmosQueryRequestOptions(), ObjectNode.class); + + for (CosmosDiagnostics d : tcEuclidean.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcEuclidean.results.size()).isEqualTo(directEuclidean.results.size()); + assertThat(tcEuclidean.results.size()).isEqualTo(5); + + for (int i = 0; i < directEuclidean.results.size(); i++) { + assertThat(tcEuclidean.results.get(i).get("id").asText()) + .as("Euclidean vector search result mismatch at position " + i) + .isEqualTo(directEuclidean.results.get(i).get("id").asText()); + } + + } finally { + safeDeleteContainer(directVecContainer); + } + } + + // ==================== Full-Text Search ==================== + + /** + * Creates a container with full-text policy and index, runs FullTextContains query. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 2) + public void testFullTextSearchGatewayVsThinClient() { + String containerId = "ftsCompare_" + UUID.randomUUID().toString().substring(0, 8); + CosmosAsyncDatabase db = directClient.getDatabase(directContainer.getDatabase().getId()); + CosmosAsyncContainer directFtsContainer = db.getContainer(containerId); + + try { + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(Collections.singletonList("/" + PK_FIELD)); + + CosmosContainerProperties props = new CosmosContainerProperties(containerId, pkDef); + + CosmosFullTextPath ftPath = new CosmosFullTextPath(); + ftPath.setPath("/text"); + ftPath.setLanguage("en-US"); + CosmosFullTextPolicy ftPolicy = new CosmosFullTextPolicy(); + ftPolicy.setDefaultLanguage("en-US"); + ftPolicy.setPaths(Collections.singletonList(ftPath)); + props.setFullTextPolicy(ftPolicy); + + IndexingPolicy idxPolicy = new IndexingPolicy(); + idxPolicy.setIndexingMode(IndexingMode.CONSISTENT); + idxPolicy.setIncludedPaths(Collections.singletonList(new IncludedPath("/*"))); + idxPolicy.setExcludedPaths(Collections.singletonList(new ExcludedPath("/\"_etag\"/?"))); + CosmosFullTextIndex ftIndex = new CosmosFullTextIndex(); + ftIndex.setPath("/text"); + idxPolicy.setCosmosFullTextIndexes(Collections.singletonList(ftIndex)); + props.setIndexingPolicy(idxPolicy); + + db.createContainer(props).block(); + CosmosAsyncContainer tcFtsContainer = thinClient.getDatabase(db.getId()).getContainer(containerId); + + String ftsPk = UUID.randomUUID().toString(); + String[] texts = { + "The quick brown fox jumps over the lazy dog", + "A red bicycle parked near the mountain trail", + "Electronic devices on sale at the downtown store", + "Mountain biking trails with scenic views", + "The lazy cat sleeps on the warm brown couch" + }; + for (int i = 0; i < texts.length; i++) { + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, "fts_" + i + "_" + UUID.randomUUID().toString().substring(0, 8)); + doc.put(PK_FIELD, ftsPk); + doc.put("text", texts[i]); + directFtsContainer.createItem(doc, new PartitionKey(ftsPk), null).block(); + } + + String query = "SELECT TOP 10 * FROM c WHERE FullTextContains(c.text, 'mountain')"; + + QueryResult directResult = drainQuery(directFtsContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(tcFtsContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + assertThat(directResult.results.size()).as("Full-text query should return results").isPositive(); + assertThat(tcResult.results.size()).isEqualTo(directResult.results.size()); + + List directIds = directResult.results.stream().map(d -> d.get("id").asText()).sorted().collect(Collectors.toList()); + List tcIds = tcResult.results.stream().map(d -> d.get("id").asText()).sorted().collect(Collectors.toList()); + assertThat(tcIds).isEqualTo(directIds); + + } finally { + safeDeleteContainer(directFtsContainer); + } + } + + /** + * Creates a container with full-text policy and index, runs ORDER BY RANK FullTextScore query. + * Compares exact ordering between Direct and thin client. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 2) + public void testFullTextScoreRanking() { + String containerId = "ftsRank_" + UUID.randomUUID().toString().substring(0, 8); + CosmosAsyncDatabase db = directClient.getDatabase(directContainer.getDatabase().getId()); + CosmosAsyncContainer directFtsContainer = db.getContainer(containerId); + + try { + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(Collections.singletonList("/" + PK_FIELD)); + + CosmosContainerProperties props = new CosmosContainerProperties(containerId, pkDef); + + CosmosFullTextPath ftPath = new CosmosFullTextPath(); + ftPath.setPath("/text"); + ftPath.setLanguage("en-US"); + CosmosFullTextPolicy ftPolicy = new CosmosFullTextPolicy(); + ftPolicy.setDefaultLanguage("en-US"); + ftPolicy.setPaths(Collections.singletonList(ftPath)); + props.setFullTextPolicy(ftPolicy); + + IndexingPolicy idxPolicy = new IndexingPolicy(); + idxPolicy.setIndexingMode(IndexingMode.CONSISTENT); + idxPolicy.setIncludedPaths(Collections.singletonList(new IncludedPath("/*"))); + idxPolicy.setExcludedPaths(Collections.singletonList(new ExcludedPath("/\"_etag\"/?"))); + CosmosFullTextIndex ftIndex = new CosmosFullTextIndex(); + ftIndex.setPath("/text"); + idxPolicy.setCosmosFullTextIndexes(Collections.singletonList(ftIndex)); + props.setIndexingPolicy(idxPolicy); + + db.createContainer(props).block(); + CosmosAsyncContainer tcFtsContainer = thinClient.getDatabase(db.getId()).getContainer(containerId); + + String ftsPk = UUID.randomUUID().toString(); + String[] texts = { + "The quick brown fox jumps over the lazy dog", + "A red bicycle parked near the mountain trail", + "Electronic devices on sale at the downtown store", + "Mountain biking trails with scenic views", + "The lazy cat sleeps on the warm brown couch" + }; + for (int i = 0; i < texts.length; i++) { + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, "ftsRank_" + i + "_" + UUID.randomUUID().toString().substring(0, 8)); + doc.put(PK_FIELD, ftsPk); + doc.put("text", texts[i]); + directFtsContainer.createItem(doc, new PartitionKey(ftsPk), null).block(); + } + + String query = "SELECT TOP 5 * FROM c ORDER BY RANK FullTextScore(c.text, 'mountain')"; + + QueryResult directResult = drainQuery(directFtsContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(tcFtsContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + assertThat(directResult.results.size()).as("FullTextScore ranking query should return results").isPositive(); + assertThat(tcResult.results.size()).isEqualTo(directResult.results.size()); + + for (int i = 0; i < directResult.results.size(); i++) { + assertThat(tcResult.results.get(i).get("id").asText()) + .as("FullTextScore ranking result mismatch at position " + i) + .isEqualTo(directResult.results.get(i).get("id").asText()); + } + + } finally { + safeDeleteContainer(directFtsContainer); + } + } + + // ==================== Hybrid Search ==================== + + /** + * Creates a container with vector + full-text policies, runs hybrid RRF query. + */ + @Test(groups = {"thinclient"}, timeOut = TIMEOUT * 2) + public void testHybridSearchGatewayVsThinClient() { + String containerId = "hybridCompare_" + UUID.randomUUID().toString().substring(0, 8); + CosmosAsyncDatabase db = directClient.getDatabase(directContainer.getDatabase().getId()); + CosmosAsyncContainer directHybridContainer = db.getContainer(containerId); + + try { + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(Collections.singletonList("/" + PK_FIELD)); + + CosmosContainerProperties props = new CosmosContainerProperties(containerId, pkDef); + + CosmosVectorEmbeddingPolicy vecPolicy = new CosmosVectorEmbeddingPolicy(); + CosmosVectorEmbedding emb = new CosmosVectorEmbedding(); + emb.setPath("/vector"); + emb.setDataType(CosmosVectorDataType.FLOAT32); + emb.setEmbeddingDimensions(3); + emb.setDistanceFunction(CosmosVectorDistanceFunction.COSINE); + vecPolicy.setCosmosVectorEmbeddings(Collections.singletonList(emb)); + props.setVectorEmbeddingPolicy(vecPolicy); + + CosmosFullTextPath ftPath = new CosmosFullTextPath(); + ftPath.setPath("/text"); + ftPath.setLanguage("en-US"); + CosmosFullTextPolicy ftPolicy = new CosmosFullTextPolicy(); + ftPolicy.setDefaultLanguage("en-US"); + ftPolicy.setPaths(Collections.singletonList(ftPath)); + props.setFullTextPolicy(ftPolicy); + + IndexingPolicy idxPolicy = new IndexingPolicy(); + idxPolicy.setIndexingMode(IndexingMode.CONSISTENT); + idxPolicy.setIncludedPaths(Collections.singletonList(new IncludedPath("/*"))); + idxPolicy.setExcludedPaths(Arrays.asList(new ExcludedPath("/vector/*"), new ExcludedPath("/\"_etag\"/?"))); + CosmosVectorIndexSpec vecIdx = new CosmosVectorIndexSpec(); + vecIdx.setPath("/vector"); + vecIdx.setType(CosmosVectorIndexType.FLAT.toString()); + idxPolicy.setVectorIndexes(Collections.singletonList(vecIdx)); + CosmosFullTextIndex ftIndex = new CosmosFullTextIndex(); + ftIndex.setPath("/text"); + idxPolicy.setCosmosFullTextIndexes(Collections.singletonList(ftIndex)); + props.setIndexingPolicy(idxPolicy); + + db.createContainer(props).block(); + CosmosAsyncContainer tcHybridContainer = thinClient.getDatabase(db.getId()).getContainer(containerId); + + String hybridPk = UUID.randomUUID().toString(); + String[] texts = { + "Red bicycle on the mountain trail", + "Blue car parked in the city", + "Green bicycle near the lake" + }; + double[][] vectors = { + {1.0, 0.0, 0.0}, {0.0, 1.0, 0.0}, {0.0, 0.0, 1.0} + }; + for (int i = 0; i < texts.length; i++) { + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, "hybrid_" + i + "_" + UUID.randomUUID().toString().substring(0, 8)); + doc.put(PK_FIELD, hybridPk); + doc.put("text", texts[i]); + ArrayNode arr = doc.putArray("vector"); + for (double v : vectors[i]) { arr.add(v); } + directHybridContainer.createItem(doc, new PartitionKey(hybridPk), null).block(); + } + + String query = "SELECT TOP 3 * FROM c " + + "ORDER BY RANK RRF(VectorDistance(c.vector, [1.0, 0.0, 0.0]), FullTextScore(c.text, 'bicycle'))"; + + QueryResult directResult = drainQuery(directHybridContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(tcHybridContainer, query, new CosmosQueryRequestOptions(), ObjectNode.class); + + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + assertThat(tcResult.results.size()).isEqualTo(directResult.results.size()); + + for (int i = 0; i < directResult.results.size(); i++) { + assertThat(tcResult.results.get(i).get("id").asText()) + .as("Hybrid search result mismatch at position " + i) + .isEqualTo(directResult.results.get(i).get("id").asText()); + } + + } finally { + safeDeleteContainer(directHybridContainer); + } + } + + // ==================== Assertion & Drain Helpers ==================== + + private static void safeDeleteContainer(CosmosAsyncContainer container) { + if (container != null) { + try { container.delete().block(); } catch (Exception e) { logger.warn("Container cleanup failed: {}", e.getMessage()); } + } + } + + /** Holds query results and per-page diagnostics from a fully drained query. */ + private static class QueryResult { + final List results = new ArrayList<>(); + final List diagnostics = new ArrayList<>(); + } + + private CosmosQueryRequestOptions partitionedOptions() { + CosmosQueryRequestOptions opts = new CosmosQueryRequestOptions(); + opts.setPartitionKey(new PartitionKey(commonPk)); + return opts; + } + + private QueryResult drainQuery(CosmosAsyncContainer c, String query, CosmosQueryRequestOptions opts, Class type) { + QueryResult result = new QueryResult<>(); + for (FeedResponse page : c.queryItems(query, opts, type).byPage().toIterable()) { + result.results.addAll(page.getResults()); + result.diagnostics.add(page.getCosmosDiagnostics()); + } + return result; + } + + private QueryResult drainQuery(CosmosAsyncContainer c, SqlQuerySpec qs, CosmosQueryRequestOptions opts, Class type) { + QueryResult result = new QueryResult<>(); + for (FeedResponse page : c.queryItems(qs, opts, type).byPage().toIterable()) { + result.results.addAll(page.getResults()); + result.diagnostics.add(page.getCosmosDiagnostics()); + } + return result; + } + + /** + * Direct vs thin client comparison: run query via both Direct TCP and thin client. + * Assert: (1) thin client used :10250, (2) same count, (3) same document IDs in order. + */ + private void assertDirectAndThinClientMatch(String query) { + assertDirectAndThinClientMatch(query, partitionedOptions()); + } + + private void assertDirectAndThinClientMatch(String query, CosmosQueryRequestOptions options) { + QueryResult gwResult = drainQuery(directContainer, query, options, ObjectNode.class); + QueryResult tcResult = drainQuery(thinClientContainer, query, options, ObjectNode.class); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("Count mismatch: " + query).isEqualTo(gwResult.results.size()); + + List gwIds = gwResult.results.stream().filter(d -> d.has(ID_FIELD)).map(d -> d.get(ID_FIELD).asText()).collect(Collectors.toList()); + List tcIds = tcResult.results.stream().filter(d -> d.has(ID_FIELD)).map(d -> d.get(ID_FIELD).asText()).collect(Collectors.toList()); + assertThat(tcIds).as("IDs mismatch: " + query).isEqualTo(gwIds); + } + + private void assertDirectAndThinClientMatch(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { + QueryResult gwResult = drainQuery(directContainer, querySpec, options, ObjectNode.class); + QueryResult tcResult = drainQuery(thinClientContainer, querySpec, options, ObjectNode.class); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("Count mismatch: " + querySpec.getQueryText()).isEqualTo(gwResult.results.size()); + + List gwIds = gwResult.results.stream().filter(d -> d.has(ID_FIELD)).map(d -> d.get(ID_FIELD).asText()).collect(Collectors.toList()); + List tcIds = tcResult.results.stream().filter(d -> d.has(ID_FIELD)).map(d -> d.get(ID_FIELD).asText()).collect(Collectors.toList()); + assertThat(tcIds).as("IDs mismatch: " + querySpec.getQueryText()).isEqualTo(gwIds); + } + + private void assertScalarDirectAndThinClientMatch(String query, Class resultType) { + assertScalarDirectAndThinClientMatch(query, partitionedOptions(), resultType); + } + + private void assertScalarDirectAndThinClientMatch(String query, CosmosQueryRequestOptions options, Class resultType) { + QueryResult gwResult = drainQuery(directContainer, query, options, resultType); + QueryResult tcResult = drainQuery(thinClientContainer, query, options, resultType); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("Scalar count mismatch: " + query).isEqualTo(gwResult.results.size()); + for (int i = 0; i < gwResult.results.size(); i++) { + assertThat(tcResult.results.get(i).toString()).as("Scalar value mismatch at " + i + ": " + query) + .isEqualTo(gwResult.results.get(i).toString()); + } + } + + /** Direct vs thin client comparison for GROUP BY where result order may vary — compare as sets. */ + private void assertGroupByDirectAndThinClientMatch(String query, String groupField) { + QueryResult gwResult = drainQuery(directContainer, query, partitionedOptions(), ObjectNode.class); + QueryResult tcResult = drainQuery(thinClientContainer, query, partitionedOptions(), ObjectNode.class); + for (CosmosDiagnostics d : tcResult.diagnostics) { assertThinClientEndpointUsed(d); } + + assertThat(tcResult.results.size()).as("GROUP BY count mismatch: " + query).isEqualTo(gwResult.results.size()); + for (ObjectNode gwRow : gwResult.results) { + String key = gwRow.get(groupField).asText(); + boolean found = tcResult.results.stream().anyMatch(tc -> tc.get(groupField).asText().equals(key) + && tc.toString().equals(gwRow.toString())); + assertThat(found).as("GROUP BY row not found in thin client results: " + key).isTrue(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientStoredProcedureE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientStoredProcedureE2ETest.java new file mode 100644 index 000000000000..6af552954082 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientStoredProcedureE2ETest.java @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.models.CosmosStoredProcedureProperties; +import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureResponse; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.UUID; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Fail.fail; + +/** + * Thin client E2E tests for stored procedure execution. + * Container is truncated in {@code @BeforeClass} — no per-test cleanup needed. + */ +public class ThinClientStoredProcedureE2ETest extends ThinClientTestBase { + + @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") + public ThinClientStoredProcedureE2ETest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientStoredProcedure() { + String sprocId = "createDocSproc_" + UUID.randomUUID(); + String pkValue = UUID.randomUUID().toString(); + String docId = UUID.randomUUID().toString(); + + CosmosStoredProcedureProperties storedProcedureDef = new CosmosStoredProcedureProperties( + sprocId, + "function createDocument(docToCreate) {" + + "var context = getContext();" + + "var container = context.getCollection();" + + "var response = context.getResponse();" + + "var accepted = container.createDocument(" + + " container.getSelfLink()," + + " docToCreate," + + " function(err, docCreated) {" + + " if (err) throw new Error('Error creating document: ' + err.message);" + + " response.setBody(docCreated);" + + " });" + + "if (!accepted) throw new Error('Document creation was not accepted');" + + "}" + ); + + CosmosStoredProcedureResponse createResponse = container.getScripts() + .createStoredProcedure(storedProcedureDef).block(); + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(201); + + CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); + options.setPartitionKey(new PartitionKey(pkValue)); + + ObjectNode docToCreate = createTestDocument(docId, pkValue); + + CosmosStoredProcedureResponse executeResponse = container.getScripts() + .getStoredProcedure(sprocId) + .execute(Arrays.asList(docToCreate), options).block(); + + assertThat(executeResponse).isNotNull(); + assertThat(executeResponse.getStatusCode()).isEqualTo(200); + assertThat(executeResponse.getRequestCharge()).isGreaterThan(0.0); + assertThinClientEndpointUsed(executeResponse.getDiagnostics()); + + CosmosItemResponse readResponse = container.readItem(docId, new PartitionKey(pkValue), ObjectNode.class).block(); + assertThat(readResponse).isNotNull(); + assertThat(readResponse.getItem().get(ID_FIELD).asText()).isEqualTo(docId); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testStoredProcedureExecutionWithoutPartitionKeyThrows() { + String sprocId = "noPartitionKeySproc_" + UUID.randomUUID(); + + CosmosStoredProcedureProperties storedProcedureDef = new CosmosStoredProcedureProperties( + sprocId, "function() { getContext().getResponse().setBody('Hello'); }"); + + container.getScripts().createStoredProcedure(storedProcedureDef).block(); + + CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); + + try { + container.getScripts().getStoredProcedure(sprocId).execute(null, options).block(); + fail("Expected UnsupportedOperationException for sproc execution without partition key"); + } catch (UnsupportedOperationException e) { + assertThat(e.getMessage()).contains("PartitionKey value must be supplied"); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void testThinClientStoredProcedureWithPartitionKeyNone() { + String sprocId = "pkNoneSproc_" + UUID.randomUUID(); + + CosmosStoredProcedureProperties storedProcedureDef = new CosmosStoredProcedureProperties( + sprocId, "function() { getContext().getResponse().setBody('Hello from PK.NONE'); }"); + + container.getScripts().createStoredProcedure(storedProcedureDef).block(); + + CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); + options.setPartitionKey(PartitionKey.NONE); + + CosmosStoredProcedureResponse executeResponse = container.getScripts() + .getStoredProcedure(sprocId).execute(null, options).block(); + + assertThat(executeResponse).isNotNull(); + assertThat(executeResponse.getStatusCode()).isEqualTo(200); + assertThat(executeResponse.getRequestCharge()).isGreaterThan(0.0); + assertThinClientEndpointUsed(executeResponse.getDiagnostics()); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientTestBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientTestBase.java new file mode 100644 index 000000000000..379d12bd3720 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ThinClientTestBase.java @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosDiagnosticsRequestInfo; +import com.azure.cosmos.CosmosClientBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import java.util.Collection; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Base class for thin client E2E tests. Provides shared setup/teardown, + * constants, and helper methods common to all thin client test classes. + */ +public abstract class ThinClientTestBase extends TestSuiteBase { + + protected static final String THIN_CLIENT_ENDPOINT_INDICATOR = ":10250/"; + protected static final String ID_FIELD = "id"; + protected static final String PARTITION_KEY_FIELD = "mypk"; + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + protected CosmosAsyncClient client; + protected CosmosAsyncContainer container; + + protected ThinClientTestBase(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"thinclient"}, timeOut = SETUP_TIMEOUT) + public void before_ThinClientTest() { + assertThat(this.client).isNull(); + enableThinClientForTest(); + this.client = getClientBuilder().buildAsyncClient(); + this.container = getSharedMultiPartitionCosmosContainer(this.client); + + // Clean up shared container to prevent cross-test-class pollution. + cleanUpContainer(this.container); + } + + @AfterClass(groups = {"thinclient"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + clearThinClientForTest(); + if (this.client != null) { + this.client.close(); + } + } + + protected static void enableThinClientForTest() { + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); + } + + protected static void clearThinClientForTest() { + System.clearProperty("COSMOS.THINCLIENT_ENABLED"); + } + + /** + * Creates a test document with id and mypk fields (matching shared container partition key). + */ + protected ObjectNode createTestDocument(String id, String mypk) { + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put(ID_FIELD, id); + doc.put(PARTITION_KEY_FIELD, mypk); + return doc; + } + + /** + * Asserts that all requests in the diagnostics were routed through the thin client endpoint. + */ + protected static void assertThinClientEndpointUsed(CosmosDiagnostics diagnostics) { + assertThat(diagnostics).isNotNull(); + CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); + assertThat(ctx).isNotNull(); + Collection requests = ctx.getRequestInfo(); + assertThat(requests).isNotNull(); + assertThat(requests.size()).isPositive(); + int requestCountAgainstThinClientEndpoint = 0; + for (CosmosDiagnosticsRequestInfo requestInfo : requests) { + if (requestInfo.getEndpoint().contains(THIN_CLIENT_ENDPOINT_INDICATOR)) { + requestCountAgainstThinClientEndpoint++; + } + } + assertThat(requestCountAgainstThinClientEndpoint).isEqualTo(requests.size()); + } + + /** + * Asserts that NO requests in the diagnostics were routed through the thin client endpoint, + * confirming the gateway client used the standard :443 path. + */ + protected static void assertGatewayEndpointUsed(CosmosDiagnostics diagnostics) { + assertThat(diagnostics).isNotNull(); + CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); + assertThat(ctx).isNotNull(); + Collection requests = ctx.getRequestInfo(); + assertThat(requests).isNotNull(); + assertThat(requests.size()).isPositive(); + for (CosmosDiagnosticsRequestInfo requestInfo : requests) { + assertThat(requestInfo.getEndpoint()) + .as("Gateway client must not route through thin client endpoint, but found: " + requestInfo.getEndpoint()) + .doesNotContain(THIN_CLIENT_ENDPOINT_INDICATOR); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index d4f99c183c24..84b9ff5d3b02 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -5602,6 +5602,11 @@ public GlobalEndpointManager getGlobalEndpointManager() { public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker getGlobalPartitionEndpointManagerForCircuitBreaker() { return RxDocumentClientImpl.this.globalPartitionEndpointManagerForPerPartitionCircuitBreaker; } + + @Override + public boolean useThinClient(RxDocumentServiceRequest request) { + return RxDocumentClientImpl.this.useThinClientStoreModel(request); + } }; } @@ -7388,7 +7393,8 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) { resourceType == ResourceType.ClientEncryptionKey || resourceType.isScript() && operationType != OperationType.ExecuteJavaScript || resourceType == ResourceType.PartitionKeyRange || - resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete) { + resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete || + operationType == OperationType.QueryPlan) { return this.gatewayProxy; } @@ -7426,7 +7432,7 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) { if ((operationType == OperationType.Query || operationType == OperationType.SqlQuery || operationType == OperationType.ReadFeed) && - Utils.isCollectionChild(request.getResourceType())) { + Utils.isCollectionChild(request.getResourceType())) { // Go to gateway only when partition key range and partition key are not set. This should be very rare if (request.getPartitionKeyRangeIdentity() == null && request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) == null) { @@ -8983,7 +8989,8 @@ public boolean useThinClient() { private boolean useThinClientStoreModel(RxDocumentServiceRequest request) { if (!useThinClient || !this.globalEndpointManager.hasThinClientReadLocations() - || request.getResourceType() != ResourceType.Document) { + || (request.getResourceType() != ResourceType.Document + && !request.isExecuteStoredProcedureBasedRequest())) { return false; } @@ -8993,7 +9000,10 @@ private boolean useThinClientStoreModel(RxDocumentServiceRequest request) { return operationType.isPointOperation() || operationType == OperationType.Query || operationType == OperationType.Batch - || request.isChangeFeedRequest() && !request.isAllVersionsAndDeletesChangeFeedMode(); + || (request.isChangeFeedRequest() + && !request.isAllVersionsAndDeletesChangeFeedMode()) + || request.isExecuteStoredProcedureBasedRequest() + || operationType == OperationType.QueryPlan; } private DocumentClientRetryPolicy getRetryPolicyForPointOperation( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 03f27156f62f..1e2958040978 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -968,6 +968,10 @@ public boolean isChangeFeedRequest() { return this.headers.containsKey(HttpConstants.HttpHeaders.A_IM); } + public boolean isExecuteStoredProcedureBasedRequest() { + return this.resourceType == ResourceType.StoredProcedure && this.operationType == OperationType.ExecuteJavaScript; + } + public boolean isAllVersionsAndDeletesChangeFeedMode() { String aImHeader = this.headers.get(HttpConstants.HttpHeaders.A_IM); return this.headers.containsKey(HttpConstants.HttpHeaders.A_IM) && HttpConstants.A_IMHeaderValues.FULL_FIDELITY_FEED.equals(aImHeader); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 3a849b4c5baa..0f9cc8ae18cf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -252,10 +252,13 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque byte[] epk = partitionKey.getEffectivePartitionKeyBytes(request.getPartitionKeyInternal(), request.getPartitionKeyDefinition()); rntbdRequest.setHeaderValue(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey, epk); } else if (request.requestContext.resolvedPartitionKeyRange == null) { - throw new IllegalStateException( - "Resolved partition key range should not be null at this point. ResourceType: " - + request.getResourceType() + ", OperationType: " - + request.getOperationType()); + + if (request.getOperationType() != OperationType.QueryPlan) { + throw new IllegalStateException( + "Resolved partition key range should not be null at this point. ResourceType: " + + request.getResourceType() + ", OperationType: " + + request.getOperationType()); + } } else { PartitionKeyRange pkRange = request.requestContext.resolvedPartitionKeyRange; rntbdRequest.setHeaderValue(RntbdConstants.RntbdRequestHeader.StartEpkHash, HexConvert.hexToBytes(pkRange.getMinInclusive())); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index 15b4b50df559..4313be8e8293 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -283,7 +283,8 @@ public enum RntbdOperationType { PreReplaceValidation((short) 0x0020, OperationType.PreReplaceValidation), AddComputeGatewayRequestCharges((short) 0x0021, OperationType.AddComputeGatewayRequestCharges), MigratePartition((short) 0x0022, OperationType.MigratePartition), - Batch((short) 0x0025, OperationType.Batch); + Batch((short) 0x0025, OperationType.Batch), + QueryPlan((short) 0x0042, OperationType.QueryPlan); private final short id; private final OperationType type; @@ -367,6 +368,8 @@ public static RntbdOperationType fromId(final short id) { return RntbdOperationType.MigratePartition; case 0x0025: return RntbdOperationType.Batch; + case 0x0042: + return RntbdOperationType.QueryPlan; default: throw new DecoderException(String.format("expected byte value matching %s value, not %s", RntbdOperationType.class.getSimpleName(), @@ -444,6 +447,8 @@ public static RntbdOperationType fromType(OperationType type) { return RntbdOperationType.AddComputeGatewayRequestCharges; case Batch: return RntbdOperationType.Batch; + case QueryPlan: + return RntbdOperationType.QueryPlan; default: throw new IllegalArgumentException(String.format("unrecognized operation type: %s", type)); } @@ -619,7 +624,10 @@ public enum RntbdRequestHeader implements RntbdHeader { ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false), WorkloadId((short)0x00DC, RntbdTokenType.Byte, false), HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false), - ReadConsistencyStrategy((short)0x00FE, RntbdTokenType.Byte, false); + ReadConsistencyStrategy((short)0x00FE, RntbdTokenType.Byte, false), + // QueryPlan headers for proxy — IDs match server-side RntbdConstants.cs (ADO PR 1982503) + SupportedQueryFeatures((short) 0x00FF, RntbdTokenType.String, false), + QueryVersion((short) 0x0100, RntbdTokenType.SmallString, false); public static final List thinClientHeadersInOrderList = Arrays.asList( EffectivePartitionKey, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java index 8eb632c48c8c..294b7399d6f5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java @@ -204,6 +204,8 @@ private static RntbdOperationType map(final OperationType operationType) { return RntbdOperationType.AddComputeGatewayRequestCharges; case Batch: return RntbdOperationType.Batch; + case QueryPlan: + return RntbdOperationType.QueryPlan; default: final String reason = String.format("Unrecognized operation type: %s", operationType); throw new UnsupportedOperationException(reason); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java index 38de8021c44f..6652d3d94cc8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java @@ -195,6 +195,11 @@ private static ImplementationBridgeHelpers.PriorityLevelHelper.PriorityLevelAcce // and BE will respect the per-request value. this.fillTokenFromHeader(headers, this::getClientVersion, HttpHeaders.VERSION); + + // QueryPlan headers — needed for proxy to extract supported features and query version + // from the RNTBD body (IDs match server-side proxy: ADO PR 1982503) + this.fillTokenFromHeader(headers, this::getSupportedQueryFeatures, HttpHeaders.SUPPORTED_QUERY_FEATURES); + this.fillTokenFromHeader(headers, this::getQueryVersion, HttpHeaders.QUERY_VERSION); } private RntbdRequestHeaders(ByteBuf in) { @@ -653,6 +658,14 @@ private RntbdToken getChangeFeedWireFormatVersion() { return this.get(RntbdRequestHeader.ChangeFeedWireFormatVersion); } + private RntbdToken getSupportedQueryFeatures() { + return this.get(RntbdRequestHeader.SupportedQueryFeatures); + } + + private RntbdToken getQueryVersion() { + return this.get(RntbdRequestHeader.QueryVersion); + } + private void addAimHeader(final Map headers) { final String value = headers.get(HttpHeaders.A_IM); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java index 11528ed4f7ea..5ac697dad68b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java @@ -127,6 +127,7 @@ private static Mono getPartitionKeyRangesAn query, resourceLink, cosmosQueryRequestOptions, + collection, queryPlanCachingEnabled, queryPlanCache) .flatMap( @@ -145,6 +146,7 @@ private static Mono fetchQueryPlan( SqlQuerySpec query, String resourceLink, CosmosQueryRequestOptions cosmosQueryRequestOptions, + DocumentCollection collection, boolean queryPlanCachingEnabled, Map queryPlanCache) { @@ -163,7 +165,8 @@ private static Mono fetchQueryPlan( client, query, resourceLink, - cosmosQueryRequestOptions) + cosmosQueryRequestOptions, + collection) .doOnNext(partitionedQueryExecutionInfo -> { if (queryPlanCachingEnabled && isScopedToSinglePartition(cosmosQueryRequestOptions)) { tryCacheQueryPlan(query, partitionedQueryExecutionInfo, queryPlanCache); @@ -355,6 +358,7 @@ public static Mono fetchQueryPlanForValidation( sqlQuerySpec, resourceLink, queryRequestOptions, + null, queryPlanCachingEnabled, queryPlanCache); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/HybridSearchDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/HybridSearchDocumentQueryExecutionContext.java index 8dd42f5aa32c..830a37a2dda4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/HybridSearchDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/HybridSearchDocumentQueryExecutionContext.java @@ -4,6 +4,7 @@ package com.azure.cosmos.implementation.query; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.ClientSideRequestStatistics; import com.azure.cosmos.implementation.DiagnosticsClientContext; @@ -173,6 +174,7 @@ private void initialize( aggregatedGlobalStatistics = Flux.fromIterable(globalStatsProducers) .flatMap(producer -> producer.produceAsync() .map(documentProducerFeedResponse -> { + this.captureClientSideRequestStatistics(documentProducerFeedResponse.pageResult); List results = documentProducerFeedResponse.pageResult.getResults(); return new GlobalFullTextSearchQueryStatistics(results.get(0)); })) @@ -443,10 +445,27 @@ private Flux getComponentQueryResults(List targetFee return Flux.fromIterable(componentProducers) .flatMap(DocumentProducer::produceAsync) + .doOnNext(response -> this.captureClientSideRequestStatistics(response.pageResult)) .flatMap(response -> Flux.fromIterable(response.pageResult.getResults())); }); } + private void captureClientSideRequestStatistics(FeedResponse response) { + if (response == null || response.getCosmosDiagnostics() == null) { + return; + } + + CosmosDiagnostics diagnostics = response.getCosmosDiagnostics(); + Collection requestStatistics = + diagAccessor().getFeedResponseDiagnostics(diagnostics) != null + ? diagAccessor().getClientSideRequestStatisticsForQueryPipelineAggregations(diagnostics) + : diagAccessor().getClientSideRequestStatistics(diagnostics); + + if (requestStatistics != null && !requestStatistics.isEmpty()) { + this.clientSideRequestStatistics.addAll(requestStatistics); + } + } + private Flux retrieveRewrittenQueryInfos(List componentQueryInfos) { return aggregatedGlobalStatistics.hasElement().flatMapMany(globalStatistics -> { if (globalStatistics != null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java index 91679d40490e..bfc2293f6aeb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java @@ -110,4 +110,6 @@ Mono addPartitionLevelUnavailableRegionsOnRequest( GlobalEndpointManager getGlobalEndpointManager(); GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker getGlobalPartitionEndpointManagerForCircuitBreaker(); + + boolean useThinClient(RxDocumentServiceRequest request); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PartitionedQueryExecutionInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PartitionedQueryExecutionInfo.java index 4967d2b1e6be..41b33ae2846b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PartitionedQueryExecutionInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PartitionedQueryExecutionInfo.java @@ -5,10 +5,11 @@ import com.azure.cosmos.implementation.RequestTimeline; import com.azure.cosmos.implementation.query.hybridsearch.HybridSearchQueryInfo; +import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.implementation.JsonSerializable; -import com.azure.cosmos.implementation.Constants; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.azure.cosmos.models.PartitionKeyDefinition; import java.util.List; @@ -25,23 +26,32 @@ public final class PartitionedQueryExecutionInfo extends JsonSerializable { private RequestTimeline queryPlanRequestTimeline; private HybridSearchQueryInfo hybridSearchQueryInfo; - PartitionedQueryExecutionInfo(QueryInfo queryInfo, List> queryRanges) { - this.queryInfo = queryInfo; - this.queryRanges = queryRanges; - - this.set( - PartitionedQueryExecutionInfoInternal.PARTITIONED_QUERY_EXECUTION_INFO_VERSION_PROPERTY, - Constants.PartitionedQueryExecutionInfo.VERSION_1 - ); - } - + /** + * Constructs with EPK hex string format expected for queryRanges. + */ public PartitionedQueryExecutionInfo(ObjectNode content, RequestTimeline queryPlanRequestTimeline) { super(content); this.queryPlanRequestTimeline = queryPlanRequestTimeline; } - public PartitionedQueryExecutionInfo(String jsonString) { - super(jsonString); + /** + * Constructs with PartitionKeyInternal array format expected for queryRanges. + * The {@code partitionKeyDefinition} is required for converting PartitionKeyInternal + * values to EPK hex strings. + * + * @param partitionKeyDefinition the container's partition key definition, must not be null. + */ + PartitionedQueryExecutionInfo(ObjectNode content, RequestTimeline queryPlanRequestTimeline, PartitionKeyDefinition partitionKeyDefinition) { + super(content); + if (partitionKeyDefinition == null) { + throw new IllegalArgumentException("partitionKeyDefinition must not be null"); + } + this.queryPlanRequestTimeline = queryPlanRequestTimeline; + this.queryRanges = PartitionKeyInternalHelper.convertToSortedEpkRanges( + PartitionedQueryExecutionInfoInternal.QUERY_RANGES_PROPERTY, + content, + partitionKeyDefinition); + this.getPropertyBag().remove(PartitionedQueryExecutionInfoInternal.QUERY_RANGES_PROPERTY); } public int getVersion() { @@ -54,10 +64,26 @@ public QueryInfo getQueryInfo() { PartitionedQueryExecutionInfoInternal.QUERY_INFO_PROPERTY, QueryInfo.class)); } + /** + * Returns the query ranges as sorted EPK hex string ranges. + *

+ * Two formats exist: + *

    + *
  • Gateway V1: {@code min}/{@code max} are EPK hex strings and are + * deserialized directly via {@code getList()}.
  • + *
  • Thin client proxy: {@code min}/{@code max} are PartitionKeyInternal + * JSON arrays and are converted to EPK hex by the thin-client constructor.
  • + *
+ */ public List> getQueryRanges() { - return this.queryRanges != null ? this.queryRanges - : (this.queryRanges = super.getList( - PartitionedQueryExecutionInfoInternal.QUERY_RANGES_PROPERTY, QUERY_RANGES_CLASS)); + if (this.queryRanges != null) { + return this.queryRanges; + } + + // EPK hex string format — direct deserialization + this.queryRanges = super.getList( + PartitionedQueryExecutionInfoInternal.QUERY_RANGES_PROPERTY, QUERY_RANGES_CLASS); + return this.queryRanges; } public RequestTimeline getQueryPlanRequestTimeline() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java index b8ea415ebf2f..e65101b02d51 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -166,10 +167,23 @@ public boolean hasNonStreamingOrderBy() { return this.nonStreamingOrderBy; } - public Map getGroupByAliasToAggregateType(){ - Map groupByAliasToAggregateMap; - groupByAliasToAggregateMap = super.getMap("groupByAliasToAggregateType"); - return groupByAliasToAggregateMap; + public Map getGroupByAliasToAggregateType() { + Map rawMap = super.getMap("groupByAliasToAggregateType"); + if (rawMap == null) { + return null; + } + + Map groupByAliasToAggregateMap = new HashMap<>(rawMap.size()); + rawMap.forEach((key, value) -> { + if (value == null || (value instanceof String && ((String) value).isEmpty())) { + groupByAliasToAggregateMap.put(key, null); + } else if (value instanceof AggregateOperator) { + groupByAliasToAggregateMap.put(key, (AggregateOperator) value); + } else { + groupByAliasToAggregateMap.put(key, AggregateOperator.valueOf(String.valueOf(value))); + } + }); + return groupByAliasToAggregateMap; } public List getGroupByAliases() { @@ -267,4 +281,3 @@ public int hashCode() { return super.hashCode(); } } - diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java index 591b601eb7c4..308dcefb1c3b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java @@ -8,13 +8,18 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.DiagnosticsClientContext; +import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.PathsHelper; +import com.azure.cosmos.implementation.RequestTimeline; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; +import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.implementation.BackoffRetryUtility; @@ -50,6 +55,9 @@ private static ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.Cosmo // new NonStreamingOrderBy query feature the client might run into some issue of not being able to recognize this, // and throw a 400 exception. If the environment variable `AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY` is set to // True to opt out of this new query feature, we will return the OLD query features to operate correctly. + // TODO: Add ListAndSetAggregate after Java supports MAKELIST/MAKESET query aggregation. + // TODO: Add HybridSearchSkipOrderByRewrite after the Java hybrid query pipeline can consume the optimized plan. + // See PR #47759 review. private static final String SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " + QueryFeature.CompositeAggregate.name() + ", " + QueryFeature.MultipleOrderBy.name() + ", " + @@ -63,6 +71,7 @@ private static ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.Cosmo QueryFeature.NonValueAggregate.name() + ", " + QueryFeature.NonStreamingOrderBy.name() + ", " + QueryFeature.HybridSearch.name() + ", " + + QueryFeature.CountIf.name() + ", " + QueryFeature.WeightedRankFusion.name(); private static final String OLD_SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " + @@ -81,7 +90,8 @@ static Mono getQueryPlanThroughGatewayAsync(Diagn IDocumentQueryClient queryClient, SqlQuerySpec sqlQuerySpec, String resourceLink, - CosmosQueryRequestOptions initialQueryRequestOptions) { + CosmosQueryRequestOptions initialQueryRequestOptions, + DocumentCollection collection) { CosmosQueryRequestOptions nonNullRequestOptions = initialQueryRequestOptions != null ? initialQueryRequestOptions @@ -89,6 +99,8 @@ static Mono getQueryPlanThroughGatewayAsync(Diagn PartitionKey partitionKey = nonNullRequestOptions.getPartitionKey(); + PartitionKeyDefinition partitionKeyDefinition = collection != null ? collection.getPartitionKey() : null; + final Map requestHeaders = new HashMap<>(); requestHeaders.put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON); requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY_PLAN_REQUEST, TRUE); @@ -106,7 +118,10 @@ static Mono getQueryPlanThroughGatewayAsync(Diagn ResourceType.Document, resourceLink, requestHeaders); - queryPlanRequest.useGatewayMode = true; + // Validation callers do not have collection metadata, so keep those query-plan + // requests on Gateway V1. Normal query execution can route through thin client, + // where PartitionKeyDefinition is available to convert proxy query ranges. + queryPlanRequest.useGatewayMode = partitionKeyDefinition == null; // Create a defensive copy to prevent concurrent modification of the shared // SqlQuerySpec's internal ObjectNode when multiple threads retrieve the query @@ -138,10 +153,29 @@ static Mono getQueryPlanThroughGatewayAsync(Diagn return BackoffRetryUtility.executeRetry(() -> { retryPolicyInstance.onBeforeSendRequest(req); return queryClient.executeQueryAsync(req).flatMap(rxDocumentServiceResponse -> { - PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = - new PartitionedQueryExecutionInfo( - (ObjectNode) rxDocumentServiceResponse.getResponseBody(), - rxDocumentServiceResponse.getGatewayHttpRequestTimeline()); + ObjectNode responseBody = (ObjectNode) rxDocumentServiceResponse.getResponseBody(); + RequestTimeline timeline = rxDocumentServiceResponse.getGatewayHttpRequestTimeline(); + + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo; + + // In thin client mode, the proxy returns queryRanges in PartitionKeyInternal + // format (e.g., {"min": ["value"], "max": ["Infinity"]}). Convert to sorted + // List> with EPK hex strings and pass directly to the DTO — + // avoiding a redundant JSON round-trip. + if (req.useThinClientMode && partitionKeyDefinition == null) { + throw new IllegalStateException( + "PartitionKeyDefinition must not be null in thin client mode. " + + "Ensure DocumentCollection is resolved before calling getQueryPlanThroughGatewayAsync."); + } + + if (req.useThinClientMode) { + partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo( + responseBody, timeline, partitionKeyDefinition); + } else { + partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo( + responseBody, timeline); + } + return Mono.just(partitionedQueryExecutionInfo); }); }, retryPolicyInstance); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java index 2803536084be..5a1e3c0528e1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java @@ -9,13 +9,24 @@ import com.azure.cosmos.implementation.ByteBufferOutputStream; import com.azure.cosmos.implementation.Bytes; import com.azure.cosmos.implementation.RMResources; +import com.azure.cosmos.implementation.Utils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public class PartitionKeyInternalHelper { + private static final Range.MinComparator MIN_COMPARATOR = new Range.MinComparator<>(); + public static final String MinimumInclusiveEffectivePartitionKey = toHexEncodedBinaryString(PartitionKeyInternal.EmptyPartitionKey.components); public static final byte[] MinimumInclusiveEffectivePartitionKeyBytes = toBinary(PartitionKeyInternal.EmptyPartitionKey.components); public static final String MaximumExclusiveEffectivePartitionKey = toHexEncodedBinaryString(PartitionKeyInternal.InfinityPartitionKey.components); @@ -294,4 +305,104 @@ static public Range getEPKRangeForPrefixPartitionKey( String maxEPK = minEPK + MaximumExclusiveEffectivePartitionKey; return new Range<>(minEPK, maxEPK, true, false); } + + /** + * Converts query ranges from PartitionKeyInternal JSON format to sorted EPK hex string ranges. + * + *

The thin client proxy returns queryRanges as PartitionKeyInternal JSON arrays + * (e.g., {@code {"min": ["value"], "max": ["Infinity"]}}). This method parses each range, + * computes the EPK hex string via {@link #getEffectivePartitionKeyString}, and sorts the + * result using {@link Range.MinComparator} to satisfy + * {@link RoutingMapProviderHelper#getOverlappingRanges} which requires sorted, non-overlapping input. + * + * @param queryRangesProperty the name of the JSON property containing the ranges array + * @param queryPlanJson the raw query plan JSON containing PartitionKeyInternal ranges + * @param partitionKeyDefinition the container's partition key definition + * @return sorted list of EPK hex string ranges; empty list if the property is absent + */ + public static List> convertToSortedEpkRanges( + String queryRangesProperty, + ObjectNode queryPlanJson, + PartitionKeyDefinition partitionKeyDefinition) { + + JsonNode queryRangesNode = queryPlanJson.get(queryRangesProperty); + if (queryRangesNode == null || !queryRangesNode.isArray()) { + String actualType = queryRangesNode == null ? "null (property absent)" : queryRangesNode.getNodeType().name(); + throw new IllegalStateException( + "Thin client proxy query plan response has missing or invalid '" + queryRangesProperty + "' property. " + + "Expected: JSON array of {min, max, isMinInclusive, isMaxInclusive} range objects. " + + "Actual node type: " + actualType + ". " + + "Response keys: " + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(queryPlanJson.fieldNames(), 0), false) + .collect(Collectors.joining(", ")) + ". " + + "This indicates a protocol mismatch between the SDK and the thin client proxy."); + } + + ArrayNode rawRanges = (ArrayNode) queryRangesNode; + List> epkRanges = new ArrayList<>(rawRanges.size()); + + for (JsonNode rangeNode : rawRanges) { + if (!rangeNode.isObject()) { + throw new IllegalStateException( + "Thin client proxy query plan response contains a non-object element in queryRanges array. " + + "Expected: JSON object with {min, max, isMinInclusive, isMaxInclusive}. " + + "Array size: " + rawRanges.size() + ". " + + "Actual node type: " + rangeNode.getNodeType().name() + "."); + } + ObjectNode rangeObj = (ObjectNode) rangeNode; + + String minEpk = partitionKeyInternalToEpkString(rangeObj.get("min"), partitionKeyDefinition); + String maxEpk = partitionKeyInternalToEpkString(rangeObj.get("max"), partitionKeyDefinition); + + JsonNode minInclusiveNode = rangeObj.get("isMinInclusive"); + JsonNode maxInclusiveNode = rangeObj.get("isMaxInclusive"); + if (minInclusiveNode == null || maxInclusiveNode == null) { + throw new IllegalStateException( + "Thin client proxy query plan range missing required fields. " + + "Expected: isMinInclusive and isMaxInclusive. " + + "Range object fields: " + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(rangeObj.fieldNames(), 0), false) + .collect(Collectors.joining(", ")) + "."); + } + boolean isMinInclusive = minInclusiveNode.asBoolean(); + boolean isMaxInclusive = maxInclusiveNode.asBoolean(); + + epkRanges.add(new Range<>(minEpk, maxEpk, isMinInclusive, isMaxInclusive)); + } + + epkRanges.sort(MIN_COMPARATOR); + return epkRanges; + } + + /** + * Converts a single PartitionKeyInternal JSON node to its EPK hex string representation. + * + * @param rangeBoundaryNode the JSON node representing a range boundary (min or max) in PartitionKeyInternal format + * @param partitionKeyDefinition the container's partition key definition + * @return the EPK hex string + */ + private static String partitionKeyInternalToEpkString(JsonNode rangeBoundaryNode, PartitionKeyDefinition partitionKeyDefinition) { + if (rangeBoundaryNode == null || rangeBoundaryNode.isNull()) { + throw new IllegalStateException( + "Thin client proxy query plan range has null boundary value. " + + "Expected: PartitionKeyInternal JSON array (e.g., [\"value\"] or [{\"type\":\"Infinity\"}])."); + } + + PartitionKeyInternal partitionKey; + try { + partitionKey = Utils.getSimpleObjectMapper().treeToValue(rangeBoundaryNode, PartitionKeyInternal.class); + } catch (JsonProcessingException e) { + throw new IllegalStateException( + "Failed to parse PartitionKeyInternal from range boundary. " + + "Boundary node type: " + rangeBoundaryNode.getNodeType().name() + ".", e); + } + + if (partitionKey.getComponents() == null) { + throw new IllegalStateException( + "Thin client proxy query plan range boundary deserialized to NonePartitionKey (null components). " + + "Boundary node type: " + rangeBoundaryNode.getNodeType().name() + "."); + } + + return getEffectivePartitionKeyString(partitionKey, partitionKeyDefinition); + } }