diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 170ade224c10..4f569305ba09 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -66,7 +66,6 @@ # from the flag name and whose default value is `None`. _FLAG_THAT_SETS_FALSE_VALUE = { 'use_public_ips': 'no_use_public_ips', - 'save_main_session': 'no_save_main_session' } # Set of options which should not be overriden when applying options from a # different language. This is relevant when using x-lang transforms where the @@ -1683,14 +1682,6 @@ def _add_argparse_args(cls, parser): 'Some workflows do not need the session state if for instance all ' 'their functions/classes are defined in proper modules ' '(not __main__) and the modules are importable in the worker. ')) - parser.add_argument( - '--no_save_main_session', - default=None, - action='store_false', - dest='save_main_session', - help=( - 'Disable saving the main session state. It is enabled/disabled by' - 'default for cloudpickle/dill pickler. 
See "save_main_session".')) parser.add_argument( '--sdk_location', diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 6ef06abb7436..d838ea1e7393 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -219,6 +219,11 @@ def __init__( SetupOptions).pickle_library = runner.default_pickle_library_override( ) pickler.set_library(self._options.view_as(SetupOptions).pickle_library) + if (pickler.is_currently_cloudpickle() and + self._options.view_as(SetupOptions).save_main_session is None and + runner.cloudpickle_save_main_session_by_default()): + self._options.view_as(SetupOptions).save_main_session = True + logging.info("Runner defaulting save_main_session to True") # Validate pipeline options errors = PipelineOptionsValidator(self._options, runner).validate() diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index d33c33f84fee..7cf40d7b6821 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -101,6 +101,10 @@ def __init__(self, cache=None): def default_pickle_library_override(self): return 'cloudpickle' + def cloudpickle_save_main_session_by_default(self): + """Dataflow saves the main session by default when cloudpickle is used.""" + return True + def is_fnapi_compatible(self): return False diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 00ca84bb8e7d..190e28a2c925 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -229,6 +229,10 @@ def default_pickle_library_override(self): """Default pickle library, can be overridden by runner implementation.""" return None + def cloudpickle_save_main_session_by_default(self): + """Return truthy to default save_main_session to True under cloudpickle.""" + return None + # 
FIXME: replace with PipelineState(str, enum.Enum) class PipelineState(object):