From a4b620304a794741da163092263b3c9a9d4de3ca Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 22 Dec 2025 16:49:00 +0000 Subject: [PATCH 1/3] fix windowing link --- sdks/python/apache_beam/yaml/yaml_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 3d1d1f5a742c..b8c7f4f7a871 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -1063,7 +1063,7 @@ class WindowInto(beam.PTransform): size: 30s Note that any Yaml transform can have a - [windowing parameter](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md#windowing), + [windowing parameter](https://beam.apache.org/documentation/sdks/yaml/#windowing), which is applied to its inputs (if any) or outputs (if there are no inputs) which means that explicit WindowInto operations are not typically needed. From b20384bd3c758cfd5bfad7b3d783604866997d5f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:39:04 +0000 Subject: [PATCH 2/3] add windowing support when passing in a json string --- sdks/python/apache_beam/yaml/yaml_transform.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index bd1fc8da9018..42f4ee30a30c 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1033,6 +1033,20 @@ def preprocess_windowing(spec): if 'windowing' in spec: spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing') + + if spec.get('config', {}).get('windowing'): + windowing_config = spec['config']['windowing'] + if isinstance(windowing_config, str): + try: + # PyYAML can load a JSON string. + parsed_config = yaml.safe_load(windowing_config) + if not isinstance(parsed_config, dict): + raise TypeError('Windowing config string must be a YAML/JSON map.') + spec['config']['windowing'] = parsed_config + except Exception as e: + raise ValueError( + f'Error parsing windowing config string at \ + {identify_object(spec)}: {e}') from e return spec elif 'windowing' not in spec: # Nothing to do. From 691fa68c780d705b43288bf12abe88a9c6d9ed05 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:41:18 +0000 Subject: [PATCH 3/3] Revert "add windowing support when passing in a json string" This reverts commit b20384bd3c758cfd5bfad7b3d783604866997d5f. --- sdks/python/apache_beam/yaml/yaml_transform.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 42f4ee30a30c..bd1fc8da9018 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1033,20 +1033,6 @@ def preprocess_windowing(spec): if 'windowing' in spec: spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing') - - if spec.get('config', {}).get('windowing'): - windowing_config = spec['config']['windowing'] - if isinstance(windowing_config, str): - try: - # PyYAML can load a JSON string. - parsed_config = yaml.safe_load(windowing_config) - if not isinstance(parsed_config, dict): - raise TypeError('Windowing config string must be a YAML/JSON map.') - spec['config']['windowing'] = parsed_config - except Exception as e: - raise ValueError( - f'Error parsing windowing config string at \ - {identify_object(spec)}: {e}') from e return spec elif 'windowing' not in spec: # Nothing to do.