Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,20 @@ def preprocess_windowing(spec):
if 'windowing' in spec:
spec['config'] = spec.get('config', {})
spec['config']['windowing'] = spec.pop('windowing')

if spec.get('config', {}).get('windowing'):
windowing_config = spec['config']['windowing']
if isinstance(windowing_config, str):
try:
# PyYAML can load a JSON string.
parsed_config = yaml.safe_load(windowing_config)
if not isinstance(parsed_config, dict):
raise TypeError('Windowing config string must be a YAML/JSON map.')
spec['config']['windowing'] = parsed_config
except Exception as e:
raise ValueError(
f'Error parsing windowing config string at \
{identify_object(spec)}: {e}') from e
return spec
elif 'windowing' not in spec:
# Nothing to do.
Expand Down
37 changes: 37 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,43 @@ def test_explicit_window_into(self):
providers=TEST_PROVIDERS)
assert_that(result, equal_to([6, 9]))

def test_explicit_window_into_with_json_string_config(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
result = p | YamlTransform(
'''
type: chain
transforms:
- type: CreateTimestamped
config:
elements: [0, 1, 2, 3, 4, 5]
- type: WindowInto
config:
windowing: |
{"type": "fixed", "size": "4s"}
- type: SumGlobally
''',
providers=TEST_PROVIDERS)
assert_that(result, equal_to([6, 9]))

def test_explicit_window_into_with_string_config_fails(self):
with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
_ = p | YamlTransform(
'''
type: chain
transforms:
- type: CreateTimestamped
config:
elements: [0, 1, 2, 3, 4, 5]
- type: WindowInto
config:
windowing: |
'not a valid yaml'
''',
providers=TEST_PROVIDERS)

def test_windowing_on_input(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
Expand Down
Loading