From 057f297b5f8004c97d070238cf31ebcc8c0c37c9 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 12 Dec 2025 16:53:27 -0500 Subject: [PATCH 1/2] Pass options in DaskOptions inheritance hierarchy only for Dask runner --- .../apache_beam/options/pipeline_options.py | 23 +++++++++++----- .../options/pipeline_options_test.py | 27 +++++++++++++++++++ .../apache_beam/runners/dask/dask_runner.py | 2 +- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 38b36c3a2c45..090846b0a921 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -486,11 +486,12 @@ def get_all_options( drop_default=False, add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None, retain_unknown_options=False, - display_warnings=False) -> Dict[str, Any]: + display_warnings=False, + hierarchy_only=False, + ) -> Dict[str, Any]: """Returns a dictionary of all defined arguments. - Returns a dictionary of all defined arguments (arguments that are defined in - any subclass of PipelineOptions) into a dictionary. + Collects every defined argument and its value into a single dictionary. Args: drop_default: If set to true, options that are equal to their default @@ -500,6 +501,9 @@ def get_all_options( retain_unknown_options: If set to true, options not recognized by any known pipeline options class will still be included in the result. If set to false, they will be discarded. + hierarchy_only: If set to true, only returns options defined in this class + and its super classes only. Otherwise, arguments that are defined in + any subclass of PipelineOptions are returned (default). Returns: Dictionary of all args and values. @@ -510,8 +514,13 @@ def get_all_options( # instance of each subclass to avoid conflicts. 
subset = {} parser = _BeamArgumentParser(allow_abbrev=False) - for cls in PipelineOptions.__subclasses__(): - subset.setdefault(str(cls), cls) + if not hierarchy_only: + for cls in PipelineOptions.__subclasses__(): + subset.setdefault(str(cls), cls) + else: + for cls in self.__class__.__mro__: + if issubclass(cls, PipelineOptions): + subset.setdefault(str(cls), cls) for cls in subset.values(): cls._add_argparse_args(parser) # pylint: disable=protected-access if add_extra_args_fn: @@ -562,7 +571,7 @@ def add_new_arg(arg, **kwargs): continue parsed_args, _ = parser.parse_known_args(self._flags) else: - if unknown_args: + if unknown_args and not hierarchy_only: _LOGGER.warning("Discarding unparseable args: %s", unknown_args) parsed_args = known_args result = vars(parsed_args) @@ -580,7 +589,7 @@ def add_new_arg(arg, **kwargs): if overrides: if retain_unknown_options: result.update(overrides) - else: + elif not hierarchy_only: _LOGGER.warning("Discarding invalid overrides: %s", overrides) return result diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 705e8e1e2c04..791587a1ce13 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -204,6 +204,11 @@ def _add_argparse_args(cls, parser): parser.add_argument( '--fake_multi_option', action='append', help='fake multi option') + class FakeSubclassOptions(FakeOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--fake_sub_option', help='fake option') + @parameterized.expand(TEST_CASES) def test_display_data(self, flags, _, display_data): options = PipelineOptions(flags=flags) @@ -238,6 +243,28 @@ def test_get_all_options(self, flags, expected, _): options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option, expected['mock_multi_option']) + def test_get_superclass_options(self): + flags = [ + "--mock_option", + "mock", + 
"--fake_option", + "fake", + "--fake_sub_option", + "fake_sub" + ] + options = PipelineOptions(flags=flags).view_as( + PipelineOptionsTest.FakeSubclassOptions) + items = options.get_all_options(hierarchy_only=True).items() + print(items) + self.assertTrue(('fake_option', 'fake') in items) + self.assertTrue(('fake_sub_option', 'fake_sub') in items) + self.assertFalse(('mock_option', 'mock') in items) + items = options.view_as(PipelineOptionsTest.MockOptions).get_all_options( + hierarchy_only=True).items() + self.assertFalse(('fake_option', 'fake') in items) + self.assertFalse(('fake_sub_option', 'fake_sub') in items) + self.assertTrue(('mock_option', 'mock') in items) + @parameterized.expand(TEST_CASES) def test_subclasses_of_pipeline_options_can_be_instantiated( self, flags, expected, _): diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 8975fcf1e138..03bc387c23e0 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -236,7 +236,7 @@ def run_pipeline(self, pipeline, options): 'DaskRunner is not available. 
Please install apache_beam[dask].') dask_options = options.view_as(DaskOptions).get_all_options( - drop_default=True) + drop_default=True, hierarchy_only=True) bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options) client = ddist.Client(**dask_options) From 3b0ff19b08a305542ab044739cfd8ea5c3956518 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 12 Dec 2025 23:26:51 -0500 Subject: [PATCH 2/2] Address review comments: replace hierarchy_only with current_only --- .../apache_beam/options/pipeline_options.py | 20 ++++++++--------- .../options/pipeline_options_test.py | 22 ++++--------------- .../apache_beam/runners/dask/dask_runner.py | 2 +- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 090846b0a921..0e1012b2de65 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -487,7 +487,7 @@ def get_all_options( add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None, retain_unknown_options=False, display_warnings=False, - hierarchy_only=False, + current_only=False, ) -> Dict[str, Any]: """Returns a dictionary of all defined arguments. @@ -501,9 +501,9 @@ def get_all_options( retain_unknown_options: If set to true, options not recognized by any known pipeline options class will still be included in the result. If set to false, they will be discarded. - hierarchy_only: If set to true, only returns options defined in this class - and its super classes only. Otherwise, arguments that are defined in - any subclass of PipelineOptions are returned (default). + current_only: If set to true, only returns options defined in this class. + Otherwise, arguments that are defined in any subclass of PipelineOptions + are returned (default). Returns: Dictionary of all args and values. @@ -514,13 +514,11 @@ def get_all_options( # instance of each subclass to avoid conflicts. 
subset = {} parser = _BeamArgumentParser(allow_abbrev=False) - if not hierarchy_only: + if current_only: + subset.setdefault(str(type(self)), type(self)) + else: for cls in PipelineOptions.__subclasses__(): subset.setdefault(str(cls), cls) - else: - for cls in self.__class__.__mro__: - if issubclass(cls, PipelineOptions): - subset.setdefault(str(cls), cls) for cls in subset.values(): cls._add_argparse_args(parser) # pylint: disable=protected-access if add_extra_args_fn: @@ -571,7 +569,7 @@ def add_new_arg(arg, **kwargs): continue parsed_args, _ = parser.parse_known_args(self._flags) else: - if unknown_args and not hierarchy_only: + if unknown_args and not current_only: _LOGGER.warning("Discarding unparseable args: %s", unknown_args) parsed_args = known_args result = vars(parsed_args) @@ -589,7 +587,7 @@ def add_new_arg(arg, **kwargs): if overrides: if retain_unknown_options: result.update(overrides) - elif not hierarchy_only: + elif not current_only: _LOGGER.warning("Discarding invalid overrides: %s", overrides) return result diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 791587a1ce13..c683c9625272 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -204,11 +204,6 @@ def _add_argparse_args(cls, parser): parser.add_argument( '--fake_multi_option', action='append', help='fake multi option') - class FakeSubclassOptions(FakeOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--fake_sub_option', help='fake option') - @parameterized.expand(TEST_CASES) def test_display_data(self, flags, _, display_data): options = PipelineOptions(flags=flags) @@ -244,25 +239,16 @@ def test_get_all_options(self, flags, expected, _): expected['mock_multi_option']) def test_get_superclass_options(self): - flags = [ - "--mock_option", - "mock", - "--fake_option", - "fake", - 
"--fake_sub_option", - "fake_sub" - ] + flags = ["--mock_option", "mock", "--fake_option", "fake"] options = PipelineOptions(flags=flags).view_as( - PipelineOptionsTest.FakeSubclassOptions) - items = options.get_all_options(hierarchy_only=True).items() + PipelineOptionsTest.FakeOptions) + items = options.get_all_options(current_only=True).items() print(items) self.assertTrue(('fake_option', 'fake') in items) - self.assertTrue(('fake_sub_option', 'fake_sub') in items) self.assertFalse(('mock_option', 'mock') in items) items = options.view_as(PipelineOptionsTest.MockOptions).get_all_options( - hierarchy_only=True).items() + current_only=True).items() self.assertFalse(('fake_option', 'fake') in items) - self.assertFalse(('fake_sub_option', 'fake_sub') in items) self.assertTrue(('mock_option', 'mock') in items) @parameterized.expand(TEST_CASES) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 03bc387c23e0..bc915d300857 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -236,7 +236,7 @@ def run_pipeline(self, pipeline, options): 'DaskRunner is not available. Please install apache_beam[dask].') dask_options = options.view_as(DaskOptions).get_all_options( - drop_default=True, hierarchy_only=True) + drop_default=True, current_only=True) bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options) client = ddist.Client(**dask_options)