diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index bd1fc8da9018..42f4ee30a30c 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -1033,6 +1033,25 @@ def preprocess_windowing(spec):
     if 'windowing' in spec:
       spec['config'] = spec.get('config', {})
       spec['config']['windowing'] = spec.pop('windowing')
+
+    # `windowing` may be given as a YAML/JSON string (e.g. a block scalar);
+    # normalize it to a mapping before returning the spec.
+    if spec.get('config', {}).get('windowing'):
+      windowing_config = spec['config']['windowing']
+      if isinstance(windowing_config, str):
+        try:
+          # PyYAML can load a JSON string.
+          parsed_config = yaml.safe_load(windowing_config)
+          if not isinstance(parsed_config, dict):
+            raise TypeError('Windowing config string must be a YAML/JSON map.')
+          spec['config']['windowing'] = parsed_config
+        except Exception as e:
+          # Adjacent literals (not a backslash continuation inside the string)
+          # keep source indentation out of the message. The TypeError above is
+          # also caught here, so every failure surfaces as a ValueError.
+          raise ValueError(
+              'Error parsing windowing config string at '
+              f'{identify_object(spec)}: {e}') from e
     return spec
   elif 'windowing' not in spec:
     # Nothing to do.
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index d5950fb9efaf..06064138afdb 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -993,6 +993,64 @@ def test_explicit_window_into(self):
           providers=TEST_PROVIDERS)
       assert_that(result, equal_to([6, 9]))
 
+  def test_explicit_window_into_with_json_string_config(self):
+    # The windowing config is a JSON string supplied through a block scalar.
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+        type: chain
+        transforms:
+          - type: CreateTimestamped
+            config:
+              elements: [0, 1, 2, 3, 4, 5]
+          - type: WindowInto
+            config:
+              windowing: |
+                {"type": "fixed", "size": "4s"}
+          - type: SumGlobally
+        ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_explicit_window_into_with_string_config_fails(self):
+    # The string is valid YAML but parses to a scalar rather than a mapping.
+    with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        _ = p | YamlTransform(
+            '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              config:
+                elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              config:
+                windowing: |
+                  'not a valid yaml'
+          ''',
+            providers=TEST_PROVIDERS)
+
+  def test_explicit_window_into_with_malformed_string_config_fails(self):
+    # The string cannot be parsed at all (unterminated flow mapping).
+    with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        _ = p | YamlTransform(
+            '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              config:
+                elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              config:
+                windowing: |
+                  {"type": "fixed", "size": "4s"
+          ''',
+            providers=TEST_PROVIDERS)
+
   def test_windowing_on_input(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p: