From 62643db88635ffb20c7e98d97d25816e74a4795a Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Wed, 5 Nov 2025 20:39:47 +0000 Subject: [PATCH 01/20] [BEAM-36736] Add state sampling for timer processing --- .../apache_beam/runners/worker/operations.py | 31 +++++++++++----- .../runners/worker/statesampler.py | 5 ++- .../runners/worker/statesampler_test.py | 37 +++++++++++++++++++ 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 2b20bebe0940..e9952e5fab00 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -809,7 +809,13 @@ def __init__( self.tagged_receivers = None # type: Optional[_TaggedReceivers] # A mapping of timer tags to the input "PCollections" they come in on. self.input_info = None # type: Optional[OpInputInfo] - + self.scoped_timer_processing_state = None + if self.state_sampler: + self.scoped_timer_processing_state = self.state_sampler.scoped_state( + self.name_context, + 'process-timers', + metrics_container=self.metrics_container, + suffix="-millis") # See fn_data in dataflow_runner.py # TODO: Store all the items from spec? self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn)) @@ -971,14 +977,21 @@ def add_timer_info(self, timer_family_id, timer_info): self.user_state_context.add_timer_info(timer_family_id, timer_info) def process_timer(self, tag, timer_data): - timer_spec = self.timer_specs[tag] - self.dofn_runner.process_user_timer( - timer_spec, - timer_data.user_key, - timer_data.windows[0], - timer_data.fire_timestamp, - timer_data.paneinfo, - timer_data.dynamic_timer_tag) + def process_timer_logic(): + timer_spec = self.timer_specs[tag] + self.dofn_runner.process_user_timer( + timer_spec, + timer_data.user_key, + timer_data.windows[0], + timer_data.fire_timestamp, + timer_data.paneinfo, + timer_data.dynamic_timer_tag) + + if self.scoped_timer_processing_state: + with self.scoped_timer_processing_state: + process_timer_logic() + else: + process_timer_logic() def finish(self): # type: () -> None diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index b9c75f4de93d..53520805c091 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -134,7 +134,8 @@ def scoped_state( name_context: Union[str, 'common.NameContext'], state_name: str, io_target=None, - metrics_container: Optional['MetricsContainer'] = None + metrics_container: Optional['MetricsContainer'] = None, + suffix: str = '-msecs' ) -> statesampler_impl.ScopedState: """Returns a ScopedState object associated to a Step and a State. @@ -152,7 +153,7 @@ def scoped_state( name_context = common.NameContext(name_context) counter_name = CounterName( - state_name + '-msecs', + state_name + suffix, stage_name=self._prefix, step_name=name_context.metrics_name(), io_target=io_target) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index c9ea7e8eef97..f78db0e368c4 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -127,6 +127,43 @@ def test_sampler_transition_overhead(self): # debug mode). self.assertLess(overhead_us, 20.0) + @retry(reraise=True, stop=stop_after_attempt(3)) + def test_timer_sampler(self): + # Set up state sampler. + counter_factory = CounterFactory() + sampler = statesampler.StateSampler( + 'timer', counter_factory, sampling_period_ms=1) + + # Duration of the timer processing. + state_duration_ms = 100 + margin_of_error = 0.25 + + sampler.start() + with sampler.scoped_state( + 'step1', 'process-timers', suffix='-millis'): + time.sleep(state_duration_ms / 1000) + sampler.stop() + sampler.commit_counters() + + if not statesampler.FAST_SAMPLER: + # The slow sampler does not implement sampling, so we won't test it. + return + + # Test that sampled state timings are close to their expected values. + expected_counter_values = { + CounterName('process-timers-millis', step_name='step1', stage_name='timer'): + state_duration_ms, + } + for counter in counter_factory.get_counters(): + self.assertIn(counter.name, expected_counter_values) + expected_value = expected_counter_values[counter.name] + actual_value = counter.value() + deviation = float(abs(actual_value - expected_value)) / expected_value + _LOGGER.info('Sampling deviation from expectation: %f', deviation) + self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error)) + self.assertLess(actual_value, expected_value * (1.0 + margin_of_error)) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From c5cb7847eb53b4fd60418dce584dc98a17a415d0 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Wed, 5 Nov 2025 23:25:26 +0000 Subject: [PATCH 02/20] Force CI to rebuild --- sdks/python/apache_beam/runners/worker/operations.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index e9952e5fab00..2b8e4bba7dac 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -808,6 +808,7 @@ def __init__( self.user_state_context = user_state_context self.tagged_receivers = None # type: Optional[_TaggedReceivers] # A mapping of timer tags to the input "PCollections" they come in on. + # Force clean rebuild self.input_info = None # type: Optional[OpInputInfo] self.scoped_timer_processing_state = None if self.state_sampler: From 1ce3c64f87680900654da7daf0cf013c4b1db681 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Thu, 6 Nov 2025 00:47:15 +0000 Subject: [PATCH 03/20] Fix error with no state found --- .../apache_beam/runners/worker/operations.pxd | 1 + .../apache_beam/runners/worker/operations.py | 16 ++++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd index f24b75a720e0..c301f5bca695 100644 --- a/sdks/python/apache_beam/runners/worker/operations.pxd +++ b/sdks/python/apache_beam/runners/worker/operations.pxd @@ -83,6 +83,7 @@ cdef class Operation(object): cdef readonly object scoped_start_state cdef readonly object scoped_process_state cdef readonly object scoped_finish_state + cdef readonly object scoped_timer_processing_state cdef readonly object data_sampler diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 2b8e4bba7dac..863ef8ed9fc8 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -444,12 +444,16 @@ def __init__( self.metrics_container = MetricsContainer(self.name_context.metrics_name()) self.state_sampler = state_sampler - self.scoped_start_state = self.state_sampler.scoped_state( - self.name_context, 'start', metrics_container=self.metrics_container) - self.scoped_process_state = self.state_sampler.scoped_state( - self.name_context, 'process', metrics_container=self.metrics_container) - self.scoped_finish_state = self.state_sampler.scoped_state( - self.name_context, 'finish', metrics_container=self.metrics_container) + self.scoped_start_state = None + self.scoped_process_state = None + self.scoped_finish_state = None + if self.state_sampler: + self.scoped_start_state = self.state_sampler.scoped_state( + self.name_context, 'start', metrics_container=self.metrics_container) + self.scoped_process_state = self.state_sampler.scoped_state( + self.name_context, 'process', metrics_container=self.metrics_container) + self.scoped_finish_state = self.state_sampler.scoped_state( + self.name_context, 'finish', metrics_container=self.metrics_container) # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. self.receivers = [] # type: List[ConsumerSet] From c80fa858bb0e00e4f69df6a3b51e30336c855b85 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Thu, 6 Nov 2025 02:41:11 +0000 Subject: [PATCH 04/20] Fix error for Regex test --- .../apache_beam/runners/worker/operations.pxd | 2 +- .../apache_beam/runners/worker/operations.py | 13 ++++++++++--- .../apache_beam/runners/worker/statesampler.py | 17 +++++++++++++++-- .../runners/worker/statesampler_test.py | 8 +++----- 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd index c301f5bca695..3179385d740f 100644 --- a/sdks/python/apache_beam/runners/worker/operations.pxd +++ b/sdks/python/apache_beam/runners/worker/operations.pxd @@ -83,7 +83,6 @@ cdef class Operation(object): cdef readonly object scoped_start_state cdef readonly object scoped_process_state cdef readonly object scoped_finish_state - cdef readonly object scoped_timer_processing_state cdef readonly object data_sampler @@ -118,6 +117,7 @@ cdef class DoOperation(Operation): cdef dict timer_specs cdef public object input_info cdef object fn + cdef readonly object scoped_timer_processing_state cdef class SdfProcessSizedElements(DoOperation): diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 863ef8ed9fc8..55d95b73fc8c 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -49,6 +49,7 @@ from apache_beam.runners.worker import opcounters from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sideinputs +from apache_beam.runners.worker import statesampler from apache_beam.runners.worker.data_sampler import DataSampler from apache_beam.transforms import sideinputs as apache_sideinputs from apache_beam.transforms import combiners @@ -451,9 +452,15 @@ def __init__( self.scoped_start_state = self.state_sampler.scoped_state( self.name_context, 'start', metrics_container=self.metrics_container) self.scoped_process_state = self.state_sampler.scoped_state( - self.name_context, 'process', metrics_container=self.metrics_container) + self.name_context, + 'process', + metrics_container=self.metrics_container) self.scoped_finish_state = self.state_sampler.scoped_state( self.name_context, 'finish', metrics_container=self.metrics_container) + else: + self.scoped_start_state = statesampler.NOOP_SCOPED_STATE + self.scoped_process_state = statesampler.NOOP_SCOPED_STATE + self.scoped_finish_state = statesampler.NOOP_SCOPED_STATE # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. self.receivers = [] # type: List[ConsumerSet] @@ -812,9 +819,9 @@ def __init__( self.user_state_context = user_state_context self.tagged_receivers = None # type: Optional[_TaggedReceivers] # A mapping of timer tags to the input "PCollections" they come in on. - # Force clean rebuild + # Force clean rebuild self.input_info = None # type: Optional[OpInputInfo] - self.scoped_timer_processing_state = None + self.scoped_timer_processing_state = statesampler.NOOP_SCOPED_STATE if self.state_sampler: self.scoped_timer_processing_state = self.state_sampler.scoped_state( self.name_context, diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index 53520805c091..53c3d8055101 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -135,8 +135,7 @@ def scoped_state( state_name: str, io_target=None, metrics_container: Optional['MetricsContainer'] = None, - suffix: str = '-msecs' - ) -> statesampler_impl.ScopedState: + suffix: str = '-msecs') -> statesampler_impl.ScopedState: """Returns a ScopedState object associated to a Step and a State. Args: @@ -171,3 +170,17 @@ def commit_counters(self) -> None: for state in self._states_by_name.values(): state_msecs = int(1e-6 * state.nsecs) state.counter.update(state_msecs - state.counter.value()) + + +class NoOpScopedState: + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def sampled_msecs_int(self): + return 0 + + +NOOP_SCOPED_STATE = NoOpScopedState() diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index f78db0e368c4..fcdcf9d2ae36 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -139,8 +139,7 @@ def test_timer_sampler(self): margin_of_error = 0.25 sampler.start() - with sampler.scoped_state( - 'step1', 'process-timers', suffix='-millis'): + with sampler.scoped_state('step1', 'process-timers', suffix='-millis'): time.sleep(state_duration_ms / 1000) sampler.stop() sampler.commit_counters() @@ -151,8 +150,8 @@ def test_timer_sampler(self): # Test that sampled state timings are close to their expected values. expected_counter_values = { - CounterName('process-timers-millis', step_name='step1', stage_name='timer'): - state_duration_ms, + CounterName( + 'process-timers-millis', step_name='step1', stage_name='timer'): state_duration_ms, } for counter in counter_factory.get_counters(): self.assertIn(counter.name, expected_counter_values) @@ -164,7 +163,6 @@ def test_timer_sampler(self): self.assertLess(actual_value, expected_value * (1.0 + margin_of_error)) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 188a767ade7485f6b35d598236591ce72570a611 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Thu, 6 Nov 2025 05:40:42 +0000 Subject: [PATCH 05/20] Resolve linting error --- sdks/python/apache_beam/runners/worker/operations.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 55d95b73fc8c..249a77d46225 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -445,9 +445,6 @@ def __init__( self.metrics_container = MetricsContainer(self.name_context.metrics_name()) self.state_sampler = state_sampler - self.scoped_start_state = None - self.scoped_process_state = None - self.scoped_finish_state = None if self.state_sampler: self.scoped_start_state = self.state_sampler.scoped_state( self.name_context, 'start', metrics_container=self.metrics_container) From 9b5a91c989fe15d99d1cedc04fd6c2ffa1ac33ee Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 7 Nov 2025 03:44:03 +0000 Subject: [PATCH 06/20] Add test case to test full functionality --- .../apache_beam/runners/worker/operations.py | 3 +- .../runners/worker/statesampler_test.py | 55 ++++++++++++++++++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 249a77d46225..7f5b18497b14 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -823,8 +823,7 @@ def __init__( self.scoped_timer_processing_state = self.state_sampler.scoped_state( self.name_context, 'process-timers', - metrics_container=self.metrics_container, - suffix="-millis") + metrics_container=self.metrics_container) # See fn_data in dataflow_runner.py # TODO: Store all the items from spec? self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn)) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index fcdcf9d2ae36..2e76444b35b6 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -151,7 +151,7 @@ def test_timer_sampler(self): # Test that sampled state timings are close to their expected values. expected_counter_values = { CounterName( - 'process-timers-millis', step_name='step1', stage_name='timer'): state_duration_ms, + 'process-timers', step_name='step1', stage_name='timer'): state_duration_ms, } for counter in counter_factory.get_counters(): self.assertIn(counter.name, expected_counter_values) @@ -162,6 +162,59 @@ def test_timer_sampler(self): self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error)) self.assertLess(actual_value, expected_value * (1.0 + margin_of_error)) + @retry(reraise=True, stop=stop_after_attempt(3)) + def test_process_timers_metric_is_recorded(self): + """ + Tests that the 'process-timers-msecs' metric is correctly recorded + when a state sampler is active. + """ + # Set up a real state sampler and counter factory. + counter_factory = CounterFactory() + sampler = statesampler.StateSampler( + 'test_stage', counter_factory, sampling_period_ms=1) + + state_duration_ms = 100 + margin_of_error = 0.25 + + # Run a workload inside the 'process-timers' scoped state. + sampler.start() + with sampler.scoped_state('test_step', 'process-timers'): + time.sleep(state_duration_ms / 1000.0) + sampler.stop() + sampler.commit_counters() + + if not statesampler.FAST_SAMPLER: + return + + # Verify that the counter was created with the correct name and value. + expected_counter_name = CounterName( + 'process-timers-msecs', + step_name='test_step', + stage_name='test_stage') + + # Find the specific counter we are looking for. + found_counter = None + for counter in counter_factory.get_counters(): + if counter.name == expected_counter_name: + found_counter = counter + break + + self.assertIsNotNone( + found_counter, + f"The expected counter '{expected_counter_name}' was not created.") + + # Check that its value is approximately correct. + actual_value = found_counter.value() + expected_value = state_duration_ms + self.assertGreater( + actual_value, + expected_value * (1.0 - margin_of_error), + "The timer metric was lower than expected.") + self.assertLess( + actual_value, + expected_value * (1.0 + margin_of_error), + "The timer metric was higher than expected.") + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From f4b44dbcd86d6f34c849a1bdfa8af6ac1061fdc8 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 7 Nov 2025 04:32:24 +0000 Subject: [PATCH 07/20] Fix suffix issue --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 2e76444b35b6..fcb3a7b8b41f 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -139,7 +139,7 @@ def test_timer_sampler(self): margin_of_error = 0.25 sampler.start() - with sampler.scoped_state('step1', 'process-timers', suffix='-millis'): + with sampler.scoped_state('step1', 'process-timers'): time.sleep(state_duration_ms / 1000) sampler.stop() sampler.commit_counters() @@ -151,7 +151,7 @@ def test_timer_sampler(self): # Test that sampled state timings are close to their expected values. expected_counter_values = { CounterName( - 'process-timers', step_name='step1', stage_name='timer'): state_duration_ms, + 'process-timers-msecs', step_name='step1', stage_name='timer'): state_duration_ms, } for counter in counter_factory.get_counters(): self.assertIn(counter.name, expected_counter_values) From ed86d99b981661293d512a629da856701956b1a9 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 7 Nov 2025 06:32:50 +0000 Subject: [PATCH 08/20] Fix formatting issues using tox -e yapf-check --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index fcb3a7b8b41f..6b0af6a8237d 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -188,9 +188,7 @@ def test_process_timers_metric_is_recorded(self): # Verify that the counter was created with the correct name and value. expected_counter_name = CounterName( - 'process-timers-msecs', - step_name='test_step', - stage_name='test_stage') + 'process-timers-msecs', step_name='test_step', stage_name='test_stage') # Find the specific counter we are looking for. found_counter = None From a14646388b2c54c62bca786d9b28866d6dde98bb Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 7 Nov 2025 17:11:52 +0000 Subject: [PATCH 09/20] Add test cases to test code paths --- .../runners/worker/statesampler_test.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 6b0af6a8237d..dffa811d1edc 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -21,6 +21,7 @@ import logging import time import unittest +from unittest.mock import Mock from tenacity import retry from tenacity import stop_after_attempt @@ -28,6 +29,10 @@ from apache_beam.runners.worker import statesampler from apache_beam.utils.counters import CounterFactory from apache_beam.utils.counters import CounterName +from apache_beam.runners.worker import operation_specs +from apache_beam.runners.worker import operations +from apache_beam.internal import pickler +from apache_beam.transforms import core _LOGGER = logging.getLogger(__name__) @@ -213,6 +218,59 @@ def test_process_timers_metric_is_recorded(self): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") + def test_do_operation_with_sampler(self): + """ + Tests that a DoOperation with an active state_sampler correctly + creates a real ScopedState object for timer processing. + """ + mock_spec = operation_specs.WorkerDoFn( + serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), + output_tags=[], + input=None, + side_inputs=[], + output_coders=[]) + + sampler = statesampler.StateSampler( + 'test_stage', CounterFactory(), sampling_period_ms=1) + + # 1. Create the operation WITHOUT the unexpected keyword argument + op = operations.create_operation( + name_context='test_op', + spec=mock_spec, + counter_factory=CounterFactory(), + state_sampler=sampler) + + # 2. Set the user_state_context attribute AFTER creation + op.user_state_context = Mock() + + self.assertIsNot( + op.scoped_timer_processing_state, statesampler.NOOP_SCOPED_STATE) + + def test_do_operation_without_sampler(self): + """ + Tests that a DoOperation without a state_sampler correctly uses the + NOOP_SCOPED_STATE for timer processing. + """ + mock_spec = operation_specs.WorkerDoFn( + serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), + output_tags=[], + input=None, + side_inputs=[], + output_coders=[]) + + # 1. Create the operation WITHOUT the unexpected keyword argument + op = operations.create_operation( + name_context='test_op', + spec=mock_spec, + counter_factory=CounterFactory(), + state_sampler=None) + + # 2. Set the user_state_context attribute AFTER creation + op.user_state_context = Mock() + + self.assertIs( + op.scoped_timer_processing_state, statesampler.NOOP_SCOPED_STATE) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 87b50b9c62b68e3bed3ab487f4610d3474c57c34 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Wed, 12 Nov 2025 00:35:07 +0000 Subject: [PATCH 10/20] Address comments and remove extra test case --- .../apache_beam/runners/worker/operations.py | 1 - .../runners/worker/statesampler_test.py | 35 ------------------- 2 files changed, 36 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 7f5b18497b14..42aa1425ee4a 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -816,7 +816,6 @@ def __init__( self.user_state_context = user_state_context self.tagged_receivers = None # type: Optional[_TaggedReceivers] # A mapping of timer tags to the input "PCollections" they come in on. - # Force clean rebuild self.input_info = None # type: Optional[OpInputInfo] self.scoped_timer_processing_state = statesampler.NOOP_SCOPED_STATE if self.state_sampler: diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index dffa811d1edc..952a5969d6d4 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -132,41 +132,6 @@ def test_sampler_transition_overhead(self): # debug mode). self.assertLess(overhead_us, 20.0) - @retry(reraise=True, stop=stop_after_attempt(3)) - def test_timer_sampler(self): - # Set up state sampler. - counter_factory = CounterFactory() - sampler = statesampler.StateSampler( - 'timer', counter_factory, sampling_period_ms=1) - - # Duration of the timer processing. - state_duration_ms = 100 - margin_of_error = 0.25 - - sampler.start() - with sampler.scoped_state('step1', 'process-timers'): - time.sleep(state_duration_ms / 1000) - sampler.stop() - sampler.commit_counters() - - if not statesampler.FAST_SAMPLER: - # The slow sampler does not implement sampling, so we won't test it. - return - - # Test that sampled state timings are close to their expected values. - expected_counter_values = { - CounterName( - 'process-timers-msecs', step_name='step1', stage_name='timer'): state_duration_ms, - } - for counter in counter_factory.get_counters(): - self.assertIn(counter.name, expected_counter_values) - expected_value = expected_counter_values[counter.name] - actual_value = counter.value() - deviation = float(abs(actual_value - expected_value)) / expected_value - _LOGGER.info('Sampling deviation from expectation: %f', deviation) - self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error)) - self.assertLess(actual_value, expected_value * (1.0 + margin_of_error)) - @retry(reraise=True, stop=stop_after_attempt(3)) def test_process_timers_metric_is_recorded(self): """ From 8e1dbb3962ca17fd38636ec915bd629f46ea6315 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Wed, 12 Nov 2025 18:19:05 +0000 Subject: [PATCH 11/20] Remove user state context variable --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 952a5969d6d4..9ff518d9709f 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -205,9 +205,6 @@ def test_do_operation_with_sampler(self): counter_factory=CounterFactory(), state_sampler=sampler) - # 2. Set the user_state_context attribute AFTER creation - op.user_state_context = Mock() - self.assertIsNot( op.scoped_timer_processing_state, statesampler.NOOP_SCOPED_STATE) @@ -230,9 +227,6 @@ def test_do_operation_without_sampler(self): counter_factory=CounterFactory(), state_sampler=None) - # 2. Set the user_state_context attribute AFTER creation - op.user_state_context = Mock() - self.assertIs( op.scoped_timer_processing_state, statesampler.NOOP_SCOPED_STATE) From bf55c2d7e0d3bc20b80fd3802c8aef88954b130d Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Thu, 13 Nov 2025 21:41:02 +0000 Subject: [PATCH 12/20] Adjust state duration for test to avoid flakiness --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 9ff518d9709f..4b869ed198cf 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -143,8 +143,9 @@ def test_process_timers_metric_is_recorded(self): sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) - state_duration_ms = 100 - margin_of_error = 0.25 + # Keeps range between 50-350 ms, which is fair. + state_duration_ms = 200 + margin_of_error = 0.75 # Run a workload inside the 'process-timers' scoped state. sampler.start() From d41f76b745bcb38c2ceafaf0ffe745ec2b0036cf Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 14 Nov 2025 05:45:47 +0000 Subject: [PATCH 13/20] Add different tests, remove no op scoped state, and address formatting/lint issues --- .../apache_beam/runners/worker/operations.py | 38 ++--- .../runners/worker/statesampler.py | 14 -- .../runners/worker/statesampler_test.py | 156 ++++++++++++++---- 3 files changed, 138 insertions(+), 70 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 42aa1425ee4a..7668564b6eb3 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -49,7 +49,6 @@ from apache_beam.runners.worker import opcounters from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sideinputs -from apache_beam.runners.worker import statesampler from apache_beam.runners.worker.data_sampler import DataSampler from apache_beam.transforms import sideinputs as apache_sideinputs from apache_beam.transforms import combiners @@ -445,19 +444,12 @@ def __init__( self.metrics_container = MetricsContainer(self.name_context.metrics_name()) self.state_sampler = state_sampler - if self.state_sampler: - self.scoped_start_state = self.state_sampler.scoped_state( - self.name_context, 'start', metrics_container=self.metrics_container) - self.scoped_process_state = self.state_sampler.scoped_state( - self.name_context, - 'process', - metrics_container=self.metrics_container) - self.scoped_finish_state = self.state_sampler.scoped_state( - self.name_context, 'finish', metrics_container=self.metrics_container) - else: - self.scoped_start_state = statesampler.NOOP_SCOPED_STATE - self.scoped_process_state = statesampler.NOOP_SCOPED_STATE - self.scoped_finish_state = statesampler.NOOP_SCOPED_STATE + self.scoped_start_state = self.state_sampler.scoped_state( + self.name_context, 'start', metrics_container=self.metrics_container) + self.scoped_process_state = self.state_sampler.scoped_state( + self.name_context, 'process', metrics_container=self.metrics_container) + self.scoped_finish_state = self.state_sampler.scoped_state( + self.name_context, 'finish', metrics_container=self.metrics_container) # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. self.receivers = [] # type: List[ConsumerSet] @@ -817,12 +809,10 @@ def __init__( self.tagged_receivers = None # type: Optional[_TaggedReceivers] # A mapping of timer tags to the input "PCollections" they come in on. self.input_info = None # type: Optional[OpInputInfo] - self.scoped_timer_processing_state = statesampler.NOOP_SCOPED_STATE - if self.state_sampler: - self.scoped_timer_processing_state = self.state_sampler.scoped_state( - self.name_context, - 'process-timers', - metrics_container=self.metrics_container) + self.scoped_timer_processing_state = self.state_sampler.scoped_state( + self.name_context, + 'process-timers', + metrics_container=self.metrics_container) # See fn_data in dataflow_runner.py # TODO: Store all the items from spec? self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn)) @@ -984,7 +974,7 @@ def add_timer_info(self, timer_family_id, timer_info): self.user_state_context.add_timer_info(timer_family_id, timer_info) def process_timer(self, tag, timer_data): - def process_timer_logic(): + with self.scoped_timer_processing_state: timer_spec = self.timer_specs[tag] self.dofn_runner.process_user_timer( timer_spec, @@ -994,12 +984,6 @@ def process_timer_logic(): timer_data.paneinfo, timer_data.dynamic_timer_tag) - if self.scoped_timer_processing_state: - with self.scoped_timer_processing_state: - process_timer_logic() - else: - process_timer_logic() - def finish(self): # type: () -> None super(DoOperation, self).finish() diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index 53c3d8055101..4cda3a4daa1c 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -170,17 +170,3 @@ def commit_counters(self) -> None: for state in self._states_by_name.values(): state_msecs = int(1e-6 * state.nsecs) state.counter.update(state_msecs - state.counter.value()) - - -class NoOpScopedState: - def __enter__(self): - pass - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def sampled_msecs_int(self): - return 0 - - -NOOP_SCOPED_STATE = NoOpScopedState() diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 4b869ed198cf..331a8208ec79 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -26,13 +26,14 @@ from tenacity import retry from tenacity import stop_after_attempt -from apache_beam.runners.worker import statesampler -from apache_beam.utils.counters import CounterFactory -from apache_beam.utils.counters import CounterName +from apache_beam.internal import pickler +from apache_beam.runners import common from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import operations -from apache_beam.internal import pickler +from apache_beam.runners.worker import statesampler from apache_beam.transforms import core +from apache_beam.utils.counters import CounterFactory +from apache_beam.utils.counters import CounterName _LOGGER = logging.getLogger(__name__) @@ -143,7 +144,7 @@ def test_process_timers_metric_is_recorded(self): sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) - # Keeps range between 50-350 ms, which is fair. + # Keeps range between 50-350 ms, which is fair. state_duration_ms = 200 margin_of_error = 0.75 @@ -184,11 +185,15 @@ def test_process_timers_metric_is_recorded(self): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - def test_do_operation_with_sampler(self): + def test_do_operation_process_timer_metric(self): + """ + Tests that the 'process-timers-msecs' metric is correctly recorded + when a timer is processed within a DoOperation. """ - Tests that a DoOperation with an active state_sampler correctly - creates a real ScopedState object for timer processing. - """ + counter_factory = CounterFactory() + sampler = statesampler.StateSampler( + 'test_stage', counter_factory, sampling_period_ms=1) + mock_spec = operation_specs.WorkerDoFn( serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), output_tags=[], @@ -196,24 +201,70 @@ def test_do_operation_with_sampler(self): side_inputs=[], output_coders=[]) - sampler = statesampler.StateSampler( - 'test_stage', CounterFactory(), sampling_period_ms=1) - - # 1. Create the operation WITHOUT the unexpected keyword argument - op = operations.create_operation( - name_context='test_op', + op = operations.DoOperation( + name=common.NameContext('test_op'), spec=mock_spec, - counter_factory=CounterFactory(), - state_sampler=sampler) + counter_factory=counter_factory, + sampler=sampler) + + op.dofn_runner = Mock() + op.timer_specs = {'timer_id': Mock()} + state_duration_ms = 200 + margin_of_error = 0.75 + + def mock_process_user_timer(*args, **kwargs): + time.sleep(state_duration_ms / 1000.0) + + op.dofn_runner.process_user_timer = mock_process_user_timer + + mock_timer_data = Mock() + mock_timer_data.windows = [Mock()] + mock_timer_data.user_key = Mock() + mock_timer_data.fire_timestamp = Mock() + mock_timer_data.paneinfo = Mock() + mock_timer_data.dynamic_timer_tag = Mock() - self.assertIsNot( - op.scoped_timer_processing_state, statesampler.NOOP_SCOPED_STATE) + sampler.start() + op.process_timer('timer_id', mock_timer_data) + sampler.stop() + sampler.commit_counters() + + if not statesampler.FAST_SAMPLER: + return + + expected_counter_name = CounterName( + 'process-timers-msecs', step_name='test_op', stage_name='test_stage') + + found_counter = None + for counter in counter_factory.get_counters(): + if counter.name == expected_counter_name: + found_counter = counter + break + + self.assertIsNotNone( + found_counter, + f"The expected counter '{expected_counter_name}' was not created.") + + actual_value = found_counter.value() + expected_value = state_duration_ms + self.assertGreater( + actual_value, + expected_value * (1.0 - margin_of_error), + "The timer metric was lower than expected.") + self.assertLess( + actual_value, + expected_value * (1.0 + margin_of_error), + "The timer metric was higher than expected.") - def test_do_operation_without_sampler(self): + def test_do_operation_process_timer_metric_with_exception(self): """ - Tests that a DoOperation without a state_sampler correctly uses the - NOOP_SCOPED_STATE for timer processing. + Tests that the 'process-timers-msecs' metric is still recorded + when a timer callback in a DoOperation raises an exception. """ + counter_factory = CounterFactory() + sampler = statesampler.StateSampler( + 'test_stage', counter_factory, sampling_period_ms=1) + mock_spec = operation_specs.WorkerDoFn( serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), output_tags=[], @@ -221,15 +272,62 @@ def test_do_operation_without_sampler(self): side_inputs=[], output_coders=[]) - # 1. Create the operation WITHOUT the unexpected keyword argument - op = operations.create_operation( - name_context='test_op', + op = operations.DoOperation( + name=common.NameContext('test_op'), spec=mock_spec, - counter_factory=CounterFactory(), - state_sampler=None) + counter_factory=counter_factory, + sampler=sampler) + + op.dofn_runner = Mock() + op.timer_specs = {'timer_id': Mock()} + state_duration_ms = 200 + margin_of_error = 0.75 + + def mock_process_user_timer(*args, **kwargs): + time.sleep(state_duration_ms / 1000.0) + raise ValueError("Test Exception") + + op.dofn_runner.process_user_timer = mock_process_user_timer + + mock_timer_data = Mock() + mock_timer_data.windows = [Mock()] + mock_timer_data.user_key = Mock() + mock_timer_data.fire_timestamp = Mock() + mock_timer_data.paneinfo = Mock() + mock_timer_data.dynamic_timer_tag = Mock() + + sampler.start() + with self.assertRaises(ValueError): + op.process_timer('timer_id', mock_timer_data) + sampler.stop() + sampler.commit_counters() + + if not statesampler.FAST_SAMPLER: + return + + expected_counter_name = CounterName( + 'process-timers-msecs', step_name='test_op', stage_name='test_stage') + + found_counter = None + for counter in counter_factory.get_counters(): + if counter.name == expected_counter_name: + found_counter = counter + break - self.assertIs( - op.scoped_timer_processing_state, statesampler.NOOP_SCOPED_STATE) + self.assertIsNotNone( + found_counter, + f"The expected counter '{expected_counter_name}' was not created.") + + actual_value = found_counter.value() + expected_value = state_duration_ms + self.assertGreater( + actual_value, + expected_value * (1.0 - margin_of_error), + "The timer metric was lower than expected.") + self.assertLess( + actual_value, + expected_value * (1.0 + margin_of_error), + "The timer metric was higher than expected.") if __name__ == '__main__': From f85191fc232c22d86c423385b487e41b80277127 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 14 Nov 2025 07:48:13 +0000 Subject: [PATCH 14/20] Add patch to deal with CI presubmit errors --- .../apache_beam/runners/worker/statesampler_test.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 331a8208ec79..7e7fd3a2a869 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -21,7 +21,7 @@ import logging import time import unittest -from unittest.mock import Mock +from unittest.mock import Mock, patch from tenacity import retry from tenacity import stop_after_attempt @@ -185,7 +185,8 @@ def test_process_timers_metric_is_recorded(self): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - def test_do_operation_process_timer_metric(self): + @patch('apache_beam.runners.common.DoFnRunner') + def test_do_operation_process_timer_metric(self, mock_dofn_runner): """ Tests that the 'process-timers-msecs' metric is correctly recorded when a timer is processed within a DoOperation. @@ -256,7 +257,8 @@ def mock_process_user_timer(*args, **kwargs): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - def test_do_operation_process_timer_metric_with_exception(self): + @patch('apache_beam.runners.common.DoFnRunner') + def test_do_operation_process_timer_metric_with_exception(self, mock_dofn_runner): """ Tests that the 'process-timers-msecs' metric is still recorded when a timer callback in a DoOperation raises an exception. From d063094f2738efa8b4022a04eb4707eeade8c7f1 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Fri, 14 Nov 2025 21:38:14 +0000 Subject: [PATCH 15/20] Adjust test case to not use dofn_runner --- .../runners/worker/statesampler_test.py | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 7e7fd3a2a869..304897dc8ac2 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -21,7 +21,8 @@ import logging import time import unittest -from unittest.mock import Mock, patch +from unittest.mock import Mock +from unittest.mock import patch from tenacity import retry from tenacity import stop_after_attempt @@ -185,18 +186,22 @@ def test_process_timers_metric_is_recorded(self): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - @patch('apache_beam.runners.common.DoFnRunner') - def test_do_operation_process_timer_metric(self, mock_dofn_runner): - """ - Tests that the 'process-timers-msecs' metric is correctly recorded - when a timer is processed within a DoOperation. + @patch('apache_beam.runners.common.DoFnRunner.process_user_timer') + def test_do_operation_process_timer_metric(self, mock_process_user_timer): """ + Tests that the 'process-timers-msecs' metric is correctly recorded + when a timer is processed within a DoOperation. + """ counter_factory = CounterFactory() sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) + # Create a mock for the windowing function + mock_window_fn = Mock() + mock_spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), + serialized_fn=pickler.dumps( + (core.DoFn(), None, None, [], mock_window_fn)), output_tags=[], input=None, side_inputs=[], @@ -206,17 +211,19 @@ def test_do_operation_process_timer_metric(self, mock_dofn_runner): name=common.NameContext('test_op'), spec=mock_spec, counter_factory=counter_factory, - sampler=sampler) + sampler=sampler, + user_state_context=Mock()) - op.dofn_runner = Mock() + op.setup() op.timer_specs = {'timer_id': Mock()} + state_duration_ms = 200 margin_of_error = 0.75 - def mock_process_user_timer(*args, **kwargs): + def sleep_and_return(*args, **kwargs): time.sleep(state_duration_ms / 1000.0) - op.dofn_runner.process_user_timer = mock_process_user_timer + mock_process_user_timer.side_effect = sleep_and_return mock_timer_data = Mock() mock_timer_data.windows = [Mock()] @@ -257,18 +264,23 @@ def mock_process_user_timer(*args, **kwargs): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - @patch('apache_beam.runners.common.DoFnRunner') - def test_do_operation_process_timer_metric_with_exception(self, mock_dofn_runner): - """ - Tests that the 'process-timers-msecs' metric is still recorded - when a timer callback in a DoOperation raises an exception. + @patch('apache_beam.runners.common.DoFnRunner.process_user_timer') + def test_do_operation_process_timer_metric_with_exception( + self, mock_process_user_timer): """ + Tests that the 'process-timers-msecs' metric is still recorded + when a timer callback in a DoOperation raises an exception. + """ counter_factory = CounterFactory() sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) + # Create a mock for the windowing function + mock_window_fn = Mock() + mock_spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), + serialized_fn=pickler.dumps( + (core.DoFn(), None, None, [], mock_window_fn)), output_tags=[], input=None, side_inputs=[], @@ -278,18 +290,20 @@ def test_do_operation_process_timer_metric_with_exception(self, mock_dofn_runner name=common.NameContext('test_op'), spec=mock_spec, counter_factory=counter_factory, - sampler=sampler) + sampler=sampler, + user_state_context=Mock()) - op.dofn_runner = Mock() + op.setup() op.timer_specs = {'timer_id': Mock()} + state_duration_ms = 200 margin_of_error = 0.75 - def mock_process_user_timer(*args, **kwargs): + def sleep_and_raise(*args, **kwargs): time.sleep(state_duration_ms / 1000.0) raise ValueError("Test Exception") - op.dofn_runner.process_user_timer = mock_process_user_timer + mock_process_user_timer.side_effect = sleep_and_raise mock_timer_data = Mock() mock_timer_data.windows = [Mock()] From 02cf48cbe5747b35c84555fd946aee766e07b94b Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Sun, 16 Nov 2025 02:29:01 +0000 Subject: [PATCH 16/20] Test case failing presubmits, attempting to fix --- .../runners/worker/statesampler_test.py | 72 ++++++++++--------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 304897dc8ac2..23fe496b7998 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -186,22 +186,25 @@ def test_process_timers_metric_is_recorded(self): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - @patch('apache_beam.runners.common.DoFnRunner.process_user_timer') - def test_do_operation_process_timer_metric(self, mock_process_user_timer): + @patch('apache_beam.runners.common.DoFnRunner') + def test_do_operation_process_timer_metric(self, mock_dofn_runner_class): """ - Tests that the 'process-timers-msecs' metric is correctly recorded - when a timer is processed within a DoOperation. - """ + Tests that the 'process-timers-msecs' metric is correctly recorded + when a timer is processed within a DoOperation. + """ + mock_dofn_runner_instance = mock_dofn_runner_class.return_value + state_duration_ms = 200 + + def mock_process_user_timer(*args, **kwargs): + time.sleep(state_duration_ms / 1000.0) + + mock_dofn_runner_instance.process_user_timer = mock_process_user_timer counter_factory = CounterFactory() sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) - # Create a mock for the windowing function - mock_window_fn = Mock() - mock_spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps( - (core.DoFn(), None, None, [], mock_window_fn)), + serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), output_tags=[], input=None, side_inputs=[], @@ -211,19 +214,17 @@ def test_do_operation_process_timer_metric(self, mock_process_user_timer): name=common.NameContext('test_op'), spec=mock_spec, counter_factory=counter_factory, - sampler=sampler, - user_state_context=Mock()) + sampler=sampler) op.setup() + op.dofn_runner = Mock() op.timer_specs = {'timer_id': Mock()} - - state_duration_ms = 200 margin_of_error = 0.75 - def sleep_and_return(*args, **kwargs): + def mock_process_timer_usage(*args, **kwargs): time.sleep(state_duration_ms / 1000.0) - mock_process_user_timer.side_effect = sleep_and_return + op.dofn_runner.process_user_timer = mock_process_timer_usage mock_timer_data = Mock() mock_timer_data.windows = [Mock()] @@ -264,23 +265,29 @@ def sleep_and_return(*args, **kwargs): expected_value * (1.0 + margin_of_error), "The timer metric was higher than expected.") - @patch('apache_beam.runners.common.DoFnRunner.process_user_timer') + @patch('apache_beam.runners.common.DoFnRunner') def test_do_operation_process_timer_metric_with_exception( - self, mock_process_user_timer): + self, mock_dofn_runner_class): + """ + Tests that the 'process-timers-msecs' metric is still recorded + when a timer callback in a DoOperation raises an exception. """ - Tests that the 'process-timers-msecs' metric is still recorded - when a timer callback in a DoOperation raises an exception. - """ + # Configure the mock instance to raise an exception + mock_dofn_runner_instance = mock_dofn_runner_class.return_value + state_duration_ms = 200 + + def mock_process_user_timer(*args, **kwargs): + time.sleep(state_duration_ms / 1000.0) + raise ValueError("Test Exception") + + mock_dofn_runner_instance.process_user_timer = mock_process_user_timer + counter_factory = CounterFactory() sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) - # Create a mock for the windowing function - mock_window_fn = Mock() - mock_spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps( - (core.DoFn(), None, None, [], mock_window_fn)), + serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), output_tags=[], input=None, side_inputs=[], @@ -291,20 +298,15 @@ def test_do_operation_process_timer_metric_with_exception( spec=mock_spec, counter_factory=counter_factory, sampler=sampler, + # Provide a mock context to satisfy the setup method user_state_context=Mock()) + # The setup call now succeeds because DoFnRunner is mocked op.setup() - op.timer_specs = {'timer_id': Mock()} - state_duration_ms = 200 + op.timer_specs = {'timer_id': Mock()} margin_of_error = 0.75 - def sleep_and_raise(*args, **kwargs): - time.sleep(state_duration_ms / 1000.0) - raise ValueError("Test Exception") - - mock_process_user_timer.side_effect = sleep_and_raise - mock_timer_data = Mock() mock_timer_data.windows = [Mock()] mock_timer_data.user_key = Mock() @@ -313,6 +315,7 @@ def sleep_and_raise(*args, **kwargs): mock_timer_data.dynamic_timer_tag = Mock() sampler.start() + # The test correctly asserts that a ValueError is raised with self.assertRaises(ValueError): op.process_timer('timer_id', mock_timer_data) sampler.stop() @@ -334,6 +337,7 @@ def sleep_and_raise(*args, **kwargs): found_counter, f"The expected counter '{expected_counter_name}' was not created.") + # Assert that the timer metric was still recorded despite the exception actual_value = found_counter.value() expected_value = state_duration_ms self.assertGreater( From 2def297fe0a68d810b6a40f290ab1bd396a861c2 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Mon, 17 Nov 2025 02:17:20 +0000 Subject: [PATCH 17/20] Fix mocking for tests and ensure all pass --- .../apache_beam/runners/worker/operations.pxd | 2 +- .../runners/worker/statesampler_test.py | 269 +++++++++--------- 2 files changed, 142 insertions(+), 129 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd index 3179385d740f..52211e4d8ce8 100644 --- a/sdks/python/apache_beam/runners/worker/operations.pxd +++ b/sdks/python/apache_beam/runners/worker/operations.pxd @@ -117,7 +117,7 @@ cdef class DoOperation(Operation): cdef dict timer_specs cdef public object input_info cdef object fn - cdef readonly object scoped_timer_processing_state + cdef object scoped_timer_processing_state cdef class SdfProcessSizedElements(DoOperation): diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 23fe496b7998..af1cf349447f 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -21,6 +21,7 @@ import logging import time import unittest +from unittest import mock from unittest.mock import Mock from unittest.mock import patch @@ -33,12 +34,43 @@ from apache_beam.runners.worker import operations from apache_beam.runners.worker import statesampler from apache_beam.transforms import core +from apache_beam.transforms import userstate +from apache_beam.transforms.core import GlobalWindows +from apache_beam.transforms.core import Windowing +from apache_beam.transforms.window import GlobalWindow from apache_beam.utils.counters import CounterFactory from apache_beam.utils.counters import CounterName +from apache_beam.utils.windowed_value import PaneInfo _LOGGER = logging.getLogger(__name__) +class TimerDoFn(core.DoFn): + TIMER_SPEC = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK) + + def __init__(self, sleep_duration_s=0): + self._sleep_duration_s = sleep_duration_s + + @userstate.on_timer(TIMER_SPEC) + def on_timer_f(self): + if self._sleep_duration_s: + time.sleep(self._sleep_duration_s) + + +class ExceptionTimerDoFn(core.DoFn): + """A DoFn that raises an exception when its timer fires.""" + TIMER_SPEC = userstate.TimerSpec('ts-timer', userstate.TimeDomain.WATERMARK) + + def __init__(self, sleep_duration_s=0): + self._sleep_duration_s = sleep_duration_s + + @userstate.on_timer(TIMER_SPEC) + def on_timer_f(self): + if self._sleep_duration_s: + time.sleep(self._sleep_duration_s) + raise RuntimeError("Test exception from timer") + + class StateSamplerTest(unittest.TestCase): # Due to somewhat non-deterministic nature of state sampling and sleep, @@ -135,23 +167,20 @@ def test_sampler_transition_overhead(self): self.assertLess(overhead_us, 20.0) @retry(reraise=True, stop=stop_after_attempt(3)) - def test_process_timers_metric_is_recorded(self): - """ - Tests that the 'process-timers-msecs' metric is correctly recorded - when a state sampler is active. - """ - # Set up a real state sampler and counter factory. + def test_timer_sampler_operation(self): + state_duration_ms = 200 + margin_of_error = 75 + counter_factory = CounterFactory() sampler = statesampler.StateSampler( 'test_stage', counter_factory, sampling_period_ms=1) - # Keeps range between 50-350 ms, which is fair. - state_duration_ms = 200 - margin_of_error = 0.75 + name_context = common.NameContext('test_op') + scoped_timer_state = sampler.scoped_state( + name_context, 'process-timers', metrics_container=None) - # Run a workload inside the 'process-timers' scoped state. sampler.start() - with sampler.scoped_state('test_step', 'process-timers'): + with scoped_timer_state: time.sleep(state_duration_ms / 1000.0) sampler.stop() sampler.commit_counters() @@ -159,195 +188,179 @@ def test_process_timers_metric_is_recorded(self): if not statesampler.FAST_SAMPLER: return - # Verify that the counter was created with the correct name and value. - expected_counter_name = CounterName( - 'process-timers-msecs', step_name='test_step', stage_name='test_stage') + expected_name = CounterName( + 'process-timers-msecs', step_name='test_op', stage_name='test_stage') - # Find the specific counter we are looking for. found_counter = None for counter in counter_factory.get_counters(): - if counter.name == expected_counter_name: + if counter.name == expected_name: found_counter = counter break self.assertIsNotNone( - found_counter, - f"The expected counter '{expected_counter_name}' was not created.") + found_counter, f"Expected counter '{expected_name}' to be created.") - # Check that its value is approximately correct. - actual_value = found_counter.value() - expected_value = state_duration_ms + value = found_counter.value() self.assertGreater( - actual_value, - expected_value * (1.0 - margin_of_error), - "The timer metric was lower than expected.") + value, + state_duration_ms * (1.0 - margin_of_error), + f"Timer metric was too low: {value} ms.") self.assertLess( - actual_value, - expected_value * (1.0 + margin_of_error), - "The timer metric was higher than expected.") + value, + state_duration_ms * (1.0 + margin_of_error), + f"Timer metric was too high: {value} ms.") - @patch('apache_beam.runners.common.DoFnRunner') - def test_do_operation_process_timer_metric(self, mock_dofn_runner_class): - """ - Tests that the 'process-timers-msecs' metric is correctly recorded - when a timer is processed within a DoOperation. - """ - mock_dofn_runner_instance = mock_dofn_runner_class.return_value - state_duration_ms = 200 + @retry(reraise=True, stop=stop_after_attempt(3)) + # Patch the problematic function to return the correct timer spec + @patch('apache_beam.transforms.userstate.get_dofn_specs') + def test_do_operation_process_timer(self, mock_get_dofn_specs): + fn = TimerDoFn() + # get_dofn_specs returns a tuple of (state_specs, timer_specs) + mock_get_dofn_specs.return_value = ([], [fn.TIMER_SPEC]) - def mock_process_user_timer(*args, **kwargs): - time.sleep(state_duration_ms / 1000.0) + if not statesampler.FAST_SAMPLER: + self.skipTest('DoOperation test requires FAST_SAMPLER') + + state_duration_ms = 200 + margin_of_error = 0.50 - mock_dofn_runner_instance.process_user_timer = mock_process_user_timer counter_factory = CounterFactory() sampler = statesampler.StateSampler( - 'test_stage', counter_factory, sampling_period_ms=1) + 'test_do_op', counter_factory, sampling_period_ms=1) - mock_spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), + fn_for_spec = TimerDoFn(sleep_duration_s=state_duration_ms / 1000.0) + + spec = operation_specs.WorkerDoFn( + serialized_fn=pickler.dumps( + (fn_for_spec, [], {}, [], Windowing(GlobalWindows()))), output_tags=[], input=None, side_inputs=[], output_coders=[]) + mock_user_state_context = mock.MagicMock() op = operations.DoOperation( - name=common.NameContext('test_op'), - spec=mock_spec, - counter_factory=counter_factory, - sampler=sampler) + common.NameContext('step1'), + spec, + counter_factory, + sampler, + user_state_context=mock_user_state_context) op.setup() - op.dofn_runner = Mock() - op.timer_specs = {'timer_id': Mock()} - margin_of_error = 0.75 - - def mock_process_timer_usage(*args, **kwargs): - time.sleep(state_duration_ms / 1000.0) - op.dofn_runner.process_user_timer = mock_process_timer_usage - - mock_timer_data = Mock() - mock_timer_data.windows = [Mock()] - mock_timer_data.user_key = Mock() - mock_timer_data.fire_timestamp = Mock() - mock_timer_data.paneinfo = Mock() - mock_timer_data.dynamic_timer_tag = Mock() + timer_data = Mock() + timer_data.user_key = None + timer_data.windows = [GlobalWindow()] + timer_data.fire_timestamp = 0 + timer_data.paneinfo = PaneInfo( + is_first=False, + is_last=False, + timing=0, + index=0, + nonspeculative_index=0) + timer_data.dynamic_timer_tag = '' sampler.start() - op.process_timer('timer_id', mock_timer_data) + op.process_timer('ts-timer', timer_data=timer_data) sampler.stop() sampler.commit_counters() - if not statesampler.FAST_SAMPLER: - return - - expected_counter_name = CounterName( - 'process-timers-msecs', step_name='test_op', stage_name='test_stage') + expected_name = CounterName( + 'process-timers-msecs', step_name='step1', stage_name='test_do_op') found_counter = None for counter in counter_factory.get_counters(): - if counter.name == expected_counter_name: + if counter.name == expected_name: found_counter = counter break self.assertIsNotNone( - found_counter, - f"The expected counter '{expected_counter_name}' was not created.") + found_counter, f"Expected counter '{expected_name}' to be created.") actual_value = found_counter.value() - expected_value = state_duration_ms self.assertGreater( - actual_value, - expected_value * (1.0 - margin_of_error), - "The timer metric was lower than expected.") - self.assertLess( - actual_value, - expected_value * (1.0 + margin_of_error), - "The timer metric was higher than expected.") + actual_value, state_duration_ms * (1.0 - margin_of_error)) + self.assertLess(actual_value, state_duration_ms * (1.0 + margin_of_error)) - @patch('apache_beam.runners.common.DoFnRunner') - def test_do_operation_process_timer_metric_with_exception( - self, mock_dofn_runner_class): + @retry(reraise=True, stop=stop_after_attempt(3)) + @patch('apache_beam.runners.worker.operations.userstate.get_dofn_specs') + def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs): """ - Tests that the 'process-timers-msecs' metric is still recorded - when a timer callback in a DoOperation raises an exception. + Tests that an exception from a timer is propagated and that the + sampler still records the time spent until the exception. """ - # Configure the mock instance to raise an exception - mock_dofn_runner_instance = mock_dofn_runner_class.return_value - state_duration_ms = 200 + fn = ExceptionTimerDoFn() + mock_get_dofn_specs.return_value = ([], [fn.TIMER_SPEC]) - def mock_process_user_timer(*args, **kwargs): - time.sleep(state_duration_ms / 1000.0) - raise ValueError("Test Exception") + if not statesampler.FAST_SAMPLER: + self.skipTest('DoOperation test requires FAST_SAMPLER') - mock_dofn_runner_instance.process_user_timer = mock_process_user_timer + state_duration_ms = 200 + margin_of_error = 0.50 counter_factory = CounterFactory() sampler = statesampler.StateSampler( - 'test_stage', counter_factory, sampling_period_ms=1) + 'test_do_op_exception', counter_factory, sampling_period_ms=1) - mock_spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps((core.DoFn(), None, None, None, None)), + fn_for_spec = ExceptionTimerDoFn( + sleep_duration_s=state_duration_ms / 1000.0) + + spec = operation_specs.WorkerDoFn( + serialized_fn=pickler.dumps( + (fn_for_spec, [], {}, [], Windowing(GlobalWindows()))), output_tags=[], input=None, side_inputs=[], output_coders=[]) + mock_user_state_context = mock.MagicMock() op = operations.DoOperation( - name=common.NameContext('test_op'), - spec=mock_spec, - counter_factory=counter_factory, - sampler=sampler, - # Provide a mock context to satisfy the setup method - user_state_context=Mock()) - - # The setup call now succeeds because DoFnRunner is mocked - op.setup() + common.NameContext('step1'), + spec, + counter_factory, + sampler, + user_state_context=mock_user_state_context) - op.timer_specs = {'timer_id': Mock()} - margin_of_error = 0.75 + op.setup() - mock_timer_data = Mock() - mock_timer_data.windows = [Mock()] - mock_timer_data.user_key = Mock() - mock_timer_data.fire_timestamp = Mock() - mock_timer_data.paneinfo = Mock() - mock_timer_data.dynamic_timer_tag = Mock() + timer_data = Mock() + timer_data.user_key = None + timer_data.windows = [GlobalWindow()] + timer_data.fire_timestamp = 0 + timer_data.paneinfo = PaneInfo( + is_first=False, + is_last=False, + timing=0, + index=0, + nonspeculative_index=0) + timer_data.dynamic_timer_tag = '' sampler.start() - # The test correctly asserts that a ValueError is raised - with self.assertRaises(ValueError): - op.process_timer('timer_id', mock_timer_data) + # Assert that the expected exception is raised + with self.assertRaises(RuntimeError): + op.process_timer('ts-ts-timer', timer_data=timer_data) sampler.stop() sampler.commit_counters() - if not statesampler.FAST_SAMPLER: - return - - expected_counter_name = CounterName( - 'process-timers-msecs', step_name='test_op', stage_name='test_stage') + expected_name = CounterName( + 'process-timers-msecs', + step_name='step1', + stage_name='test_do_op_exception') found_counter = None for counter in counter_factory.get_counters(): - if counter.name == expected_counter_name: + if counter.name == expected_name: found_counter = counter break self.assertIsNotNone( - found_counter, - f"The expected counter '{expected_counter_name}' was not created.") + found_counter, f"Expected counter '{expected_name}' to be created.") - # Assert that the timer metric was still recorded despite the exception actual_value = found_counter.value() - expected_value = state_duration_ms self.assertGreater( - actual_value, - expected_value * (1.0 - margin_of_error), - "The timer metric was lower than expected.") - self.assertLess( - actual_value, - expected_value * (1.0 + margin_of_error), - "The timer metric was higher than expected.") + actual_value, state_duration_ms * (1.0 - margin_of_error)) + self.assertLess(actual_value, state_duration_ms * (1.0 + margin_of_error)) + _LOGGER.info("Exception test finished successfully.") if __name__ == '__main__': From 21deb924f38ee7c9fb9b6bcf3d8d24a36f9ef013 Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Tue, 25 Nov 2025 18:15:18 +0000 Subject: [PATCH 18/20] Remove extra test and increase retries on the process timer tests to avoid flakiness --- .../apache_beam/runners/worker/statesampler_test.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index af1cf349447f..03891fa4780c 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -210,19 +210,18 @@ def test_timer_sampler_operation(self): state_duration_ms * (1.0 + margin_of_error), f"Timer metric was too high: {value} ms.") - @retry(reraise=True, stop=stop_after_attempt(3)) + @retry(reraise=True, stop=stop_after_attempt(30)) # Patch the problematic function to return the correct timer spec @patch('apache_beam.transforms.userstate.get_dofn_specs') def test_do_operation_process_timer(self, mock_get_dofn_specs): fn = TimerDoFn() - # get_dofn_specs returns a tuple of (state_specs, timer_specs) mock_get_dofn_specs.return_value = ([], [fn.TIMER_SPEC]) if not statesampler.FAST_SAMPLER: self.skipTest('DoOperation test requires FAST_SAMPLER') state_duration_ms = 200 - margin_of_error = 0.50 + margin_of_error = 0.75 counter_factory = CounterFactory() sampler = statesampler.StateSampler( @@ -278,17 +277,14 @@ def test_do_operation_process_timer(self, mock_get_dofn_specs): found_counter, f"Expected counter '{expected_name}' to be created.") actual_value = found_counter.value() + logging.info("Actual value %d", actual_value) self.assertGreater( actual_value, state_duration_ms * (1.0 - margin_of_error)) self.assertLess(actual_value, state_duration_ms * (1.0 + margin_of_error)) - @retry(reraise=True, stop=stop_after_attempt(3)) + @retry(reraise=True, stop=stop_after_attempt(30)) @patch('apache_beam.runners.worker.operations.userstate.get_dofn_specs') def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs): - """ - Tests that an exception from a timer is propagated and that the - sampler still records the time spent until the exception. - """ fn = ExceptionTimerDoFn() mock_get_dofn_specs.return_value = ([], [fn.TIMER_SPEC]) From 94cb59ad9927dff155225567a8bcee04821750cf Mon Sep 17 00:00:00 2001 From: Karthik Talluri Date: Tue, 25 Nov 2025 21:38:12 +0000 Subject: [PATCH 19/20] Remove upper bound restriction and reduce retries --- .../runners/worker/statesampler_test.py | 48 +------------------ 1 file changed, 1 insertion(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 03891fa4780c..0d0ce1d2c8dc 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -167,50 +167,6 @@ def test_sampler_transition_overhead(self): self.assertLess(overhead_us, 20.0) @retry(reraise=True, stop=stop_after_attempt(3)) - def test_timer_sampler_operation(self): - state_duration_ms = 200 - margin_of_error = 75 - - counter_factory = CounterFactory() - sampler = statesampler.StateSampler( - 'test_stage', counter_factory, sampling_period_ms=1) - - name_context = common.NameContext('test_op') - scoped_timer_state = sampler.scoped_state( - name_context, 'process-timers', metrics_container=None) - - sampler.start() - with scoped_timer_state: - time.sleep(state_duration_ms / 1000.0) - sampler.stop() - sampler.commit_counters() - - if not statesampler.FAST_SAMPLER: - return - - expected_name = CounterName( - 'process-timers-msecs', step_name='test_op', stage_name='test_stage') - - found_counter = None - for counter in counter_factory.get_counters(): - if counter.name == expected_name: - found_counter = counter - break - - self.assertIsNotNone( - found_counter, f"Expected counter '{expected_name}' to be created.") - - value = found_counter.value() - self.assertGreater( - value, - state_duration_ms * (1.0 - margin_of_error), - f"Timer metric was too low: {value} ms.") - self.assertLess( - value, - state_duration_ms * (1.0 + margin_of_error), - f"Timer metric was too high: {value} ms.") - - @retry(reraise=True, stop=stop_after_attempt(30)) # Patch the problematic function to return the correct timer spec @patch('apache_beam.transforms.userstate.get_dofn_specs') def test_do_operation_process_timer(self, mock_get_dofn_specs): @@ -280,9 +236,8 @@ def test_do_operation_process_timer(self, mock_get_dofn_specs): logging.info("Actual value %d", actual_value) self.assertGreater( actual_value, state_duration_ms * (1.0 - margin_of_error)) - self.assertLess(actual_value, state_duration_ms * (1.0 + margin_of_error)) - @retry(reraise=True, stop=stop_after_attempt(30)) + @retry(reraise=True, stop=stop_after_attempt(3)) @patch('apache_beam.runners.worker.operations.userstate.get_dofn_specs') def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs): fn = ExceptionTimerDoFn() @@ -355,7 +310,6 @@ def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs): actual_value = found_counter.value() self.assertGreater( actual_value, state_duration_ms * (1.0 - margin_of_error)) - self.assertLess(actual_value, state_duration_ms * (1.0 + margin_of_error)) _LOGGER.info("Exception test finished successfully.") From ec7fb2bf1c5b67521faa1680bb3322945dc87fce Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Tue, 25 Nov 2025 14:10:33 -0800 Subject: [PATCH 20/20] Remove unused suffix param. --- sdks/python/apache_beam/runners/worker/statesampler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index 4cda3a4daa1c..b9c75f4de93d 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -134,8 +134,8 @@ def scoped_state( name_context: Union[str, 'common.NameContext'], state_name: str, io_target=None, - metrics_container: Optional['MetricsContainer'] = None, - suffix: str = '-msecs') -> statesampler_impl.ScopedState: + metrics_container: Optional['MetricsContainer'] = None + ) -> statesampler_impl.ScopedState: """Returns a ScopedState object associated to a Step and a State. Args: @@ -152,7 +152,7 @@ def scoped_state( name_context = common.NameContext(name_context) counter_name = CounterName( - state_name + suffix, + state_name + '-msecs', stage_name=self._prefix, step_name=name_context.metrics_name(), io_target=io_target)