Skip to content
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,20 @@ public void roundtrip_lastBatchNoContinuation() {

@Test(groups = { "unit" })
public void setFeedResponseContinuationToken_handlesEmptyHeadersWithoutCopyingNormalCase() {
Map<String, String> immutableEmptyHeaders = Collections.emptyMap();
// Immutable inputs are normalized to a mutable map at FeedResponse construction
// time (so the field stays final). Clearing a continuation token on an empty
// header map is a no-op and must not throw.
FeedResponse<String> emptyResponse = ModelBridgeInternal.createFeedResponse(
Collections.emptyList(),
immutableEmptyHeaders);
Collections.emptyMap());

ModelBridgeInternal.setFeedResponseContinuationToken(null, emptyResponse);

assertThat(emptyResponse.getContinuationToken()).isNull();
assertThat(emptyResponse.getResponseHeaders()).isSameAs(immutableEmptyHeaders);
assertThat(emptyResponse.getResponseHeaders()).isEmpty();

// Mutable header maps are passed through without copying, preserving the
// reference returned by getResponseHeaders().
Map<String, String> normalHeaders = new HashMap<>();
normalHeaders.put(HttpConstants.HttpHeaders.ACTIVITY_ID, "test-activity-id");
FeedResponse<String> normalResponse = ModelBridgeInternal.createFeedResponse(
Expand All @@ -150,6 +153,30 @@ public void setFeedResponseContinuationToken_handlesEmptyHeadersWithoutCopyingNo
assertThat(normalResponse.getResponseHeaders()).isSameAs(normalHeaders);
}

/**
* Reproduces the customer-reported failure path: the parallel query pipeline's
* "artificial empty page" branch (ParallelDocumentQueryExecutionContext.headerResponse)
* emits a FeedResponse whose header map is {@code Utils.immutableMapOf(...)} - i.e. a
* non-empty {@code Collections.unmodifiableMap} wrapper. When such a page reaches the
* readManyByPartitionKeys stamping lambda, setFeedResponseContinuationToken attempts
* to put the composite token into the immutable map, throwing UnsupportedOperationException.
*/
@Test(groups = { "unit" })
public void setFeedResponseContinuationToken_immutableNonEmptyHeaders_doesNotThrow() {
Map<String, String> immutableSingleEntryHeaders =
Utils.immutableMapOf(HttpConstants.HttpHeaders.REQUEST_CHARGE, "1.23");
FeedResponse<String> response = ModelBridgeInternal.createFeedResponse(
Collections.emptyList(),
immutableSingleEntryHeaders);

ModelBridgeInternal.setFeedResponseContinuationToken("composite-token", response);
Comment thread
FabianMeiswinkel marked this conversation as resolved.

assertThat(response.getContinuationToken()).isEqualTo("composite-token");
assertThat(response.getResponseHeaders())
.containsEntry(HttpConstants.HttpHeaders.CONTINUATION, "composite-token")
.containsEntry(HttpConstants.HttpHeaders.REQUEST_CHARGE, "1.23");
}

@Test(groups = { "unit" })
public void deserialize_malformedInput_throws() {
// Either the base64 decoder or the JSON parsing layer rejects garbage; both raise
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

package com.azure.cosmos.implementation;

import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Unit tests for {@link Utils#immutableMapOf(Object, Object)} and the matching detector
* {@link Utils#isImmutableMap(Map)}. These live next to the factory so that any future
* change to the factory's runtime class shape is caught by the same regression suite.
*/
public class UtilsImmutableMapTests {

@Test(groups = { "unit" })
public void immutableMapOf_isDetectedAsImmutable() {
Map<String, String> m = Utils.immutableMapOf("k", "v");
assertThat(Utils.isImmutableMap(m)).isTrue();
}

@Test(groups = { "unit" })
public void emptyMap_isDetectedAsImmutable() {
assertThat(Utils.isImmutableMap(Collections.emptyMap())).isTrue();
}

@Test(groups = { "unit" })
public void hashMap_isNotDetectedAsImmutable() {
assertThat(Utils.isImmutableMap(new HashMap<>())).isFalse();
Map<String, String> populated = new HashMap<>();
populated.put("k", "v");
assertThat(Utils.isImmutableMap(populated)).isFalse();
}

@Test(groups = { "unit" })
public void linkedHashMap_isNotDetectedAsImmutable() {
assertThat(Utils.isImmutableMap(new LinkedHashMap<>())).isFalse();
}

@Test(groups = { "unit" })
public void nullMap_isNotDetectedAsImmutable() {
assertThat(Utils.isImmutableMap(null)).isFalse();
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)

#### Other Changes
* Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,33 @@ public static <V> List<V> immutableListOf() {
return map;
}

// Runtime classes produced by the immutable factory methods above. Captured once at
// class-init time so that callers can perform an O(1) reference-equality check to
// decide whether they need a defensive mutable copy, without resorting to
// exception-driven probing on the hot path.
private static final Class<?> UNMODIFIABLE_MAP_CLASS =
Collections.unmodifiableMap(new HashMap<>()).getClass();
private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();

/**
* Returns {@code true} if {@code map} is one of the immutable map shapes produced by
* the factory methods in this class ({@link #immutableMapOf(Object, Object)}) or by
* {@link Collections#emptyMap()}. The check is a single reference comparison and is
* safe to call from hot paths.
* <p>
* Note: this is intentionally narrow - it only recognizes the wrappers the Cosmos
* pipeline actually emits. It does not attempt to recognize every possible JDK or
* third-party immutable map (e.g. {@code Map.of(...)}, Guava {@code ImmutableMap}).
* Add new sentinels here if a new immutable producer is introduced.
*/
public static boolean isImmutableMap(Map<?, ?> map) {
if (map == null) {
return false;
}
Class<?> clazz = map.getClass();
return clazz == UNMODIFIABLE_MAP_CLASS || clazz == EMPTY_MAP_CLASS;
}

public static <V> V firstOrDefault(List<V> list) {
return list.size() > 0? list.get(0) : null ;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.cosmos.implementation.QueryMetricsConstants;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.query.queryadvisor.QueryAdvice;
import com.azure.cosmos.implementation.query.QueryInfo;
Expand Down Expand Up @@ -55,19 +56,21 @@ private static ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnos
private QueryInfo queryInfo;
private QueryInfo.QueryPlanDiagnosticsContext queryPlanDiagnosticsContext;

// All header maps are produced by the SDK's own query pipeline. Non-null maps
// are always mutable (HashMap or ConcurrentHashMap) - the SDK intentionally
// allows callers to add/modify headers on FeedResponse. The only known
// exception is empty-page responses where the query pipeline may pass null.
// We do NOT clone non-null maps here to avoid unnecessary allocations on every
// FeedResponse construction - the wider blast radius of cloning (every query,
// change feed, readMany response) is not justified by the narrow null case.
// If a future code path introduces an immutable non-null header map, the
// setContinuationTokenInternal method will fail fast with
// UnsupportedOperationException, and the fix should be to make the upstream
// pipeline emit a mutable map rather than adding defensive cloning here.
// The header map stored on FeedResponse must be mutable: downstream stages
// (e.g. the readManyByPartitionKeys stamping lambda) may add or replace
// headers in place. Normalize at construction time so the field can stay
// final and getResponseHeaders() consistently returns the same instance.
// Mutable inputs are passed through without copying; null and the known
// immutable shapes produced by Utils.immutableMapOf / Collections.emptyMap
// are replaced with a fresh HashMap (preserving entries).
private static Map<String, String> ensureMutableHeadersMap(Map<String, String> headers) {
return headers == null ? new HashMap<>() : headers;
if (headers == null) {
return new HashMap<>();
}
if (Utils.isImmutableMap(headers)) {
return new HashMap<>(headers);
}
return headers;
}

FeedResponse(List<T> results, Map<String, String> headers) {
Expand Down Expand Up @@ -452,16 +455,6 @@ private void setContinuationTokenInternal(String headerName, String continuation
if (!Strings.isNullOrWhiteSpace(continuationToken)) {
this.header.put(headerName, continuationToken);
} else if (!this.header.isEmpty() && this.header.containsKey(headerName)) {
// The query API returns unmodifiable header collections for empty
// responses (no documents returned - when only header set is request charge)
// the protection here to check for existence of the header before attempting
// to remove it would not be robust enough against unknown headers
// but since we only ever call our own query pipeline
// avoiding cloning in all cases and gating on continuation header
// existence is a reasonable trade-off - test coverage exists that uncovered
// the problem - so, this acts as regression test as well
// --> the test coverage is in ItemsPartitionReaderWithReadManyByPartitionKeyITest
// it should "return empty results for non-existent partition keys"
this.header.remove(headerName);
}
}
Expand Down
Loading