diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 310736c014cc..03de49445e2d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -104,6 +104,8 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */
 @Internal
@@ -113,6 +115,8 @@
 })
 public class DoFnSignatures {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class);
+
   private DoFnSignatures() {}
 
   /**
@@ -2327,12 +2331,67 @@ private static Map<String, DoFnSignature.StateDeclaration> analyzeStateDeclarati
           (TypeDescriptor<? extends State>)
               TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType);
 
+      // Log a hint if a ValueState holds a collection that specialized state could serve better.
+      warnIfValueStateContainsCollection(fnClazz, id, stateType);
+
       declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType));
     }
 
     return ImmutableMap.copyOf(declarations);
   }
 
+  /**
+   * Logs an INFO-level hint when a {@link ValueState} is declared with a collection value type
+   * ({@link Map}, {@link List} or {@link java.util.Set}) that may be better served by a
+   * specialized state type (MapState, BagState/OrderedListState or SetState).
+   *
+   * <p>The hint is advisory only: the caller registers the state declaration regardless. States
+   * that are not a {@code ValueState}, or whose value type still has unresolved type parameters,
+   * are skipped silently.
+   *
+   * @param fnClazz the DoFn class declaring the state; its full name appears in the log message
+   * @param stateId the id of the state declaration
+   * @param stateType the resolved type of the state declaration
+   */
+  private static void warnIfValueStateContainsCollection(
+      Class<?> fnClazz, String stateId, TypeDescriptor<? extends State> stateType) {
+    if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) {
+      return;
+    }
+
+    // Resolve ValueState's single type parameter against the declared state type.
+    TypeDescriptor<?> valueTypeDescriptor =
+        stateType.resolveType(ValueState.class.getTypeParameters()[0]);
+
+    // Skip if the type has unresolved parameters (e.g., TypeVariable, WildcardType)
+    if (valueTypeDescriptor.hasUnresolvedParameters()) {
+      return;
+    }
+
+    String recommendation = null;
+    if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+      recommendation = "MapState";
+    } else if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(List.class))) {
+      recommendation = "BagState or OrderedListState";
+    } else if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(java.util.Set.class))) {
+      recommendation = "SetState";
+    }
+
+    if (recommendation != null) {
+      LOG.info(
+          "DoFn {} declares ValueState '{}' with collection type {}. "
+              + "ValueState reads/writes the entire collection on each access. "
+              + "This is appropriate for small collections or atomic replacement. "
+              + "For large collections or frequent appends, consider using {} instead "
+              + "(if supported by your runner).",
+          // getName(), not getSimpleName(): the latter is "" for anonymous DoFn classes.
+          fnClazz.getName(),
+          stateId,
+          valueTypeDescriptor.getRawType().getSimpleName(),
+          recommendation);
+    }
+  }
+
   private static @Nullable Method findAnnotatedMethod(
       ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
     Collection<Method> matches =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index de4a622e03d7..a394b23cd7a0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -1700,4 +1700,65 @@ public void onMyTimer() {}
     @Override
     public void processWithTimer(ProcessContext context, Timer timer) {}
   }
+
+  // Test DoFns for the ValueState collection hint tests
+  private static class DoFnWithMapValueState extends DoFn<String, String> {
+    @StateId("mapState")
+    private final StateSpec<ValueState<java.util.Map<String, String>>> mapState =
+        StateSpecs.value();
+
+    @ProcessElement
+    public void process() {}
+  }
+
+  private static class DoFnWithListValueState extends DoFn<String, String> {
+    @StateId("listState")
+    private final StateSpec<ValueState<java.util.List<String>>> listState = StateSpecs.value();
+
+    @ProcessElement
+    public void process() {}
+  }
+
+  private static class DoFnWithSetValueState extends DoFn<String, String> {
+    @StateId("setState")
+    private final StateSpec<ValueState<java.util.Set<String>>> setState = StateSpecs.value();
+
+    @ProcessElement
+    public void process() {}
+  }
+
+  private static class DoFnWithSimpleValueState extends DoFn<String, String> {
+    @StateId("simpleState")
+    private final StateSpec<ValueState<String>> simpleState = StateSpecs.value();
+
+    @ProcessElement
+    public void process() {}
+  }
+
+  @Test
+  public void testValueStateWithMapLogsWarning() {
+    // Signature parsing must succeed for a ValueState holding a collection; the INFO-level hint
+    // is advisory only. Note: the log output itself is not asserted here.
+    DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithMapValueState.class);
+    assertThat(signature.stateDeclarations().get("mapState"), notNullValue());
+  }
+
+  @Test
+  public void testValueStateWithListLogsWarning() {
+    DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithListValueState.class);
+    assertThat(signature.stateDeclarations().get("listState"), notNullValue());
+  }
+
+  @Test
+  public void testValueStateWithSetLogsWarning() {
+    DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSetValueState.class);
+    assertThat(signature.stateDeclarations().get("setState"), notNullValue());
+  }
+
+  @Test
+  public void testValueStateWithSimpleTypeNoWarning() {
+    // A non-collection value type takes the no-hint path; parsing must still succeed.
+    DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class);
+    assertThat(signature.stateDeclarations().get("simpleState"), notNullValue());
+  }
 }