From d75ea9d65aba52d9a0b8fed62184efd61d45715d Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Fri, 6 Feb 2026 10:29:02 +0800 Subject: [PATCH 1/3] [Java SDK] Warn when ValueState contains collection types When users declare ValueState, ValueState, or ValueState, log a warning suggesting they use MapState, BagState, or SetState instead. Storing collections in ValueState requires reading and writing the entire collection on each access, which can cause performance issues for large collections. The specialized state types provide better performance. Fixes #36746 --- .../transforms/reflect/DoFnSignatures.java | 69 +++++++++++++++++++ .../reflect/DoFnSignaturesTest.java | 61 ++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 310736c014cc..bda1beb2b71c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -104,6 +104,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */ @Internal @@ -113,6 +115,8 @@ }) public class DoFnSignatures { + private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class); + private DoFnSignatures() {} /** @@ -2327,12 +2331,77 @@ private static Map analyzeStateDeclarati (TypeDescriptor) TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType); + // Warn if ValueState contains a collection type that could benefit from specialized state + warnIfValueStateContainsCollection(fnClazz, id, stateType); + declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType)); } return ImmutableMap.copyOf(declarations); } + /** + * Warns if a ValueState is declared with a collection type (Map, List, Set) that could benefit + * from using specialized state types (MapState, BagState, SetState) for better performance. + */ + private static void warnIfValueStateContainsCollection( + Class fnClazz, String stateId, TypeDescriptor stateType) { + if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) { + return; + } + + try { + // Get the type directly and extract ValueState's type parameter + Type type = stateType.getType(); + if (!(type instanceof ParameterizedType)) { + return; + } + + // Find ValueState in the type hierarchy and get its type argument + Type valueType = null; + ParameterizedType pType = (ParameterizedType) type; + if (pType.getRawType() == ValueState.class) { + valueType = pType.getActualTypeArguments()[0]; + } else { + // For subtypes of ValueState, we need to resolve the type parameter + return; + } + + if (valueType == null + || valueType instanceof java.lang.reflect.TypeVariable + || valueType instanceof java.lang.reflect.WildcardType) { + // Cannot determine actual type, skip warning + return; + } + + TypeDescriptor valueTypeDescriptor = TypeDescriptor.of(valueType); + Class rawType = valueTypeDescriptor.getRawType(); + + String recommendation = null; + if (Map.class.isAssignableFrom(rawType)) { + recommendation = "MapState"; + } else if (List.class.isAssignableFrom(rawType)) { + recommendation = "BagState or OrderedListState"; + } else if (java.util.Set.class.isAssignableFrom(rawType)) { + recommendation = "SetState"; + } + + if (recommendation != null) { + LOG.warn( + "DoFn {} declares ValueState '{}' with type {}. " + + "Storing collections in ValueState requires reading and writing the entire " + + "collection on each access, which can cause performance issues. " + + "Consider using {} instead for better performance with large collections.", + fnClazz.getSimpleName(), + stateId, + rawType.getSimpleName(), + recommendation); + } + } catch (Exception e) { + // If we can't determine the type, don't warn - it's just an optimization hint + } + } + private static @Nullable Method findAnnotatedMethod( ErrorReporter errors, Class anno, Class fnClazz, boolean required) { Collection matches = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index de4a622e03d7..a394b23cd7a0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -1700,4 +1700,65 @@ public void onMyTimer() {} @Override public void processWithTimer(ProcessContext context, Timer timer) {} } + + // Test DoFns for ValueState collection warning tests + private static class DoFnWithMapValueState extends DoFn { + @StateId("mapState") + private final StateSpec>> mapState = + StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithListValueState extends DoFn { + @StateId("listState") + private final StateSpec>> listState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithSetValueState extends DoFn { + @StateId("setState") + private final StateSpec>> setState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithSimpleValueState extends DoFn { + @StateId("simpleState") + private final StateSpec> simpleState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + @Test + public void testValueStateWithMapLogsWarning() { + // This test verifies that the signature can be parsed for DoFns with collection ValueState. + // The warning is logged but doesn't prevent the signature from being created. + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithMapValueState.class); + assertThat(signature.stateDeclarations().get("mapState"), notNullValue()); + } + + @Test + public void testValueStateWithListLogsWarning() { + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithListValueState.class); + assertThat(signature.stateDeclarations().get("listState"), notNullValue()); + } + + @Test + public void testValueStateWithSetLogsWarning() { + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSetValueState.class); + assertThat(signature.stateDeclarations().get("setState"), notNullValue()); + } + + @Test + public void testValueStateWithSimpleTypeNoWarning() { + // Simple types should not trigger any warning + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class); + assertThat(signature.stateDeclarations().get("simpleState"), notNullValue()); + } } From 390f421ec6fb5d14079189f424671a05ed807b86 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Sat, 7 Feb 2026 13:55:31 +0800 Subject: [PATCH 2/3] Refine ValueState collection hint: downgrade to INFO and clarify trade-offs - Change LOG.warn to LOG.info (performance hint, not correctness issue) - Clarify that ValueState is appropriate for small collections or atomic replacement - Add runner compatibility caveat for specialized state types - Address community feedback from @Eliaaazzz --- .../beam/sdk/transforms/reflect/DoFnSignatures.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index bda1beb2b71c..96b83dc656e4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -2387,11 +2387,12 @@ private static void warnIfValueStateContainsCollection( } if (recommendation != null) { - LOG.warn( - "DoFn {} declares ValueState '{}' with type {}. " - + "Storing collections in ValueState requires reading and writing the entire " - + "collection on each access, which can cause performance issues. " - + "Consider using {} instead for better performance with large collections.", + LOG.info( + "DoFn {} declares ValueState '{}' with collection type {}. " + + "ValueState reads/writes the entire collection on each access. " + + "This is appropriate for small collections or atomic replacement. " + + "For large collections or frequent appends, consider using {} instead " + + "(if supported by your runner).", fnClazz.getSimpleName(), stateId, rawType.getSimpleName(), From 877547d67d251cc973c496ab95f327af6e36df31 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Sat, 7 Feb 2026 14:51:57 +0800 Subject: [PATCH 3/3] Address reviewer feedback: use TypeDescriptor API throughout - Use TypeDescriptor.resolveType() instead of raw Type manipulation - Use hasUnresolvedParameters() instead of instanceof checks - Use isSubtypeOf() for collection type detection - Remove catch-all Exception block entirely Addresses @kennknowles code review comments --- .../transforms/reflect/DoFnSignatures.java | 77 +++++++------------ 1 file changed, 29 insertions(+), 48 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 96b83dc656e4..03de49445e2d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -2350,56 +2350,37 @@ private static void warnIfValueStateContainsCollection( return; } - try { - // Get the type directly and extract ValueState's type parameter - Type type = stateType.getType(); - if (!(type instanceof ParameterizedType)) { - return; - } - - // Find ValueState in the type hierarchy and get its type argument - Type valueType = null; - ParameterizedType pType = (ParameterizedType) type; - if (pType.getRawType() == ValueState.class) { - valueType = pType.getActualTypeArguments()[0]; - } else { - // For subtypes of ValueState, we need to resolve the type parameter - return; - } + // Use TypeDescriptor.resolveType() to extract ValueState's type parameter + // This preserves generic type information better than raw Type manipulation + TypeDescriptor valueTypeDescriptor = + stateType.resolveType(ValueState.class.getTypeParameters()[0]); - if (valueType == null - || valueType instanceof java.lang.reflect.TypeVariable - || valueType instanceof java.lang.reflect.WildcardType) { - // Cannot determine actual type, skip warning - return; - } - - TypeDescriptor valueTypeDescriptor = TypeDescriptor.of(valueType); - Class rawType = valueTypeDescriptor.getRawType(); - - String recommendation = null; - if (Map.class.isAssignableFrom(rawType)) { - recommendation = "MapState"; - } else if (List.class.isAssignableFrom(rawType)) { - recommendation = "BagState or OrderedListState"; - } else if (java.util.Set.class.isAssignableFrom(rawType)) { - recommendation = "SetState"; - } + // Skip if the type has unresolved parameters (e.g., TypeVariable, WildcardType) + if (valueTypeDescriptor.hasUnresolvedParameters()) { + return; + } - if (recommendation != null) { - LOG.info( - "DoFn {} declares ValueState '{}' with collection type {}. " - + "ValueState reads/writes the entire collection on each access. " - + "This is appropriate for small collections or atomic replacement. " - + "For large collections or frequent appends, consider using {} instead " - + "(if supported by your runner).", - fnClazz.getSimpleName(), - stateId, - rawType.getSimpleName(), - recommendation); - } - } catch (Exception e) { - // If we can't determine the type, don't warn - it's just an optimization hint + // Use TypeDescriptor.isSubtypeOf() for type checking - stays in TypeDescriptor API + String recommendation = null; + if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) { + recommendation = "MapState"; + } else if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(List.class))) { + recommendation = "BagState or OrderedListState"; + } else if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(java.util.Set.class))) { + recommendation = "SetState"; + } + + if (recommendation != null) { + LOG.info( + "DoFn {} declares ValueState '{}' with collection type {}. " + + "ValueState reads/writes the entire collection on each access. " + + "This is appropriate for small collections or atomic replacement. " + + "For large collections or frequent appends, consider using {} instead " + + "(if supported by your runner).", + fnClazz.getSimpleName(), + stateId, + valueTypeDescriptor.getRawType().getSimpleName(), + recommendation); } }