diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 170ade224c10..38b36c3a2c45 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1682,15 +1682,16 @@ def _add_argparse_args(cls, parser): 'defined in __main__ (e.g. interactive session) can be unpickled. ' 'Some workflows do not need the session state if for instance all ' 'their functions/classes are defined in proper modules ' - '(not __main__) and the modules are importable in the worker. ')) + '(not __main__) and the modules are importable in the worker. ' + 'It is disabled by default except for cloudpickle as pickle ' + 'library on Dataflow runner.')) parser.add_argument( '--no_save_main_session', default=None, action='store_false', dest='save_main_session', help=( - 'Disable saving the main session state. It is enabled/disabled by' - 'default for cloudpickle/dill pickler. See "save_main_session".')) + 'Disable saving the main session state. See "save_main_session".')) parser.add_argument( '--sdk_location', @@ -1795,13 +1796,19 @@ def _add_argparse_args(cls, parser): def _handle_load_main_session(self, validator): save_main_session = getattr(self, 'save_main_session') if save_main_session is None: - # save_main_session default to False for dill, while default to true - # for cloudpickle - pickle_library = getattr(self, 'pickle_library') - if pickle_library in ['default', 'cloudpickle']: - setattr(self, 'save_main_session', True) - else: + if not validator.is_service_runner(): setattr(self, 'save_main_session', False) + else: + # save_main_session default to False for dill, while default to true + # for cloudpickle on service runner + pickle_library = getattr(self, 'pickle_library') + if pickle_library == 'default': + from apache_beam.internal.pickler import DEFAULT_PICKLE_LIB + pickle_library = DEFAULT_PICKLE_LIB + if pickle_library == 'cloudpickle': + setattr(self, 'save_main_session', True) + else: + setattr(self, 'save_main_session', False) return [] def validate(self, validator):