From 49ba1ffc9f306d1c2567218645746d78d1960314 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Tue, 16 Jun 2026 09:51:46 -0500 Subject: [PATCH 1/3] Add publish_time_field to ReadFromPubSub YAML transform --- sdks/python/apache_beam/yaml/yaml_io.py | 16 +++++++-- sdks/python/apache_beam/yaml/yaml_io_test.py | 38 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 989661a6eae4..2ed6060cd108 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -46,6 +46,7 @@ from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import schemas +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml import json_utils from apache_beam.yaml import yaml_errors from apache_beam.yaml import yaml_provider @@ -316,7 +317,8 @@ def read_from_pubsub( attributes: Optional[Iterable[str]] = None, attributes_map: Optional[str] = None, id_attribute: Optional[str] = None, - timestamp_attribute: Optional[str] = None): + timestamp_attribute: Optional[str] = None, + publish_time_field: Optional[str] = None): """Reads messages from Cloud Pub/Sub. Args: @@ -366,6 +368,8 @@ def read_from_pubsub( ``2015-10-29T23:41:41.123Z``. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored. + publish_time_field: Field to add to output messages with the Pub/Sub + message publish time. If None, no such field is added. """ if topic and subscription: raise TypeError('Only one of topic and subscription may be specified.') @@ -373,7 +377,7 @@ def read_from_pubsub( raise TypeError('One of topic or subscription may be specified.') payload_schema, parser = _create_parser(format, schema) extra_fields: list[schema_pb2.Field] = [] - if not attributes and not attributes_map: + if not attributes and not attributes_map and not publish_time_field: mapper = lambda msg: parser(msg) else: if isinstance(attributes, str): @@ -384,6 +388,8 @@ def read_from_pubsub( if attributes_map: extra_fields.append( schemas.schema_field(attributes_map, Mapping[str, str])) + if publish_time_field: + extra_fields.append(schemas.schema_field(publish_time_field, Timestamp)) def mapper(msg): values = parser(msg.data).as_dict() @@ -393,6 +399,9 @@ def mapper(msg): values[attr] = msg.attributes[attr] if attributes_map: values[attributes_map] = msg.attributes + if publish_time_field: + values[publish_time_field] = Timestamp.from_utc_datetime( + msg.publish_time) return beam.Row(**values) output = ( @@ -400,7 +409,8 @@ def mapper(msg): | beam.io.ReadFromPubSub( topic=topic, subscription=subscription, - with_attributes=bool(attributes or attributes_map), + with_attributes=bool( + attributes or attributes_map or publish_time_field), id_label=id_attribute, timestamp_attribute=timestamp_attribute) | 'ParseMessage' >> beam.Map(mapper)) diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py index e6219277bf58..35dafae21872 100644 --- a/sdks/python/apache_beam/yaml/yaml_io_test.py +++ b/sdks/python/apache_beam/yaml/yaml_io_test.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime import io import json import logging @@ -32,6 +33,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.typehints import schemas as schema_utils +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml.yaml_transform import YamlTransform try: @@ -181,6 +183,42 @@ def test_read_with_attribute_map(self): beam.Row(payload=b'msg2', attrMap={'attr': 'value2'}) ])) + def test_read_with_publish_time_field(self): + publish_time_1 = datetime.datetime( + 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc) + publish_time_2 = datetime.datetime( + 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc) + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with mock.patch( + 'apache_beam.io.ReadFromPubSub', + FakeReadFromPubSub( + topic='my_topic', + messages=[ + PubsubMessage( + b'msg1', {'attr': 'value1'}, publish_time=publish_time_1), + PubsubMessage( + b'msg2', {'attr': 'value2'}, publish_time=publish_time_2) + ])): + result = p | YamlTransform( + ''' + type: ReadFromPubSub + config: + topic: my_topic + format: RAW + publish_time_field: publish_time + ''') + assert_that( + result, + equal_to([ + beam.Row( + payload=b'msg1', + publish_time=Timestamp.from_utc_datetime(publish_time_1)), + beam.Row( + payload=b'msg2', + publish_time=Timestamp.from_utc_datetime(publish_time_2)) + ])) + def test_read_with_id_attribute(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: From 7e3795821c852b96fb1b01f7d730c93c5a5aa67d Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Tue, 16 Jun 2026 10:30:15 -0500 Subject: [PATCH 2/3] Format YAML PubSub publish time test --- sdks/python/apache_beam/yaml/yaml_io_test.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py index 35dafae21872..4541bc48bb2a 100644 --- a/sdks/python/apache_beam/yaml/yaml_io_test.py +++ b/sdks/python/apache_beam/yaml/yaml_io_test.py @@ -190,16 +190,14 @@ def test_read_with_publish_time_field(self): 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc) with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: - with mock.patch( - 'apache_beam.io.ReadFromPubSub', - FakeReadFromPubSub( - topic='my_topic', - messages=[ - PubsubMessage( - b'msg1', {'attr': 'value1'}, publish_time=publish_time_1), - PubsubMessage( - b'msg2', {'attr': 'value2'}, publish_time=publish_time_2) - ])): + with mock.patch('apache_beam.io.ReadFromPubSub', + FakeReadFromPubSub( + topic='my_topic', + messages=[PubsubMessage(b'msg1', {'attr': 'value1'}, + publish_time=publish_time_1), + PubsubMessage(b'msg2', {'attr': 'value2'}, + publish_time=publish_time_2) + ])): result = p | YamlTransform( ''' type: ReadFromPubSub From e9f322227f2515d7eadcc3c19963f42cb2a150b6 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Tue, 16 Jun 2026 12:39:38 -0500 Subject: [PATCH 3/3] Fix YAML PubSub publish time handling --- sdks/python/apache_beam/yaml/yaml_io.py | 19 ++++-- sdks/python/apache_beam/yaml/yaml_io_test.py | 71 +++++++++++++++++++- 2 files changed, 81 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 2ed6060cd108..77cbc41def32 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -375,9 +375,12 @@ def read_from_pubsub( raise TypeError('Only one of topic and subscription may be specified.') elif not topic and not subscription: raise TypeError('One of topic or subscription may be specified.') + if publish_time_field is not None and not publish_time_field.strip(): + raise ValueError('publish_time_field must be a non-empty field name.') + has_publish_time_field = publish_time_field is not None payload_schema, parser = _create_parser(format, schema) extra_fields: list[schema_pb2.Field] = [] - if not attributes and not attributes_map and not publish_time_field: + if not attributes and not attributes_map and not has_publish_time_field: mapper = lambda msg: parser(msg) else: if isinstance(attributes, str): @@ -388,8 +391,9 @@ def read_from_pubsub( if attributes_map: extra_fields.append( schemas.schema_field(attributes_map, Mapping[str, str])) - if publish_time_field: - extra_fields.append(schemas.schema_field(publish_time_field, Timestamp)) + if has_publish_time_field: + extra_fields.append( + schemas.schema_field(publish_time_field, Optional[Timestamp])) def mapper(msg): values = parser(msg.data).as_dict() @@ -399,9 +403,10 @@ def mapper(msg): values[attr] = msg.attributes[attr] if attributes_map: values[attributes_map] = msg.attributes - if publish_time_field: - values[publish_time_field] = Timestamp.from_utc_datetime( - msg.publish_time) + if has_publish_time_field: + values[publish_time_field] = ( + Timestamp.of(msg.publish_time) + if msg.publish_time is not None else None) return beam.Row(**values) output = ( @@ -410,7 +415,7 @@ def mapper(msg): topic=topic, subscription=subscription, with_attributes=bool( - attributes or attributes_map or publish_time_field), + attributes or attributes_map or has_publish_time_field), id_label=id_attribute, timestamp_attribute=timestamp_attribute) | 'ParseMessage' >> beam.Map(mapper)) diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py index 4541bc48bb2a..250a54689f5a 100644 --- a/sdks/python/apache_beam/yaml/yaml_io_test.py +++ b/sdks/python/apache_beam/yaml/yaml_io_test.py @@ -188,6 +188,50 @@ def test_read_with_publish_time_field(self): 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc) publish_time_2 = datetime.datetime( 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc) + publish_time_3 = Timestamp.from_utc_datetime( + datetime.datetime( + 2018, 3, 12, 13, 39, 3, 456789, tzinfo=datetime.timezone.utc)) + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with mock.patch('apache_beam.io.ReadFromPubSub', + FakeReadFromPubSub( + topic='my_topic', + messages=[PubsubMessage(b'msg1', {'attr': 'value1'}, + publish_time=publish_time_1), + PubsubMessage(b'msg2', {'attr': 'value2'}, + publish_time=publish_time_2), + PubsubMessage(b'msg3', {'attr': 'value3'}, + publish_time=publish_time_3), + PubsubMessage(b'msg4', + {'attr': 'value4'})])): + result = p | YamlTransform( + ''' + type: ReadFromPubSub + config: + topic: my_topic + format: RAW + publish_time_field: publish_time + ''') + assert_that( + result, + equal_to([ + beam.Row( + payload=b'msg1', + publish_time=Timestamp.from_utc_datetime(publish_time_1)), + beam.Row( + payload=b'msg2', + publish_time=Timestamp.from_utc_datetime(publish_time_2)), + beam.Row(payload=b'msg3', publish_time=publish_time_3), + beam.Row(payload=b'msg4', publish_time=None) + ])) + + def test_read_with_attributes_and_publish_time_field(self): + publish_time_1 = Timestamp.from_utc_datetime( + datetime.datetime( + 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc)) + publish_time_2 = Timestamp.from_utc_datetime( + datetime.datetime( + 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc)) with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: with mock.patch('apache_beam.io.ReadFromPubSub', @@ -204,6 +248,8 @@ def test_read_with_publish_time_field(self): config: topic: my_topic format: RAW + attributes: [attr] + attributes_map: attrMap publish_time_field: publish_time ''') assert_that( @@ -211,12 +257,33 @@ def test_read_with_publish_time_field(self): equal_to([ beam.Row( payload=b'msg1', - publish_time=Timestamp.from_utc_datetime(publish_time_1)), + attr='value1', + attrMap={'attr': 'value1'}, + publish_time=publish_time_1), beam.Row( payload=b'msg2', - publish_time=Timestamp.from_utc_datetime(publish_time_2)) + attr='value2', + attrMap={'attr': 'value2'}, + publish_time=publish_time_2) ])) + def test_read_with_empty_publish_time_field(self): + for publish_time_field in ('', ' '): + with self.subTest(publish_time_field=publish_time_field): + with self.assertRaisesRegex( + ValueError, 'publish_time_field must be a non-empty field name'): + with beam.Pipeline( + options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: ReadFromPubSub + config: + topic: my_topic + format: RAW + publish_time_field: "%s" + ''' % publish_time_field) + def test_read_with_id_attribute(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: