From 00bb7b59b3d8ca0d50b0d6c484df095cf40cd85e Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Wed, 10 Dec 2025 14:37:39 +0530 Subject: [PATCH 1/7] Move Row to typehints/row.py, add compatibility alias in pvalue.py, update imports, and update CHANGES.md (Fixes #35095) --- CHANGES.md | 3 + sdks/python/apache_beam/__init__.py | 2 +- sdks/python/apache_beam/pvalue.py | 2 +- sdks/python/apache_beam/typehints/row.py | 74 ++++++++++++++++++++++++ 4 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 sdks/python/apache_beam/typehints/row.py diff --git a/CHANGES.md b/CHANGES.md index 09e249630447..317dc75b8656 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,9 @@ * Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). +* (Python) Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. Kept a compatibility alias in `pvalue.py`. (Fixes #35095) + + ## Breaking Changes * (Python) Some Python dependencies have been split out into extras. To ensure all previously installed dependencies are installed, when installing Beam you can `pip install apache-beam[gcp,interactive,yaml,redis,hadoop,tfrecord]`, though most users will not need all of these extras ([#34554](https://github.com/apache/beam/issues/34554)). diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index 9906c95aee14..8e20805559cf 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -90,9 +90,9 @@ from apache_beam import version from apache_beam.pipeline import * from apache_beam.pvalue import PCollection -from apache_beam.pvalue import Row from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import * +from apache_beam.typehints.row import Row try: # Add mitigation for CVE-2023-47248 while Beam allows affected versions diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index ca9a662d399e..deb37dbbb81a 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -33,7 +33,6 @@ from typing import Dict from typing import Generic from typing import Iterator -from typing import NamedTuple from typing import Optional from typing import Sequence from typing import TypeVar @@ -45,6 +44,7 @@ from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.typehints.row import Row, _make_Row if TYPE_CHECKING: from apache_beam.pipeline import AppliedPTransform diff --git a/sdks/python/apache_beam/typehints/row.py b/sdks/python/apache_beam/typehints/row.py new file mode 100644 index 000000000000..6a0db2417aad --- /dev/null +++ b/sdks/python/apache_beam/typehints/row.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import NamedTuple + + +class Row(object): + """A dynamic schema'd row object. + + This objects attributes are initialized from the keywords passed into its + constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y. + + More importantly, when a Row object is returned from a `Map`, `FlatMap`, or + `DoFn` type inference is able to deduce the schema of the resulting + PCollection, e.g. + + pc | beam.Map(lambda x: Row(x=x, y=0.5 * x)) + + when applied to a PCollection of ints will produce a PCollection with schema + `(x=int, y=float)`. + + Note that in Beam 2.30.0 and later, Row objects are sensitive to field order. + So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`. + """ + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + def as_dict(self): + return dict(self.__dict__) + + # For compatibility with named tuples. + _asdict = as_dict + + def __iter__(self): + for _, value in self.__dict__.items(): + yield value + + def __repr__(self): + return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items()) + + def __hash__(self): + return hash(self.__dict__.items()) + + def __eq__(self, other): + if type(self) == type(other): + other_dict = other.__dict__ + elif type(other) == type(NamedTuple): + other_dict = other._asdict() + else: + return False + return ( + len(self.__dict__) == len(other_dict) and + all(s == o for s, o in zip(self.__dict__.items(), other_dict.items()))) + + def __reduce__(self): + return _make_Row, tuple(self.__dict__.items()) + + +def _make_Row(*items): + return Row(**dict(items)) From 97c6d2efaa01d5d2efb67f0fe4ed7543ac17e74f Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Wed, 10 Dec 2025 18:05:16 +0530 Subject: [PATCH 2/7] Update pvalue.py --- sdks/python/apache_beam/pvalue.py | 54 ------------------------------- 1 file changed, 54 deletions(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index deb37dbbb81a..8a9f040a6899 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -638,57 +638,3 @@ class EmptySideInput(object): pass -class Row(object): - """A dynamic schema'd row object. - - This objects attributes are initialized from the keywords passed into its - constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y. - - More importantly, when a Row object is returned from a `Map`, `FlatMap`, or - `DoFn` type inference is able to deduce the schema of the resulting - PCollection, e.g. - - pc | beam.Map(lambda x: Row(x=x, y=0.5 * x)) - - when applied to a PCollection of ints will produce a PCollection with schema - `(x=int, y=float)`. - - Note that in Beam 2.30.0 and later, Row objects are sensitive to field order. - So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`. - """ - def __init__(self, **kwargs): - self.__dict__.update(kwargs) - - def as_dict(self): - return dict(self.__dict__) - - # For compatibility with named tuples. - _asdict = as_dict - - def __iter__(self): - for _, value in self.__dict__.items(): - yield value - - def __repr__(self): - return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items()) - - def __hash__(self): - return hash(self.__dict__.items()) - - def __eq__(self, other): - if type(self) == type(other): - other_dict = other.__dict__ - elif type(other) == type(NamedTuple): - other_dict = other._asdict() - else: - return False - return ( - len(self.__dict__) == len(other_dict) and - all(s == o for s, o in zip(self.__dict__.items(), other_dict.items()))) - - def __reduce__(self): - return _make_Row, tuple(self.__dict__.items()) - - -def _make_Row(*items): - return Row(**dict(items)) From 71243c3fba48868c5a0c8740a9de36feec5bce88 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Wed, 10 Dec 2025 19:14:08 +0530 Subject: [PATCH 3/7] Update pvalue.py --- sdks/python/apache_beam/pvalue.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 8a9f040a6899..50afb8c5f835 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -636,5 +636,3 @@ class EmptySideInput(object): want to create new instances of this class themselves. """ pass - - From e587f05e51534668b7afc6b06014ec52c7d2be39 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Wed, 10 Dec 2025 19:15:21 +0530 Subject: [PATCH 4/7] Update CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 317dc75b8656..a9d184de87a0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,7 @@ * Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). -* (Python) Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. Kept a compatibility alias in `pvalue.py`. (Fixes #35095) +* (Python) Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. ([#35095](https://github.com/apache/beam/issues/35095)) ## Breaking Changes From af3e9b9777a78dea024efc4bd84c2f269e2b8965 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Wed, 10 Dec 2025 23:11:42 +0530 Subject: [PATCH 5/7] Remove unused _make_Row import and update CHANGES.md --- CHANGES.md | 3 ++- sdks/python/apache_beam/pvalue.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a9d184de87a0..3b4edf7dcb4c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,8 @@ * Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). -* (Python) Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. ([#35095](https://github.com/apache/beam/issues/35095)) +* Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. Kept a compatibility alias in `pvalue.py` (Python) ([#35095](https://github.com/apache/beam/issues/35095)). + ## Breaking Changes diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 50afb8c5f835..bc1eed20e19b 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -44,7 +44,7 @@ from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.typehints.row import Row, _make_Row +from apache_beam.typehints.row import Row if TYPE_CHECKING: from apache_beam.pipeline import AppliedPTransform From b70ddb501ed62b91cc5d7be48b3e909b1d89bcdc Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Thu, 11 Dec 2025 00:41:12 +0530 Subject: [PATCH 6/7] Update __init__.py --- sdks/python/apache_beam/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index 8e20805559cf..0399f86d5f0c 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -89,10 +89,11 @@ from apache_beam import typehints from apache_beam import version from apache_beam.pipeline import * +from apache_beam.typehints.row import Row from apache_beam.pvalue import PCollection from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import * -from apache_beam.typehints.row import Row + try: # Add mitigation for CVE-2023-47248 while Beam allows affected versions From 4bdbd0a593462b299545bd6268df21dc5d1b17c2 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Fri, 12 Dec 2025 22:16:31 +0530 Subject: [PATCH 7/7] Fix yapf formatting in __init__.py --- sdks/python/apache_beam/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index 0399f86d5f0c..8e20805559cf 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -89,11 +89,10 @@ from apache_beam import typehints from apache_beam import version from apache_beam.pipeline import * -from apache_beam.typehints.row import Row from apache_beam.pvalue import PCollection from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import * - +from apache_beam.typehints.row import Row try: # Add mitigation for CVE-2023-47248 while Beam allows affected versions