Description
What happened?
A common pattern in the core transforms is to use TypeVars as type hints, declared through the decorator type hints, e.g.:
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

@with_input_types(tuple[K, V])
@with_output_types(tuple[K, V])
class PerKey(ptransform.PTransform):
  """Compute elements with the latest timestamp for each key
  from a keyed PCollection"""
  @staticmethod
  def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
    key, value = element
    return [(key, (value, timestamp))]

  def expand(self, pcoll):
    return (
        pcoll
        | core.ParDo(self.add_timestamp).with_output_types(
            tuple[K, tuple[T, TimestampType]])
        | core.CombinePerKey(LatestCombineFn()))
The @with_input_types decorator suggests that the input type of the root ParDo of this transform will be tuple[K, V], but the decorator approach does not pass type hints to root transforms.
As a result, when Pipeline._infer_result_type is invoked for the ParDo, the condition on this line evaluates to false (beam/sdks/python/apache_beam/pipeline.py, line 924 in 5ffd998):
    if input_types and input_types[0]:
See https://stackoverflow.com/questions/79797891/apache-beam-2-68-0-throws-using-fallback-deterministic-coder-for-type-warning for an example where a user's coder is not used because type hints are lost through this mechanism.
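To make the mechanism concrete, here is a minimal, Beam-free sketch of the behaviour described above. The `Transform`, `ParDo`, and `with_input_types` definitions are toy stand-ins written for illustration (assumptions, not Beam's actual classes), and `has_usable_input_hint` is a hypothetical helper that only mirrors the shape of the quoted condition.

```python
from typing import Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class Transform:
    """Toy stand-in for a PTransform; NOT Beam's actual class."""
    # Hints are stored per transform; the default is "no hints".
    input_types: Optional[tuple] = None


def with_input_types(*hints):
    """Toy class decorator: records hints on the decorated class only."""
    def decorate(cls):
        cls.input_types = hints
        return cls
    return decorate


class ParDo(Transform):
    def __init__(self, fn):
        self.fn = fn


@with_input_types(tuple[K, V])
class PerKey(Transform):
    def expand(self):
        # The root ParDo is a separate object; nothing copies
        # PerKey's decorated hints onto it.
        return ParDo(lambda kv: [kv])


def has_usable_input_hint(transform):
    # Same shape as the condition quoted from pipeline.py.
    input_types = transform.input_types
    return bool(input_types and input_types[0])


composite = PerKey()
root = composite.expand()
print(has_usable_input_hint(composite))  # True: the decorator set the hint
print(has_usable_input_hint(root))       # False: the root ParDo has none
```

In this toy model the composite carries the hint while its root transform does not, which is the situation in which the inference check is skipped.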
There are a couple of potential fixes:
#1 - Explicitly set with_input_types and with_output_types on all core transforms in the SDK.
#2 - Pass the decorated input type hint to all root transforms in a composite transform. In the above example, decorating with @with_input_types(tuple[K, V]) means all root transforms should also receive this type hint.
Both of these are breaking changes, so it might be good to audit all of the core transforms for places where type hints are lost and make the breaking change in a single release?
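The two options can be sketched in a Beam-free toy model. Everything below is an illustrative assumption rather than the SDK's implementation: `Transform`, `ParDo`, and the decorator are stand-ins, and `apply_with_propagation` is a hypothetical helper showing one way option #2 could behave for a composite with a single root transform.

```python
from typing import Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class Transform:
    """Toy stand-in for a PTransform; NOT Beam's actual class."""
    input_types: Optional[tuple] = None

    def with_input_types(self, *hints):
        # Instance-level hint, set explicitly at the call site.
        self.input_types = hints
        return self


class ParDo(Transform):
    def __init__(self, fn):
        self.fn = fn


def with_input_types(*hints):
    """Toy class decorator that only annotates the composite class."""
    def decorate(cls):
        cls.input_types = hints
        return cls
    return decorate


@with_input_types(tuple[K, V])
class PerKeyExplicit(Transform):
    """Option #1: the composite hints its root transform itself."""
    def expand(self):
        return ParDo(lambda kv: [kv]).with_input_types(tuple[K, V])


@with_input_types(tuple[K, V])
class PerKeyImplicit(Transform):
    """Composite that relies on the framework to forward its hint."""
    def expand(self):
        return ParDo(lambda kv: [kv])


def apply_with_propagation(composite):
    """Option #2 (hypothetical): forward the decorated hint to a root
    transform that has no input hint of its own."""
    root = composite.expand()
    if root.input_types is None and composite.input_types:
        root.with_input_types(*composite.input_types)
    return root


explicit_root = PerKeyExplicit().expand()
propagated_root = apply_with_propagation(PerKeyImplicit())
print(explicit_root.input_types == (tuple[K, V],))
print(propagated_root.input_types == (tuple[K, V],))
```

Option #1 keeps the change local to each transform but has to be repeated everywhere, while option #2 is a single change in the framework that alters behaviour for every decorated composite. A real implementation would also need to decide what to do when a root transform already declares a different hint; the sketch simply leaves existing hints untouched.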
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK