From b1230f9918d91806580ea9460555bc58d9343c09 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 5 Feb 2026 17:36:12 +0000 Subject: [PATCH 01/10] Add pipeline construction options. --- .../options/pipeline_construction_options.py | 71 ++++ .../pipeline_construction_options_test.py | 338 ++++++++++++++++++ sdks/python/apache_beam/pipeline.py | 16 + 3 files changed, 425 insertions(+) create mode 100644 sdks/python/apache_beam/options/pipeline_construction_options.py create mode 100644 sdks/python/apache_beam/options/pipeline_construction_options_test.py diff --git a/sdks/python/apache_beam/options/pipeline_construction_options.py b/sdks/python/apache_beam/options/pipeline_construction_options.py new file mode 100644 index 000000000000..f0b5f326d6ef --- /dev/null +++ b/sdks/python/apache_beam/options/pipeline_construction_options.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Context-scoped access to pipeline options during graph construction and +translation. + +This module provides thread-safe and async-safe access to PipelineOptions +using contextvars. + +Usage: + # Reading options (from anywhere in the call stack): + from apache_beam.options.pipeline_construction_options import get_current_pipeline_options + options = get_current_pipeline_options() + + # Setting options (typically done internally by Pipeline): + from apache_beam.options.pipeline_construction_options import scoped_pipeline_options + with scoped_pipeline_options(options): + # All code here sees these options + ... +""" + +from contextlib import contextmanager +from contextvars import ContextVar +from typing import TYPE_CHECKING +from typing import Optional + +if TYPE_CHECKING: + from apache_beam.options.pipeline_options import PipelineOptions + +# The contextvar holding the current pipeline's options. +# Each thread and each asyncio task gets its own isolated copy. +_current_pipeline_options: ContextVar[Optional['PipelineOptions']] = ContextVar( + 'current_pipeline_options', default=None) + + +def get_current_pipeline_options() -> Optional['PipelineOptions']: + """Get the current pipeline's options from the context. + + Returns: + The PipelineOptions for the currently executing pipeline operation, + or None if called outside of a pipeline context. + """ + return _current_pipeline_options.get() + + +@contextmanager +def scoped_pipeline_options(options: Optional['PipelineOptions']): + """Context manager that sets pipeline options for the duration of a block. + + Args: + options: The PipelineOptions to make available during this scope. 
+ """ + token = _current_pipeline_options.set(options) + try: + yield + finally: + _current_pipeline_options.reset(token) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options_test.py b/sdks/python/apache_beam/options/pipeline_construction_options_test.py new file mode 100644 index 000000000000..a48eb51ba493 --- /dev/null +++ b/sdks/python/apache_beam/options/pipeline_construction_options_test.py @@ -0,0 +1,338 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for pipeline_construction_options module. + +These tests verify that the contextvar-based approach properly isolates +pipeline options across threads and async tasks, preventing race conditions. +""" + +import asyncio +import threading +import unittest + +import apache_beam as beam + +from apache_beam.options.pipeline_construction_options import get_current_pipeline_options +from apache_beam.options.pipeline_construction_options import scoped_pipeline_options +from apache_beam.options.pipeline_options import PipelineOptions + + +class PipelineConstructionOptionsTest(unittest.TestCase): + def test_nested_scoping(self): + """Test that nested scopes properly restore outer options.""" + outer_options = PipelineOptions(['--runner=DirectRunner']) + inner_options = PipelineOptions(['--runner=DataflowRunner']) + + with scoped_pipeline_options(outer_options): + self.assertIs(get_current_pipeline_options(), outer_options) + + with scoped_pipeline_options(inner_options): + self.assertIs(get_current_pipeline_options(), inner_options) + + self.assertIs(get_current_pipeline_options(), outer_options) + + self.assertIsNone(get_current_pipeline_options()) + + def test_exception_in_scope_restores_options(self): + """Test that options are restored even when an exception is raised.""" + outer_options = PipelineOptions(['--runner=DirectRunner']) + inner_options = PipelineOptions(['--runner=DataflowRunner']) + + with scoped_pipeline_options(outer_options): + try: + with scoped_pipeline_options(inner_options): + self.assertIs(get_current_pipeline_options(), inner_options) + raise ValueError("Test exception") + except ValueError: + pass + + self.assertIs(get_current_pipeline_options(), outer_options) + + def test_thread_isolation(self): + """Test that different threads see their own isolated options.""" + results = {} + errors = [] + barrier = threading.Barrier(2) + + def thread_worker(thread_id, runner_name): + try: + options = PipelineOptions([f'--runner={runner_name}']) + with scoped_pipeline_options(options): + barrier.wait(timeout=5) + + current = get_current_pipeline_options() + results[thread_id] = current.get_all_options()['runner'] + import time + time.sleep(0.01) + + current_after = get_current_pipeline_options() + if current_after is not current: + errors.append( + f"Thread 
{thread_id}: options changed during execution") + except Exception as e: + errors.append(f"Thread {thread_id}: {e}") + + thread1 = threading.Thread(target=thread_worker, args=(1, 'DirectRunner')) + thread2 = threading.Thread(target=thread_worker, args=(2, 'DataflowRunner')) + + thread1.start() + thread2.start() + + thread1.join(timeout=5) + thread2.join(timeout=5) + + self.assertEqual(errors, []) + self.assertEqual(results[1], 'DirectRunner') + self.assertEqual(results[2], 'DataflowRunner') + + def test_asyncio_task_isolation(self): + """Test that different asyncio tasks see their own isolated options.""" + async def async_worker( + task_id, runner_name, results, ready_event, go_event): + options = PipelineOptions([f'--runner={runner_name}']) + with scoped_pipeline_options(options): + ready_event.set() + await go_event.wait() + current = get_current_pipeline_options() + results[task_id] = current.get_all_options()['runner'] + await asyncio.sleep(0.01) + current_after = get_current_pipeline_options() + assert current_after is current, \ + f"Task {task_id}: options changed during execution" + + async def run_test(): + results = {} + ready_events = [asyncio.Event() for _ in range(2)] + go_event = asyncio.Event() + + task1 = asyncio.create_task( + async_worker(1, 'DirectRunner', results, ready_events[0], go_event)) + task2 = asyncio.create_task( + async_worker(2, 'DataflowRunner', results, ready_events[1], go_event)) + + # Wait for both tasks to be ready + await asyncio.gather(*[e.wait() for e in ready_events]) + # Signal all tasks to proceed + go_event.set() + + await asyncio.gather(task1, task2) + return results + + results = asyncio.run(run_test()) + self.assertEqual(results[1], 'DirectRunner') + self.assertEqual(results[2], 'DataflowRunner') + + def test_transform_sees_pipeline_options(self): + """Test that a transform can access pipeline options during expand().""" + class OptionsCapturingTransform(beam.PTransform): + """Transform that captures pipeline options during expand().""" + def __init__(self, expected_job_name): + self.expected_job_name = expected_job_name + self.captured_options = None + + def expand(self, pcoll): + # This runs during pipeline construction + self.captured_options = get_current_pipeline_options() + return pcoll | beam.Map(lambda x: x) + + options = PipelineOptions(['--job_name=test_job_123']) + transform = OptionsCapturingTransform('test_job_123') + + with beam.Pipeline(options=options) as p: + _ = p | beam.Create([1, 2, 3]) | transform + + # Verify the transform saw the correct options + self.assertIsNotNone(transform.captured_options) + self.assertEqual( + transform.captured_options.get_all_options()['job_name'], + 'test_job_123') + + def test_coder_sees_correct_options_during_run(self): + """Test that coders see correct pipeline options during proto conversion. + + This tests the run path where as_deterministic_coder() is called during + to_runner_api() proto conversion. Each thread runs a pipeline with a + custom key type that uses a non-deterministic coder. During conversion, + as_deterministic_coder() should see the correct pipeline options. + + Uses PrismRunner for deterministic behavior and shared.Shared() to + capture results across the pickle boundary. 
+ """ + from apache_beam.coders import coders + from apache_beam.utils import shared + + errors = [] + + class WeakRefDict(dict): + pass + + class TestKey: + def __init__(self, value): + self.value = value + + def __eq__(self, other): + return isinstance(other, TestKey) and self.value == other.value + + def __hash__(self): + return hash(self.value) + + class OptionsCapturingKeyCoder(coders.Coder): + """Coder that captures pipeline options in as_deterministic_coder.""" + shared_handle = shared.Shared() + + def encode(self, value): + return str(value.value).encode('utf-8') + + def decode(self, encoded): + return TestKey(encoded.decode('utf-8')) + + def is_deterministic(self): + return False + + def as_deterministic_coder(self, step_label, error_message=None): + opts = get_current_pipeline_options() + if opts is not None: + results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) + thread_name = threading.current_thread().name + job_name = opts.get_all_options().get('job_name') + results[thread_name] = job_name + return self + + beam.coders.registry.register_coder(TestKey, OptionsCapturingKeyCoder) + + results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) + + start_barrier = threading.Barrier(2) + + def construct_pipeline(worker_id): + try: + job_name = f'gbk_job_{worker_id}' + options = PipelineOptions( + [f'--job_name={job_name}', '--runner=PrismRunner']) + + start_barrier.wait(timeout=5) + + with beam.Pipeline(options=options) as p: + _ = ( + p + | beam.Create([(TestKey(1), 'a'), (TestKey(2), 'b')]) + | beam.GroupByKey()) + + except ValueError as e: + # Ignore "signal only works in main thread" error from PrismRunner + if 'signal' not in str(e): + import traceback + errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") + except Exception as e: + import traceback + errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") + + thread1 = threading.Thread( + target=construct_pipeline, args=(1, ), name='Worker1') + thread2 = threading.Thread( + target=construct_pipeline, args=(2, ), name='Worker2') + + thread1.start() + thread2.start() + + thread1.join(timeout=30) + thread2.join(timeout=30) + + self.assertEqual(errors, [], f"Errors occurred: {errors}") + self.assertEqual( + results.get('Worker1'), + 'gbk_job_1', + f"Worker1 saw wrong options: {results}") + self.assertEqual( + results.get('Worker2'), + 'gbk_job_2', + f"Worker2 saw wrong options: {results}") + + def test_barrier_inside_default_type_hints(self): + """Test race condition detection with barrier inside default_type_hints. + + This test reliably detects race conditions because: + 1. Both threads start pipeline construction simultaneously + 2. Inside default_type_hints (which is called during Pipeline.apply()), + both threads hit a barrier and wait for each other + 3. At this point, BOTH threads are inside scoped_pipeline_options + 4. 
When they continue, they read options - with a global var, they'd see + the wrong values because the last thread to set options would win + """ + + results = {} + errors = [] + inner_barrier = threading.Barrier(2) + + class BarrierTransform(beam.PTransform): + """Transform that synchronizes threads INSIDE default_type_hints.""" + def __init__(self, worker_id, results_dict, barrier): + self.worker_id = worker_id + self.results_dict = results_dict + self.barrier = barrier + + def expand(self, pcoll): + return pcoll | beam.Map(lambda x: x) + + def default_type_hints(self): + self.barrier.wait(timeout=5) + opts = get_current_pipeline_options() + if opts is not None: + job_name = opts.get_all_options().get('job_name') + if self.worker_id not in self.results_dict: + self.results_dict[self.worker_id] = job_name + + return super().default_type_hints() + + def construct_pipeline(worker_id): + try: + job_name = f'barrier_job_{worker_id}' + options = PipelineOptions([f'--job_name={job_name}']) + transform = BarrierTransform(worker_id, results, inner_barrier) + + with beam.Pipeline(options=options) as p: + _ = p | beam.Create([1, 2, 3]) | transform + + except Exception as e: + import traceback + errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") + + thread1 = threading.Thread( + target=construct_pipeline, args=(1, )) + thread2 = threading.Thread( + target=construct_pipeline, args=(2, )) + + thread1.start() + thread2.start() + + thread1.join(timeout=10) + thread2.join(timeout=10) + + self.assertEqual(errors, [], f"Errors occurred: {errors}") + self.assertEqual( + results.get(1), + 'barrier_job_1', + f"Worker 1 saw wrong options: {results}") + self.assertEqual( + results.get(2), + 'barrier_job_2', + f"Worker 2 saw wrong options: {results}") + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 6ef06abb7436..10da835912aa 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -74,6 +74,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_construction_options import scoped_pipeline_options from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -559,6 +560,12 @@ def replace_all(self, replacements: Iterable['PTransformOverride']) -> None: def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult': """Runs the pipeline. Returns whatever our runner returns after running.""" + with scoped_pipeline_options(self._options): + return self._run_internal(test_runner_api) + + def _run_internal( + self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult': + """Internal implementation of run(), called within scoped options.""" # All pipeline options are finalized at this point. # Call get_all_options to print warnings on invalid options. self.options.get_all_options( @@ -698,6 +705,15 @@ def apply( RuntimeError: if the transform object was already applied to this pipeline and needs to be cloned in order to apply again. 
""" + with scoped_pipeline_options(self._options): + return self._apply_internal(transform, pvalueish, label) + + def _apply_internal( + self, + transform: ptransform.PTransform, + pvalueish: Optional[pvalue.PValue] = None, + label: Optional[str] = None) -> pvalue.PValue: + """Internal implementation of apply(), called within scoped options.""" if isinstance(transform, ptransform._NamedPTransform): return self.apply( transform.transform, pvalueish, label or transform.label) From 260a077ef987b628508a355e71ed5c8b765a1bd6 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 5 Feb 2026 17:48:09 +0000 Subject: [PATCH 02/10] use apply_internal --- sdks/python/apache_beam/pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 10da835912aa..b6cfd24dc7b7 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -715,7 +715,7 @@ def _apply_internal( label: Optional[str] = None) -> pvalue.PValue: """Internal implementation of apply(), called within scoped options.""" if isinstance(transform, ptransform._NamedPTransform): - return self.apply( + return self._apply_internal( transform.transform, pvalueish, label or transform.label) if not label and isinstance(transform, ptransform._PTransformFnPTransform): @@ -730,7 +730,7 @@ def _apply_internal( # (e.g. to produce error messages for type hint violations). old_label, transform.label = transform.label, label try: - return self.apply(transform, pvalueish) + return self._apply_internal(transform, pvalueish) finally: transform.label = old_label @@ -753,7 +753,7 @@ def _apply_internal( 'updating a pipeline or reloading the job state. ' 'This is not recommended for streaming jobs.') unique_label = self._generate_unique_label(transform) - return self.apply(transform, pvalueish, unique_label) + return self._apply_internal(transform, pvalueish, unique_label) else: raise RuntimeError( 'A transform with label "%s" already exists in the pipeline. ' From fbbf41885e570f28e414485f9302fda006bdf2bc Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 5 Feb 2026 18:44:45 +0000 Subject: [PATCH 03/10] lint. --- .../options/pipeline_construction_options.py | 11 ----------- .../options/pipeline_construction_options_test.py | 1 - 2 files changed, 12 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options.py b/sdks/python/apache_beam/options/pipeline_construction_options.py index f0b5f326d6ef..6bc300de507a 100644 --- a/sdks/python/apache_beam/options/pipeline_construction_options.py +++ b/sdks/python/apache_beam/options/pipeline_construction_options.py @@ -20,17 +20,6 @@ This module provides thread-safe and async-safe access to PipelineOptions using contextvars. - -Usage: - # Reading options (from anywhere in the call stack): - from apache_beam.options.pipeline_construction_options import get_current_pipeline_options - options = get_current_pipeline_options() - - # Setting options (typically done internally by Pipeline): - from apache_beam.options.pipeline_construction_options import scoped_pipeline_options - with scoped_pipeline_options(options): - # All code here sees these options - ... 
""" from contextlib import contextmanager diff --git a/sdks/python/apache_beam/options/pipeline_construction_options_test.py b/sdks/python/apache_beam/options/pipeline_construction_options_test.py index a48eb51ba493..e4c0c0f2c119 100644 --- a/sdks/python/apache_beam/options/pipeline_construction_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_construction_options_test.py @@ -26,7 +26,6 @@ import unittest import apache_beam as beam - from apache_beam.options.pipeline_construction_options import get_current_pipeline_options from apache_beam.options.pipeline_construction_options import scoped_pipeline_options from apache_beam.options.pipeline_options import PipelineOptions From 36d1fbb1a75c9eb6fbf75f477dcf44a147337e15 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 5 Feb 2026 19:41:08 +0000 Subject: [PATCH 04/10] remove concurrent prism jobs. --- .../pipeline_construction_options_test.py | 58 ++++--------------- 1 file changed, 11 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options_test.py b/sdks/python/apache_beam/options/pipeline_construction_options_test.py index e4c0c0f2c119..6ef3e316afe2 100644 --- a/sdks/python/apache_beam/options/pipeline_construction_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_construction_options_test.py @@ -165,12 +165,7 @@ def test_coder_sees_correct_options_during_run(self): """Test that coders see correct pipeline options during proto conversion. This tests the run path where as_deterministic_coder() is called during - to_runner_api() proto conversion. Each thread runs a pipeline with a - custom key type that uses a non-deterministic coder. During conversion, - as_deterministic_coder() should see the correct pipeline options. - - Uses PrismRunner for deterministic behavior and shared.Shared() to - capture results across the pickle boundary. + to_runner_api() proto conversion. 
""" from apache_beam.coders import coders from apache_beam.utils import shared @@ -207,60 +202,29 @@ def as_deterministic_coder(self, step_label, error_message=None): opts = get_current_pipeline_options() if opts is not None: results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) - thread_name = threading.current_thread().name job_name = opts.get_all_options().get('job_name') - results[thread_name] = job_name + results['Worker1'] = job_name return self beam.coders.registry.register_coder(TestKey, OptionsCapturingKeyCoder) results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) - start_barrier = threading.Barrier(2) - - def construct_pipeline(worker_id): - try: - job_name = f'gbk_job_{worker_id}' - options = PipelineOptions( - [f'--job_name={job_name}', '--runner=PrismRunner']) - - start_barrier.wait(timeout=5) - - with beam.Pipeline(options=options) as p: - _ = ( - p - | beam.Create([(TestKey(1), 'a'), (TestKey(2), 'b')]) - | beam.GroupByKey()) - - except ValueError as e: - # Ignore "signal only works in main thread" error from PrismRunner - if 'signal' not in str(e): - import traceback - errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") - except Exception as e: - import traceback - errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") - - thread1 = threading.Thread( - target=construct_pipeline, args=(1, ), name='Worker1') - thread2 = threading.Thread( - target=construct_pipeline, args=(2, ), name='Worker2') + job_name = 'gbk_job' + options = PipelineOptions([f'--job_name={job_name}']) - thread1.start() - thread2.start() - - thread1.join(timeout=30) - thread2.join(timeout=30) + with beam.Pipeline(options=options) as p: + _ = ( + p + | beam.Create([(TestKey(1), 'a'), (TestKey(2), 'b')]) + | beam.GroupByKey()) self.assertEqual(errors, [], f"Errors occurred: {errors}") self.assertEqual( results.get('Worker1'), - 'gbk_job_1', + job_name, f"Worker1 saw wrong options: {results}") - self.assertEqual( - results.get('Worker2'), - 'gbk_job_2', - f"Worker2 saw wrong options: {results}") + self.assertFalse(get_current_pipeline_options() == options) def test_barrier_inside_default_type_hints(self): """Test race condition detection with barrier inside default_type_hints. From 33f250d80874d4f842732f17506dee0115c5ddf7 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 9 Feb 2026 15:01:32 -0500 Subject: [PATCH 05/10] comments. --- .../options/pipeline_construction_options.py | 12 ++++---- .../pipeline_construction_options_test.py | 30 +++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options.py b/sdks/python/apache_beam/options/pipeline_construction_options.py index 6bc300de507a..57f9aeed0307 100644 --- a/sdks/python/apache_beam/options/pipeline_construction_options.py +++ b/sdks/python/apache_beam/options/pipeline_construction_options.py @@ -32,18 +32,18 @@ # The contextvar holding the current pipeline's options. # Each thread and each asyncio task gets its own isolated copy. -_current_pipeline_options: ContextVar[Optional['PipelineOptions']] = ContextVar( - 'current_pipeline_options', default=None) +_pipeline_options: ContextVar[Optional['PipelineOptions']] = ContextVar( + 'pipeline_options', default=None) -def get_current_pipeline_options() -> Optional['PipelineOptions']: +def get_pipeline_options() -> Optional['PipelineOptions']: """Get the current pipeline's options from the context. 
Returns: The PipelineOptions for the currently executing pipeline operation, or None if called outside of a pipeline context. """ - return _current_pipeline_options.get() + return _pipeline_options.get() @contextmanager @@ -53,8 +53,8 @@ def scoped_pipeline_options(options: Optional['PipelineOptions']): Args: options: The PipelineOptions to make available during this scope. """ - token = _current_pipeline_options.set(options) + token = _pipeline_options.set(options) try: yield finally: - _current_pipeline_options.reset(token) + _pipeline_options.reset(token) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options_test.py b/sdks/python/apache_beam/options/pipeline_construction_options_test.py index 6ef3e316afe2..0546c574e82c 100644 --- a/sdks/python/apache_beam/options/pipeline_construction_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_construction_options_test.py @@ -26,7 +26,7 @@ import unittest import apache_beam as beam -from apache_beam.options.pipeline_construction_options import get_current_pipeline_options +from apache_beam.options.pipeline_construction_options import get_pipeline_options from apache_beam.options.pipeline_construction_options import scoped_pipeline_options from apache_beam.options.pipeline_options import PipelineOptions @@ -38,14 +38,14 @@ def test_nested_scoping(self): inner_options = PipelineOptions(['--runner=DataflowRunner']) with scoped_pipeline_options(outer_options): - self.assertIs(get_current_pipeline_options(), outer_options) + self.assertIs(get_pipeline_options(), outer_options) with scoped_pipeline_options(inner_options): - self.assertIs(get_current_pipeline_options(), inner_options) + self.assertIs(get_pipeline_options(), inner_options) - self.assertIs(get_current_pipeline_options(), outer_options) + self.assertIs(get_pipeline_options(), outer_options) - self.assertIsNone(get_current_pipeline_options()) + self.assertIsNone(get_pipeline_options()) def test_exception_in_scope_restores_options(self): """Test that options are restored even when an exception is raised.""" @@ -55,12 +55,12 @@ def test_exception_in_scope_restores_options(self): with scoped_pipeline_options(outer_options): try: with scoped_pipeline_options(inner_options): - self.assertIs(get_current_pipeline_options(), inner_options) + self.assertIs(get_pipeline_options(), inner_options) raise ValueError("Test exception") except ValueError: pass - self.assertIs(get_current_pipeline_options(), outer_options) + self.assertIs(get_pipeline_options(), outer_options) def test_thread_isolation(self): """Test that different threads see their own isolated options.""" @@ -74,12 +74,12 @@ def thread_worker(thread_id, runner_name): with scoped_pipeline_options(options): barrier.wait(timeout=5) - current = get_current_pipeline_options() + current = get_pipeline_options() results[thread_id] = current.get_all_options()['runner'] import time time.sleep(0.01) - current_after = get_current_pipeline_options() + current_after = get_pipeline_options() if current_after is not current: errors.append( f"Thread {thread_id}: options changed during execution") @@ -107,10 +107,10 @@ async def async_worker( with scoped_pipeline_options(options): ready_event.set() await go_event.wait() - current = get_current_pipeline_options() + current = get_pipeline_options() results[task_id] = current.get_all_options()['runner'] await asyncio.sleep(0.01) - current_after = get_current_pipeline_options() + current_after = get_pipeline_options() assert current_after is current, \ f"Task {task_id}: 
options changed during execution" @@ -146,7 +146,7 @@ def __init__(self, expected_job_name): def expand(self, pcoll): # This runs during pipeline construction - self.captured_options = get_current_pipeline_options() + self.captured_options = get_pipeline_options() return pcoll | beam.Map(lambda x: x) options = PipelineOptions(['--job_name=test_job_123']) @@ -199,7 +199,7 @@ def is_deterministic(self): return False def as_deterministic_coder(self, step_label, error_message=None): - opts = get_current_pipeline_options() + opts = get_pipeline_options() if opts is not None: results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) job_name = opts.get_all_options().get('job_name') @@ -224,7 +224,7 @@ def as_deterministic_coder(self, step_label, error_message=None): results.get('Worker1'), job_name, f"Worker1 saw wrong options: {results}") - self.assertFalse(get_current_pipeline_options() == options) + self.assertFalse(get_pipeline_options() == options) def test_barrier_inside_default_type_hints(self): """Test race condition detection with barrier inside default_type_hints. @@ -254,7 +254,7 @@ def expand(self, pcoll): def default_type_hints(self): self.barrier.wait(timeout=5) - opts = get_current_pipeline_options() + opts = get_pipeline_options() if opts is not None: job_name = opts.get_all_options().get('job_name') if self.worker_id not in self.results_dict: From 27ebf1eb8a36ea067e95f9355ce623fcf11fe578 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 9 Feb 2026 15:02:17 -0500 Subject: [PATCH 06/10] comments. --- .../apache_beam/options/pipeline_construction_options_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options_test.py b/sdks/python/apache_beam/options/pipeline_construction_options_test.py index 0546c574e82c..214da4d51d29 100644 --- a/sdks/python/apache_beam/options/pipeline_construction_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_construction_options_test.py @@ -62,7 +62,7 @@ def test_exception_in_scope_restores_options(self): self.assertIs(get_pipeline_options(), outer_options) - def test_thread_isolation(self): + def test_different_threads_see_their_own_isolated_options(self): """Test that different threads see their own isolated options.""" results = {} errors = [] From 00769398dfb34271c1a2e9a2438d50a2a479cc48 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 9 Feb 2026 17:08:22 -0500 Subject: [PATCH 07/10] comments. --- .../options/pipeline_construction_options.py | 60 ---- .../pipeline_construction_options_test.py | 301 ------------------ sdks/python/apache_beam/pipeline.py | 2 +- 3 files changed, 1 insertion(+), 362 deletions(-) delete mode 100644 sdks/python/apache_beam/options/pipeline_construction_options.py delete mode 100644 sdks/python/apache_beam/options/pipeline_construction_options_test.py diff --git a/sdks/python/apache_beam/options/pipeline_construction_options.py b/sdks/python/apache_beam/options/pipeline_construction_options.py deleted file mode 100644 index 57f9aeed0307..000000000000 --- a/sdks/python/apache_beam/options/pipeline_construction_options.py +++ /dev/null @@ -1,60 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Context-scoped access to pipeline options during graph construction and -translation. - -This module provides thread-safe and async-safe access to PipelineOptions -using contextvars. -""" - -from contextlib import contextmanager -from contextvars import ContextVar -from typing import TYPE_CHECKING -from typing import Optional - -if TYPE_CHECKING: - from apache_beam.options.pipeline_options import PipelineOptions - -# The contextvar holding the current pipeline's options. -# Each thread and each asyncio task gets its own isolated copy. -_pipeline_options: ContextVar[Optional['PipelineOptions']] = ContextVar( - 'pipeline_options', default=None) - - -def get_pipeline_options() -> Optional['PipelineOptions']: - """Get the current pipeline's options from the context. - - Returns: - The PipelineOptions for the currently executing pipeline operation, - or None if called outside of a pipeline context. - """ - return _pipeline_options.get() - - -@contextmanager -def scoped_pipeline_options(options: Optional['PipelineOptions']): - """Context manager that sets pipeline options for the duration of a block. - - Args: - options: The PipelineOptions to make available during this scope. - """ - token = _pipeline_options.set(options) - try: - yield - finally: - _pipeline_options.reset(token) diff --git a/sdks/python/apache_beam/options/pipeline_construction_options_test.py b/sdks/python/apache_beam/options/pipeline_construction_options_test.py deleted file mode 100644 index 214da4d51d29..000000000000 --- a/sdks/python/apache_beam/options/pipeline_construction_options_test.py +++ /dev/null @@ -1,301 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Tests for pipeline_construction_options module. - -These tests verify that the contextvar-based approach properly isolates -pipeline options across threads and async tasks, preventing race conditions. 
-""" - -import asyncio -import threading -import unittest - -import apache_beam as beam -from apache_beam.options.pipeline_construction_options import get_pipeline_options -from apache_beam.options.pipeline_construction_options import scoped_pipeline_options -from apache_beam.options.pipeline_options import PipelineOptions - - -class PipelineConstructionOptionsTest(unittest.TestCase): - def test_nested_scoping(self): - """Test that nested scopes properly restore outer options.""" - outer_options = PipelineOptions(['--runner=DirectRunner']) - inner_options = PipelineOptions(['--runner=DataflowRunner']) - - with scoped_pipeline_options(outer_options): - self.assertIs(get_pipeline_options(), outer_options) - - with scoped_pipeline_options(inner_options): - self.assertIs(get_pipeline_options(), inner_options) - - self.assertIs(get_pipeline_options(), outer_options) - - self.assertIsNone(get_pipeline_options()) - - def test_exception_in_scope_restores_options(self): - """Test that options are restored even when an exception is raised.""" - outer_options = PipelineOptions(['--runner=DirectRunner']) - inner_options = PipelineOptions(['--runner=DataflowRunner']) - - with scoped_pipeline_options(outer_options): - try: - with scoped_pipeline_options(inner_options): - self.assertIs(get_pipeline_options(), inner_options) - raise ValueError("Test exception") - except ValueError: - pass - - self.assertIs(get_pipeline_options(), outer_options) - - def test_different_threads_see_their_own_isolated_options(self): - """Test that different threads see their own isolated options.""" - results = {} - errors = [] - barrier = threading.Barrier(2) - - def thread_worker(thread_id, runner_name): - try: - options = PipelineOptions([f'--runner={runner_name}']) - with scoped_pipeline_options(options): - barrier.wait(timeout=5) - - current = get_pipeline_options() - results[thread_id] = current.get_all_options()['runner'] - import time - time.sleep(0.01) - - current_after = get_pipeline_options() - if current_after is not current: - errors.append( - f"Thread {thread_id}: options changed during execution") - except Exception as e: - errors.append(f"Thread {thread_id}: {e}") - - thread1 = threading.Thread(target=thread_worker, args=(1, 'DirectRunner')) - thread2 = threading.Thread(target=thread_worker, args=(2, 'DataflowRunner')) - - thread1.start() - thread2.start() - - thread1.join(timeout=5) - thread2.join(timeout=5) - - self.assertEqual(errors, []) - self.assertEqual(results[1], 'DirectRunner') - self.assertEqual(results[2], 'DataflowRunner') - - def test_asyncio_task_isolation(self): - """Test that different asyncio tasks see their own isolated options.""" - async def async_worker( - task_id, runner_name, results, ready_event, go_event): - options = PipelineOptions([f'--runner={runner_name}']) - with scoped_pipeline_options(options): - ready_event.set() - await go_event.wait() - current = get_pipeline_options() - results[task_id] = current.get_all_options()['runner'] - await asyncio.sleep(0.01) - current_after = get_pipeline_options() - assert current_after is current, \ - f"Task {task_id}: options changed during execution" - - async def run_test(): - results = {} - ready_events = [asyncio.Event() for _ in range(2)] - go_event = asyncio.Event() - - task1 = asyncio.create_task( - async_worker(1, 'DirectRunner', results, ready_events[0], go_event)) - task2 = asyncio.create_task( - async_worker(2, 'DataflowRunner', results, ready_events[1], go_event)) - - # Wait for both tasks to be ready - await 
asyncio.gather(*[e.wait() for e in ready_events]) - # Signal all tasks to proceed - go_event.set() - - await asyncio.gather(task1, task2) - return results - - results = asyncio.run(run_test()) - self.assertEqual(results[1], 'DirectRunner') - self.assertEqual(results[2], 'DataflowRunner') - - def test_transform_sees_pipeline_options(self): - """Test that a transform can access pipeline options during expand().""" - class OptionsCapturingTransform(beam.PTransform): - """Transform that captures pipeline options during expand().""" - def __init__(self, expected_job_name): - self.expected_job_name = expected_job_name - self.captured_options = None - - def expand(self, pcoll): - # This runs during pipeline construction - self.captured_options = get_pipeline_options() - return pcoll | beam.Map(lambda x: x) - - options = PipelineOptions(['--job_name=test_job_123']) - transform = OptionsCapturingTransform('test_job_123') - - with beam.Pipeline(options=options) as p: - _ = p | beam.Create([1, 2, 3]) | transform - - # Verify the transform saw the correct options - self.assertIsNotNone(transform.captured_options) - self.assertEqual( - transform.captured_options.get_all_options()['job_name'], - 'test_job_123') - - def test_coder_sees_correct_options_during_run(self): - """Test that coders see correct pipeline options during proto conversion. - - This tests the run path where as_deterministic_coder() is called during - to_runner_api() proto conversion. - """ - from apache_beam.coders import coders - from apache_beam.utils import shared - - errors = [] - - class WeakRefDict(dict): - pass - - class TestKey: - def __init__(self, value): - self.value = value - - def __eq__(self, other): - return isinstance(other, TestKey) and self.value == other.value - - def __hash__(self): - return hash(self.value) - - class OptionsCapturingKeyCoder(coders.Coder): - """Coder that captures pipeline options in as_deterministic_coder.""" - shared_handle = shared.Shared() - - def encode(self, value): - return str(value.value).encode('utf-8') - - def decode(self, encoded): - return TestKey(encoded.decode('utf-8')) - - def is_deterministic(self): - return False - - def as_deterministic_coder(self, step_label, error_message=None): - opts = get_pipeline_options() - if opts is not None: - results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) - job_name = opts.get_all_options().get('job_name') - results['Worker1'] = job_name - return self - - beam.coders.registry.register_coder(TestKey, OptionsCapturingKeyCoder) - - results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) - - job_name = 'gbk_job' - options = PipelineOptions([f'--job_name={job_name}']) - - with beam.Pipeline(options=options) as p: - _ = ( - p - | beam.Create([(TestKey(1), 'a'), (TestKey(2), 'b')]) - | beam.GroupByKey()) - - self.assertEqual(errors, [], f"Errors occurred: {errors}") - self.assertEqual( - results.get('Worker1'), - job_name, - f"Worker1 saw wrong options: {results}") - self.assertFalse(get_pipeline_options() == options) - - def test_barrier_inside_default_type_hints(self): - """Test race condition detection with barrier inside default_type_hints. - - This test reliably detects race conditions because: - 1. Both threads start pipeline construction simultaneously - 2. Inside default_type_hints (which is called during Pipeline.apply()), - both threads hit a barrier and wait for each other - 3. At this point, BOTH threads are inside scoped_pipeline_options - 4. 
When they continue, they read options - with a global var, they'd see - the wrong values because the last thread to set options would win - """ - - results = {} - errors = [] - inner_barrier = threading.Barrier(2) - - class BarrierTransform(beam.PTransform): - """Transform that synchronizes threads INSIDE default_type_hints.""" - def __init__(self, worker_id, results_dict, barrier): - self.worker_id = worker_id - self.results_dict = results_dict - self.barrier = barrier - - def expand(self, pcoll): - return pcoll | beam.Map(lambda x: x) - - def default_type_hints(self): - self.barrier.wait(timeout=5) - opts = get_pipeline_options() - if opts is not None: - job_name = opts.get_all_options().get('job_name') - if self.worker_id not in self.results_dict: - self.results_dict[self.worker_id] = job_name - - return super().default_type_hints() - - def construct_pipeline(worker_id): - try: - job_name = f'barrier_job_{worker_id}' - options = PipelineOptions([f'--job_name={job_name}']) - transform = BarrierTransform(worker_id, results, inner_barrier) - - with beam.Pipeline(options=options) as p: - _ = p | beam.Create([1, 2, 3]) | transform - - except Exception as e: - import traceback - errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") - - thread1 = threading.Thread( - target=construct_pipeline, args=(1, )) - thread2 = threading.Thread( - target=construct_pipeline, args=(2, )) - - thread1.start() - thread2.start() - - thread1.join(timeout=10) - thread2.join(timeout=10) - - self.assertEqual(errors, [], f"Errors occurred: {errors}") - self.assertEqual( - results.get(1), - 'barrier_job_1', - f"Worker 1 saw wrong options: {results}") - self.assertEqual( - results.get(2), - 'barrier_job_2', - f"Worker 2 saw wrong options: {results}") - - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index b6cfd24dc7b7..3e7a4eb6881f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -74,7 +74,6 @@ from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems -from apache_beam.options.pipeline_construction_options import scoped_pipeline_options from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -82,6 +81,7 @@ from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import StreamingOptions from apache_beam.options.pipeline_options import TypeOptions +from apache_beam.options.pipeline_options_context import scoped_pipeline_options from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 From 3239ed41b26ca7c68a1317549f340a0be2d06dae Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 9 Feb 2026 17:09:37 -0500 Subject: [PATCH 08/10] comments. 
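
Re-add the module deleted in the previous commit under its new name,
pipeline_options_context, matching the import path that pipeline.py now
uses. The API is unchanged: get_pipeline_options() and
scoped_pipeline_options(). The module docstring now states that the
module is for internal use only, with no backwards-compatibility
guarantees. A minimal usage sketch, using only names defined in this
patch:

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options_context import get_pipeline_options
    from apache_beam.options.pipeline_options_context import scoped_pipeline_options

    options = PipelineOptions(['--job_name=my_job'])
    with scoped_pipeline_options(options):
      # Within the scope, the getter returns exactly these options.
      assert get_pipeline_options() is options
    # Outside the scope it returns None (assuming no enclosing scope).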
--- .../options/pipeline_options_context.py | 65 ++++ .../options/pipeline_options_context_test.py | 301 ++++++++++++++++++ 2 files changed, 366 insertions(+) create mode 100644 sdks/python/apache_beam/options/pipeline_options_context.py create mode 100644 sdks/python/apache_beam/options/pipeline_options_context_test.py diff --git a/sdks/python/apache_beam/options/pipeline_options_context.py b/sdks/python/apache_beam/options/pipeline_options_context.py new file mode 100644 index 000000000000..5f2475e334af --- /dev/null +++ b/sdks/python/apache_beam/options/pipeline_options_context.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Context-scoped access to pipeline options during graph construction and +translation. + +This module provides thread-safe and async-safe access to globally-available +instances of PipelineOptions using contextvars, scoped by the current pipeline. +It allows components like transforms and coders to access the pipeline's +configuration without requiring explicit parameter passing through every +level of the call stack. + +For internal use only; no backwards-compatibility guarantees. +""" + +from contextlib import contextmanager +from contextvars import ContextVar +from typing import TYPE_CHECKING +from typing import Optional + +if TYPE_CHECKING: + from apache_beam.options.pipeline_options import PipelineOptions + +# The contextvar holding the current pipeline's options. +# Each thread and each asyncio task gets its own isolated copy. +_pipeline_options: ContextVar[Optional['PipelineOptions']] = ContextVar( + 'pipeline_options', default=None) + + +def get_pipeline_options() -> Optional['PipelineOptions']: + """Get the current pipeline's options from the context. + + Returns: + The PipelineOptions for the currently executing pipeline operation, + or None if called outside of a pipeline context. + """ + return _pipeline_options.get() + + +@contextmanager +def scoped_pipeline_options(options: Optional['PipelineOptions']): + """Context manager that sets pipeline options for the duration of a block. + + Args: + options: The PipelineOptions to make available during this scope. + """ + token = _pipeline_options.set(options) + try: + yield + finally: + _pipeline_options.reset(token) diff --git a/sdks/python/apache_beam/options/pipeline_options_context_test.py b/sdks/python/apache_beam/options/pipeline_options_context_test.py new file mode 100644 index 000000000000..b157ae9ed49f --- /dev/null +++ b/sdks/python/apache_beam/options/pipeline_options_context_test.py @@ -0,0 +1,301 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for pipeline_options_context module. + +These tests verify that the contextvar-based approach properly isolates +pipeline options across threads and async tasks, preventing race conditions. +""" + +import asyncio +import threading +import unittest + +import apache_beam as beam +from apache_beam.options.pipeline_options_context import get_pipeline_options +from apache_beam.options.pipeline_options_context import scoped_pipeline_options +from apache_beam.options.pipeline_options import PipelineOptions + + +class PipelineConstructionOptionsTest(unittest.TestCase): + def test_nested_scoping(self): + """Test that nested scopes properly restore outer options.""" + outer_options = PipelineOptions(['--runner=DirectRunner']) + inner_options = PipelineOptions(['--runner=DataflowRunner']) + + with scoped_pipeline_options(outer_options): + self.assertIs(get_pipeline_options(), outer_options) + + with scoped_pipeline_options(inner_options): + self.assertIs(get_pipeline_options(), inner_options) + + self.assertIs(get_pipeline_options(), outer_options) + + self.assertIsNone(get_pipeline_options()) + + def test_exception_in_scope_restores_options(self): + """Test that options are restored even when an exception is raised.""" + outer_options = PipelineOptions(['--runner=DirectRunner']) + inner_options = PipelineOptions(['--runner=DataflowRunner']) + + with scoped_pipeline_options(outer_options): + try: + with scoped_pipeline_options(inner_options): + self.assertIs(get_pipeline_options(), inner_options) + raise ValueError("Test exception") + except ValueError: + pass + + self.assertIs(get_pipeline_options(), outer_options) + + def test_different_threads_see_their_own_isolated_options(self): + """Test that different threads see their own isolated options.""" + results = {} + errors = [] + barrier = threading.Barrier(2) + + def thread_worker(thread_id, runner_name): + try: + options = PipelineOptions([f'--runner={runner_name}']) + with scoped_pipeline_options(options): + barrier.wait(timeout=5) + + current = get_pipeline_options() + results[thread_id] = current.get_all_options()['runner'] + import time + time.sleep(0.01) + + current_after = get_pipeline_options() + if current_after is not current: + errors.append( + f"Thread {thread_id}: options changed during execution") + except Exception as e: + errors.append(f"Thread {thread_id}: {e}") + + thread1 = threading.Thread(target=thread_worker, args=(1, 'DirectRunner')) + thread2 = threading.Thread(target=thread_worker, args=(2, 'DataflowRunner')) + + thread1.start() + thread2.start() + + thread1.join(timeout=5) + thread2.join(timeout=5) + + self.assertEqual(errors, []) + self.assertEqual(results[1], 'DirectRunner') + self.assertEqual(results[2], 'DataflowRunner') + + def test_asyncio_task_isolation(self): + """Test that different asyncio tasks see their own isolated options.""" + async def async_worker( + task_id, runner_name, results, ready_event, go_event): + options = 
PipelineOptions([f'--runner={runner_name}']) + with scoped_pipeline_options(options): + ready_event.set() + await go_event.wait() + current = get_pipeline_options() + results[task_id] = current.get_all_options()['runner'] + await asyncio.sleep(0.01) + current_after = get_pipeline_options() + assert current_after is current, \ + f"Task {task_id}: options changed during execution" + + async def run_test(): + results = {} + ready_events = [asyncio.Event() for _ in range(2)] + go_event = asyncio.Event() + + task1 = asyncio.create_task( + async_worker(1, 'DirectRunner', results, ready_events[0], go_event)) + task2 = asyncio.create_task( + async_worker(2, 'DataflowRunner', results, ready_events[1], go_event)) + + # Wait for both tasks to be ready + await asyncio.gather(*[e.wait() for e in ready_events]) + # Signal all tasks to proceed + go_event.set() + + await asyncio.gather(task1, task2) + return results + + results = asyncio.run(run_test()) + self.assertEqual(results[1], 'DirectRunner') + self.assertEqual(results[2], 'DataflowRunner') + + def test_transform_sees_pipeline_options(self): + """Test that a transform can access pipeline options during expand().""" + class OptionsCapturingTransform(beam.PTransform): + """Transform that captures pipeline options during expand().""" + def __init__(self, expected_job_name): + self.expected_job_name = expected_job_name + self.captured_options = None + + def expand(self, pcoll): + # This runs during pipeline construction + self.captured_options = get_pipeline_options() + return pcoll | beam.Map(lambda x: x) + + options = PipelineOptions(['--job_name=test_job_123']) + transform = OptionsCapturingTransform('test_job_123') + + with beam.Pipeline(options=options) as p: + _ = p | beam.Create([1, 2, 3]) | transform + + # Verify the transform saw the correct options + self.assertIsNotNone(transform.captured_options) + self.assertEqual( + transform.captured_options.get_all_options()['job_name'], + 'test_job_123') + + def test_coder_sees_correct_options_during_run(self): + """Test that coders see correct pipeline options during proto conversion. + + This tests the run path where as_deterministic_coder() is called during + to_runner_api() proto conversion. 
+ """ + from apache_beam.coders import coders + from apache_beam.utils import shared + + errors = [] + + class WeakRefDict(dict): + pass + + class TestKey: + def __init__(self, value): + self.value = value + + def __eq__(self, other): + return isinstance(other, TestKey) and self.value == other.value + + def __hash__(self): + return hash(self.value) + + class OptionsCapturingKeyCoder(coders.Coder): + """Coder that captures pipeline options in as_deterministic_coder.""" + shared_handle = shared.Shared() + + def encode(self, value): + return str(value.value).encode('utf-8') + + def decode(self, encoded): + return TestKey(encoded.decode('utf-8')) + + def is_deterministic(self): + return False + + def as_deterministic_coder(self, step_label, error_message=None): + opts = get_pipeline_options() + if opts is not None: + results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) + job_name = opts.get_all_options().get('job_name') + results['Worker1'] = job_name + return self + + beam.coders.registry.register_coder(TestKey, OptionsCapturingKeyCoder) + + results = OptionsCapturingKeyCoder.shared_handle.acquire(WeakRefDict) + + job_name = 'gbk_job' + options = PipelineOptions([f'--job_name={job_name}']) + + with beam.Pipeline(options=options) as p: + _ = ( + p + | beam.Create([(TestKey(1), 'a'), (TestKey(2), 'b')]) + | beam.GroupByKey()) + + self.assertEqual(errors, [], f"Errors occurred: {errors}") + self.assertEqual( + results.get('Worker1'), + job_name, + f"Worker1 saw wrong options: {results}") + self.assertFalse(get_pipeline_options() == options) + + def test_barrier_inside_default_type_hints(self): + """Test race condition detection with barrier inside default_type_hints. + + This test reliably detects race conditions because: + 1. Both threads start pipeline construction simultaneously + 2. Inside default_type_hints (which is called during Pipeline.apply()), + both threads hit a barrier and wait for each other + 3. At this point, BOTH threads are inside scoped_pipeline_options + 4. 
When they continue, they read options - with a global var, they'd see + the wrong values because the last thread to set options would win + """ + + results = {} + errors = [] + inner_barrier = threading.Barrier(2) + + class BarrierTransform(beam.PTransform): + """Transform that synchronizes threads INSIDE default_type_hints.""" + def __init__(self, worker_id, results_dict, barrier): + self.worker_id = worker_id + self.results_dict = results_dict + self.barrier = barrier + + def expand(self, pcoll): + return pcoll | beam.Map(lambda x: x) + + def default_type_hints(self): + self.barrier.wait(timeout=5) + opts = get_pipeline_options() + if opts is not None: + job_name = opts.get_all_options().get('job_name') + if self.worker_id not in self.results_dict: + self.results_dict[self.worker_id] = job_name + + return super().default_type_hints() + + def construct_pipeline(worker_id): + try: + job_name = f'barrier_job_{worker_id}' + options = PipelineOptions([f'--job_name={job_name}']) + transform = BarrierTransform(worker_id, results, inner_barrier) + + with beam.Pipeline(options=options) as p: + _ = p | beam.Create([1, 2, 3]) | transform + + except Exception as e: + import traceback + errors.append(f"Worker {worker_id}: {e}\n{traceback.format_exc()}") + + thread1 = threading.Thread( + target=construct_pipeline, args=(1, )) + thread2 = threading.Thread( + target=construct_pipeline, args=(2, )) + + thread1.start() + thread2.start() + + thread1.join(timeout=10) + thread2.join(timeout=10) + + self.assertEqual(errors, [], f"Errors occurred: {errors}") + self.assertEqual( + results.get(1), + 'barrier_job_1', + f"Worker 1 saw wrong options: {results}") + self.assertEqual( + results.get(2), + 'barrier_job_2', + f"Worker 2 saw wrong options: {results}") + + +if __name__ == '__main__': + unittest.main() From 9aa7093de320a38d71786745877906c97b90330d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 9 Feb 2026 18:04:01 -0500 Subject: [PATCH 09/10] lint. 
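
Reorder the imports in pipeline_options_context_test.py so that
pipeline_options sorts before pipeline_options_context, keeping the
apache_beam.options import block alphabetized to satisfy the SDK's
import lint check.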
--- .../python/apache_beam/options/pipeline_options_context_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_options_context_test.py b/sdks/python/apache_beam/options/pipeline_options_context_test.py index b157ae9ed49f..6979156a5090 100644 --- a/sdks/python/apache_beam/options/pipeline_options_context_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_context_test.py @@ -26,9 +26,9 @@ import unittest import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options_context import get_pipeline_options from apache_beam.options.pipeline_options_context import scoped_pipeline_options -from apache_beam.options.pipeline_options import PipelineOptions class PipelineConstructionOptionsTest(unittest.TestCase): From d964c50cba99a18bc35c995b2b1a0b0b29e9d602 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 10 Feb 2026 10:36:58 -0500 Subject: [PATCH 10/10] use apply in recursive calls --- .../options/pipeline_options_context_test.py | 34 +++++++++++++++++++ sdks/python/apache_beam/pipeline.py | 6 ++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options_context_test.py b/sdks/python/apache_beam/options/pipeline_options_context_test.py index 6979156a5090..da9c8fce4aab 100644 --- a/sdks/python/apache_beam/options/pipeline_options_context_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_context_test.py @@ -297,5 +297,39 @@ def construct_pipeline(worker_id): f"Worker 2 saw wrong options: {results}") +class PipelineSubclassApplyTest(unittest.TestCase): + def test_subclass_apply_called_on_recursive_paths(self): + """Test that Pipeline subclass overrides of apply() are respected. + + _apply_internal's recursive calls must go through self.apply(), not + self._apply_internal(), so that subclass interceptions are not skipped. + """ + apply_calls = [] + + class TrackingPipeline(beam.Pipeline): + def apply(self, transform, pvalueish=None, label=None): + apply_calls.append(label or transform.label) + return super().apply(transform, pvalueish, label) + + options = PipelineOptions(['--job_name=subclass_test']) + with TrackingPipeline(options=options) as p: + # "my_label" >> transform creates a _NamedPTransform, which triggers + # two recursive apply() calls: one to unwrap _NamedPTransform, and + # one to handle the label argument. + _ = p | beam.Create([1, 2, 3]) | "my_label" >> beam.Map(lambda x: x) + + # beam.Create goes through apply() once (no recursion). + # "my_label" >> Map triggers: apply(_NamedPTransform) -> apply(Map, + # label="my_label") -> apply(Map). That's 3 calls through apply(). + # Total: 1 (Create) + 3 (Map) = 4 calls minimum. + map_calls = [c for c in apply_calls if c == 'my_label' or c == 'Map'] + self.assertGreaterEqual( + len(map_calls), + 3, + f"Expected at least 3 apply() calls for the Map transform " + f"(NamedPTransform unwrap + label handling + final), " + f"got {len(map_calls)}. 
All calls: {apply_calls}") + + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 3e7a4eb6881f..a6080f2f3e7f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -715,7 +715,7 @@ def _apply_internal( label: Optional[str] = None) -> pvalue.PValue: """Internal implementation of apply(), called within scoped options.""" if isinstance(transform, ptransform._NamedPTransform): - return self._apply_internal( + return self.apply( transform.transform, pvalueish, label or transform.label) if not label and isinstance(transform, ptransform._PTransformFnPTransform): @@ -730,7 +730,7 @@ def _apply_internal( # (e.g. to produce error messages for type hint violations). old_label, transform.label = transform.label, label try: - return self._apply_internal(transform, pvalueish) + return self.apply(transform, pvalueish) finally: transform.label = old_label @@ -753,7 +753,7 @@ def _apply_internal( 'updating a pipeline or reloading the job state. ' 'This is not recommended for streaming jobs.') unique_label = self._generate_unique_label(transform) - return self._apply_internal(transform, pvalueish, unique_label) + return self.apply(transform, pvalueish, unique_label) else: raise RuntimeError( 'A transform with label "%s" already exists in the pipeline. '