From 48e432a15cd7d425de971b8e74b709577c5107da Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 8 Dec 2025 15:32:25 -0500 Subject: [PATCH 1/2] Use internal DEFAULT_PICKLE_LIB --- sdks/python/apache_beam/options/pipeline_options.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 170ade224c10..f6bc5e683274 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1798,7 +1798,10 @@ def _handle_load_main_session(self, validator): # save_main_session default to False for dill, while default to true # for cloudpickle pickle_library = getattr(self, 'pickle_library') - if pickle_library in ['default', 'cloudpickle']: + if pickle_library == 'default': + from apache_beam.internal.pickler import DEFAULT_PICKLE_LIB + pickle_library = DEFAULT_PICKLE_LIB + if pickle_library == 'cloudpickle': setattr(self, 'save_main_session', True) else: setattr(self, 'save_main_session', False) From f2ae7d35cf1a3eea30770ef5bd13d6359148cf3b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 8 Dec 2025 22:01:30 -0500 Subject: [PATCH 2/2] Enable only on Dataflow --- .../apache_beam/options/pipeline_options.py | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index f6bc5e683274..38b36c3a2c45 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1682,15 +1682,16 @@ def _add_argparse_args(cls, parser): 'defined in __main__ (e.g. interactive session) can be unpickled. ' 'Some workflows do not need the session state if for instance all ' 'their functions/classes are defined in proper modules ' - '(not __main__) and the modules are importable in the worker. ')) + '(not __main__) and the modules are importable in the worker. ' + 'It is disabled by default except for cloudpickle as pickle ' + 'library on Dataflow runner.')) parser.add_argument( '--no_save_main_session', default=None, action='store_false', dest='save_main_session', help=( - 'Disable saving the main session state. It is enabled/disabled by' - 'default for cloudpickle/dill pickler. See "save_main_session".')) + 'Disable saving the main session state. See "save_main_session".')) parser.add_argument( '--sdk_location', @@ -1795,16 +1796,19 @@ def _add_argparse_args(cls, parser): def _handle_load_main_session(self, validator): save_main_session = getattr(self, 'save_main_session') if save_main_session is None: - # save_main_session default to False for dill, while default to true - # for cloudpickle - pickle_library = getattr(self, 'pickle_library') - if pickle_library == 'default': - from apache_beam.internal.pickler import DEFAULT_PICKLE_LIB - pickle_library = DEFAULT_PICKLE_LIB - if pickle_library == 'cloudpickle': - setattr(self, 'save_main_session', True) - else: + if not validator.is_service_runner(): setattr(self, 'save_main_session', False) + else: + # save_main_session default to False for dill, while default to true + # for cloudpickle on service runner + pickle_library = getattr(self, 'pickle_library') + if pickle_library == 'default': + from apache_beam.internal.pickler import DEFAULT_PICKLE_LIB + pickle_library = DEFAULT_PICKLE_LIB + if pickle_library == 'cloudpickle': + setattr(self, 'save_main_session', True) + else: + setattr(self, 'save_main_session', False) return [] def validate(self, validator):