diff --git a/CHANGES.md b/CHANGES.md index 09e249630447..3b4edf7dcb4c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,10 @@ * Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). +* Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. Kept a compatibility alias in `pvalue.py` (Python) ([#35095](https://github.com/apache/beam/issues/35095)). + + + ## Breaking Changes * (Python) Some Python dependencies have been split out into extras. To ensure all previously installed dependencies are installed, when installing Beam you can `pip install apache-beam[gcp,interactive,yaml,redis,hadoop,tfrecord]`, though most users will not need all of these extras ([#34554](https://github.com/apache/beam/issues/34554)). diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index 9906c95aee14..8e20805559cf 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -90,9 +90,9 @@ from apache_beam import version from apache_beam.pipeline import * from apache_beam.pvalue import PCollection -from apache_beam.pvalue import Row from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import * +from apache_beam.typehints.row import Row try: # Add mitigation for CVE-2023-47248 while Beam allows affected versions diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index ca9a662d399e..bc1eed20e19b 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -33,7 +33,6 @@ from typing import Dict from typing import Generic from typing import Iterator -from typing import NamedTuple from typing import Optional from typing import Sequence from typing import TypeVar @@ -45,6 +44,7 @@ from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.typehints.row import Row if TYPE_CHECKING: from apache_beam.pipeline import AppliedPTransform @@ -636,59 +636,3 @@ class EmptySideInput(object): want to create new instances of this class themselves. """ pass - - -class Row(object): - """A dynamic schema'd row object. - - This objects attributes are initialized from the keywords passed into its - constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y. - - More importantly, when a Row object is returned from a `Map`, `FlatMap`, or - `DoFn` type inference is able to deduce the schema of the resulting - PCollection, e.g. - - pc | beam.Map(lambda x: Row(x=x, y=0.5 * x)) - - when applied to a PCollection of ints will produce a PCollection with schema - `(x=int, y=float)`. - - Note that in Beam 2.30.0 and later, Row objects are sensitive to field order. - So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`. - """ - def __init__(self, **kwargs): - self.__dict__.update(kwargs) - - def as_dict(self): - return dict(self.__dict__) - - # For compatibility with named tuples. - _asdict = as_dict - - def __iter__(self): - for _, value in self.__dict__.items(): - yield value - - def __repr__(self): - return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items()) - - def __hash__(self): - return hash(self.__dict__.items()) - - def __eq__(self, other): - if type(self) == type(other): - other_dict = other.__dict__ - elif type(other) == type(NamedTuple): - other_dict = other._asdict() - else: - return False - return ( - len(self.__dict__) == len(other_dict) and - all(s == o for s, o in zip(self.__dict__.items(), other_dict.items()))) - - def __reduce__(self): - return _make_Row, tuple(self.__dict__.items()) - - -def _make_Row(*items): - return Row(**dict(items)) diff --git a/sdks/python/apache_beam/typehints/row.py b/sdks/python/apache_beam/typehints/row.py new file mode 100644 index 000000000000..6a0db2417aad --- /dev/null +++ b/sdks/python/apache_beam/typehints/row.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import NamedTuple + + +class Row(object): + """A dynamic schema'd row object. + + This objects attributes are initialized from the keywords passed into its + constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y. + + More importantly, when a Row object is returned from a `Map`, `FlatMap`, or + `DoFn` type inference is able to deduce the schema of the resulting + PCollection, e.g. + + pc | beam.Map(lambda x: Row(x=x, y=0.5 * x)) + + when applied to a PCollection of ints will produce a PCollection with schema + `(x=int, y=float)`. + + Note that in Beam 2.30.0 and later, Row objects are sensitive to field order. + So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`. + """ + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + def as_dict(self): + return dict(self.__dict__) + + # For compatibility with named tuples. + _asdict = as_dict + + def __iter__(self): + for _, value in self.__dict__.items(): + yield value + + def __repr__(self): + return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items()) + + def __hash__(self): + return hash(self.__dict__.items()) + + def __eq__(self, other): + if type(self) == type(other): + other_dict = other.__dict__ + elif type(other) == type(NamedTuple): + other_dict = other._asdict() + else: + return False + return ( + len(self.__dict__) == len(other_dict) and + all(s == o for s, o in zip(self.__dict__.items(), other_dict.items()))) + + def __reduce__(self): + return _make_Row, tuple(self.__dict__.items()) + + +def _make_Row(*items): + return Row(**dict(items))