[Bug]: Type hints lost in core transforms #36775

@claudevdm

What happened?

There is a pattern in the core transforms where TypeVars are used as type hints on the inner transforms, alongside decorator type hints on the enclosing composite, e.g.

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

@with_input_types(tuple[K, V])
@with_output_types(tuple[K, V]) 
class PerKey(ptransform.PTransform):
    """Compute elements with the latest timestamp for each key
    from a keyed PCollection"""
    @staticmethod
    def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
      key, value = element
      return [(key, (value, timestamp))]

    def expand(self, pcoll):
      return (
          pcoll
          | core.ParDo(self.add_timestamp).with_output_types(
              tuple[K, tuple[T, TimestampType]])
          | core.CombinePerKey(LatestCombineFn()))

The @with_input_types decorator suggests that the input type of the root ParDo of this transform will be tuple[K, V], but the decorator approach does not pass type hints down to root transforms.

As a result, when Pipeline._infer_result_type is invoked for the ParDo, this condition evaluates to false:

if input_types and input_types[0]:

and the resulting element type falls back to Any.
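
The fallback can be illustrated with a small standalone model. This is only a sketch using the standard library, not Beam itself: the helper names, and the assumed shape of input_types (a pair whose first entry holds the positional hints), are illustrative assumptions and not a copy of Pipeline._infer_result_type.

```python
from typing import Any, TypeVar, get_args, get_origin

K = TypeVar('K')
V = TypeVar('V')


def match_type_variables(declared, actual):
  """Pair each TypeVar in `declared` with the type at the same position in `actual`."""
  if isinstance(declared, TypeVar):
    return {declared: actual}
  bindings = {}
  for declared_arg, actual_arg in zip(get_args(declared), get_args(actual)):
    bindings.update(match_type_variables(declared_arg, actual_arg))
  return bindings


def bind_type_variables(hint, bindings):
  """Substitute bound TypeVars in `hint`; unbound ones degrade to Any."""
  if isinstance(hint, TypeVar):
    return bindings.get(hint, Any)
  args = get_args(hint)
  if not args:
    return hint
  return get_origin(hint)[tuple(bind_type_variables(a, bindings) for a in args)]


def infer_result_type(input_types, declared_output, actual_input):
  # Hypothetical stand-in for the check quoted in the issue: without declared
  # input hints there is nothing to match the TypeVars against.
  if input_types and input_types[0]:
    declared_input = input_types[0][0]
    return bind_type_variables(
        declared_output, match_type_variables(declared_input, actual_input))
  return Any


# With a declared input hint, K and V are resolved from the actual input type.
with_hint = infer_result_type(((tuple[K, V],), {}), tuple[K, V], tuple[str, int])
# Without one (the situation described for the root ParDo), the model yields Any.
without_hint = infer_result_type(None, tuple[K, V], tuple[str, int])
print(with_hint)
print(without_hint)
```

In this model the only difference between the two calls is whether an input hint reached the transform, which mirrors why the composite's decorator alone does not help the inner ParDo.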

See https://stackoverflow.com/questions/79797891/apache-beam-2-68-0-throws-using-fallback-deterministic-coder-for-type-warning for an example where a user's coder is not used because type hints are lost via this mechanism.

There are a couple of potential fixes:
#1 - explicitly set with_input_types and with_output_types on all core transforms in the SDK.
#2 - pass the decorated input type hint to all root transforms in a composite transform. In the above example, decorating with @with_input_types(tuple[K, V]) means all root transforms should also receive this type hint.
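
A rough sketch of what option #2 could look like, written as a toy model rather than against Beam's internals. ToyTransform, its roots field, and propagate_input_hint are hypothetical names introduced only for illustration; the sketch also assumes that an explicitly set hint on a root transform should win over the propagated one, which the proposal does not specify.

```python
from dataclasses import dataclass, field
from typing import Any, List, TypeVar

K = TypeVar('K')
V = TypeVar('V')


@dataclass
class ToyTransform:
  """Hypothetical stand-in for a transform node; not a Beam class."""
  label: str
  input_hint: Any = None
  # Sub-transforms that consume this composite's input directly.
  roots: List["ToyTransform"] = field(default_factory=list)


def propagate_input_hint(composite):
  """Copy a composite's declared input hint onto roots that have none."""
  if composite.input_hint is None:
    return
  for root in composite.roots:
    if root.input_hint is None:  # assumption: explicit hints are preserved
      root.input_hint = composite.input_hint
    propagate_input_hint(root)  # nested composites pass the hint further down


# Mirrors the example above: PerKey is decorated, its root ParDo is not.
add_timestamp = ToyTransform('ParDo(add_timestamp)')
per_key = ToyTransform('PerKey', input_hint=tuple[K, V], roots=[add_timestamp])
propagate_input_hint(per_key)
print(add_timestamp.input_hint)
```

With the hint present on the root transform, a check like the one quoted earlier would have a declared input type to match type variables against.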

Both of these are breaking changes, so it might be good to audit all of the core transforms for places where type hints are lost and make the breaking changes together in the same version?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)
