diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 989661a6eae4..77cbc41def32 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -46,6 +46,7 @@ from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import schemas +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml import json_utils from apache_beam.yaml import yaml_errors from apache_beam.yaml import yaml_provider @@ -316,7 +317,8 @@ def read_from_pubsub( attributes: Optional[Iterable[str]] = None, attributes_map: Optional[str] = None, id_attribute: Optional[str] = None, - timestamp_attribute: Optional[str] = None): + timestamp_attribute: Optional[str] = None, + publish_time_field: Optional[str] = None): """Reads messages from Cloud Pub/Sub. Args: @@ -366,14 +368,19 @@ def read_from_pubsub( ``2015-10-29T23:41:41.123Z``. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored. + publish_time_field: Field to add to output messages with the Pub/Sub + message publish time. If None, no such field is added. """ if topic and subscription: raise TypeError('Only one of topic and subscription may be specified.') elif not topic and not subscription: raise TypeError('One of topic or subscription may be specified.') + if publish_time_field is not None and not publish_time_field.strip(): + raise ValueError('publish_time_field must be a non-empty field name.') + has_publish_time_field = publish_time_field is not None payload_schema, parser = _create_parser(format, schema) extra_fields: list[schema_pb2.Field] = [] - if not attributes and not attributes_map: + if not attributes and not attributes_map and not has_publish_time_field: mapper = lambda msg: parser(msg) else: if isinstance(attributes, str): @@ -384,6 +391,9 @@ def read_from_pubsub( if attributes_map: extra_fields.append( schemas.schema_field(attributes_map, Mapping[str, str])) + if has_publish_time_field: + extra_fields.append( + schemas.schema_field(publish_time_field, Optional[Timestamp])) def mapper(msg): values = parser(msg.data).as_dict() @@ -393,6 +403,10 @@ def mapper(msg): values[attr] = msg.attributes[attr] if attributes_map: values[attributes_map] = msg.attributes + if has_publish_time_field: + values[publish_time_field] = ( + Timestamp.of(msg.publish_time) + if msg.publish_time is not None else None) return beam.Row(**values) output = ( @@ -400,7 +414,8 @@ def mapper(msg): | beam.io.ReadFromPubSub( topic=topic, subscription=subscription, - with_attributes=bool(attributes or attributes_map), + with_attributes=bool( + attributes or attributes_map or has_publish_time_field), id_label=id_attribute, timestamp_attribute=timestamp_attribute) | 'ParseMessage' >> beam.Map(mapper)) diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py index e6219277bf58..250a54689f5a 100644 --- a/sdks/python/apache_beam/yaml/yaml_io_test.py +++ b/sdks/python/apache_beam/yaml/yaml_io_test.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime import io import json import logging @@ -32,6 +33,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.typehints import schemas as schema_utils +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml.yaml_transform import YamlTransform try: @@ -181,6 +183,107 @@ def test_read_with_attribute_map(self): beam.Row(payload=b'msg2', attrMap={'attr': 'value2'}) ])) + def test_read_with_publish_time_field(self): + publish_time_1 = datetime.datetime( + 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc) + publish_time_2 = datetime.datetime( + 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc) + publish_time_3 = Timestamp.from_utc_datetime( + datetime.datetime( + 2018, 3, 12, 13, 39, 3, 456789, tzinfo=datetime.timezone.utc)) + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with mock.patch('apache_beam.io.ReadFromPubSub', + FakeReadFromPubSub( + topic='my_topic', + messages=[PubsubMessage(b'msg1', {'attr': 'value1'}, + publish_time=publish_time_1), + PubsubMessage(b'msg2', {'attr': 'value2'}, + publish_time=publish_time_2), + PubsubMessage(b'msg3', {'attr': 'value3'}, + publish_time=publish_time_3), + PubsubMessage(b'msg4', + {'attr': 'value4'})])): + result = p | YamlTransform( + ''' + type: ReadFromPubSub + config: + topic: my_topic + format: RAW + publish_time_field: publish_time + ''') + assert_that( + result, + equal_to([ + beam.Row( + payload=b'msg1', + publish_time=Timestamp.from_utc_datetime(publish_time_1)), + beam.Row( + payload=b'msg2', + publish_time=Timestamp.from_utc_datetime(publish_time_2)), + beam.Row(payload=b'msg3', publish_time=publish_time_3), + beam.Row(payload=b'msg4', publish_time=None) + ])) + + def test_read_with_attributes_and_publish_time_field(self): + publish_time_1 = Timestamp.from_utc_datetime( + datetime.datetime( + 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc)) + publish_time_2 = Timestamp.from_utc_datetime( + datetime.datetime( + 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc)) + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with mock.patch('apache_beam.io.ReadFromPubSub', + FakeReadFromPubSub( + topic='my_topic', + messages=[PubsubMessage(b'msg1', {'attr': 'value1'}, + publish_time=publish_time_1), + PubsubMessage(b'msg2', {'attr': 'value2'}, + publish_time=publish_time_2) + ])): + result = p | YamlTransform( + ''' + type: ReadFromPubSub + config: + topic: my_topic + format: RAW + attributes: [attr] + attributes_map: attrMap + publish_time_field: publish_time + ''') + assert_that( + result, + equal_to([ + beam.Row( + payload=b'msg1', + attr='value1', + attrMap={'attr': 'value1'}, + publish_time=publish_time_1), + beam.Row( + payload=b'msg2', + attr='value2', + attrMap={'attr': 'value2'}, + publish_time=publish_time_2) + ])) + + def test_read_with_empty_publish_time_field(self): + for publish_time_field in ('', ' '): + with self.subTest(publish_time_field=publish_time_field): + with self.assertRaisesRegex( + ValueError, 'publish_time_field must be a non-empty field name'): + with beam.Pipeline( + options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: ReadFromPubSub + config: + topic: my_topic + format: RAW + publish_time_field: "%s" + ''' % publish_time_field) + def test_read_with_id_attribute(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: