1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "36271",
"modification": 35
}

@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 14
"modification": 0
}
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 5
"modification": 0
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -85,6 +85,7 @@
## Bugfixes

* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
* Logical type and coder registries are now saved for pipelines when the default pickler is used. This fixes a side effect of switching to cloudpickle as the default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)).

## Known Issues

25 changes: 18 additions & 7 deletions sdks/python/apache_beam/coders/typecoders.py
@@ -114,6 +114,14 @@ def _register_coder_internal(
typehint_coder_class: Type[coders.Coder]) -> None:
self._coders[typehint_type] = typehint_coder_class

@staticmethod
def _normalize_typehint_type(typehint_type):
if typehint_type.__module__ == '__main__':
# See https://github.com/apache/beam/issues/21541
# TODO(robertwb): Remove once all runners are portable.
return getattr(typehint_type, '__name__', str(typehint_type))
return typehint_type

def register_coder(
self, typehint_type: Any,
typehint_coder_class: Type[coders.Coder]) -> None:
@@ -123,11 +131,8 @@ def register_coder(
'Received %r instead.' % typehint_coder_class)
if typehint_type not in self.custom_types:
self.custom_types.append(typehint_type)
if typehint_type.__module__ == '__main__':
# See https://github.com/apache/beam/issues/21541
# TODO(robertwb): Remove once all runners are portable.
typehint_type = getattr(typehint_type, '__name__', str(typehint_type))
self._register_coder_internal(typehint_type, typehint_coder_class)
self._register_coder_internal(
self._normalize_typehint_type(typehint_type), typehint_coder_class)

def get_coder(self, typehint: Any) -> coders.Coder:
if typehint and typehint.__module__ == '__main__':
@@ -170,9 +175,15 @@ def get_coder(self, typehint: Any) -> coders.Coder:
coder = self._fallback_coder
return coder.from_type_hint(typehint, self)

def get_custom_type_coder_tuples(self, types):
def get_custom_type_coder_tuples(self, types=None):
"""Returns type/coder tuples for all custom types passed in."""
return [(t, self._coders[t]) for t in types if t in self.custom_types]
return [(t, self._coders[self._normalize_typehint_type(t)])
for t in self.custom_types if (types is None or t in types)]

def load_custom_type_coder_tuples(self, type_coder):
"""Load type/coder tuples into coder registry."""
for t, c in type_coder:
self.register_coder(t, c)

def verify_deterministic(self, key_coder, op_name, silent=True):
if not key_coder.is_deterministic():
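For context, a minimal sketch of how the new registry helpers could be exercised. `MyRecord` is a hypothetical user type, `PickleCoder` is chosen only to keep the sketch self-contained, and the snippet assumes it runs as a `__main__` script against a Beam build that includes this change:

```python
from apache_beam.coders import coders, typecoders


class MyRecord:
  """Hypothetical user type defined in __main__."""
  def __init__(self, value):
    self.value = value


# register_coder() records MyRecord in custom_types and, because the type
# lives in __main__, keys the coder under the normalized name 'MyRecord'
# (see _normalize_typehint_type above).
typecoders.registry.register_coder(MyRecord, coders.PickleCoder)

# Snapshot every custom type/coder pair (types=None means "all"), then feed
# the tuples back through load_custom_type_coder_tuples(), which is what
# load_session() does on the worker side.
snapshot = typecoders.registry.get_custom_type_coder_tuples()
typecoders.registry.load_custom_type_coder_tuples(snapshot)
```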
35 changes: 29 additions & 6 deletions sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -252,12 +252,35 @@ def _lock_reducer(obj):


def dump_session(file_path):
# It is possible to dump session with cloudpickle. However, since references
# are saved it should not be necessary. See https://s.apache.org/beam-picklers
pass
# Since references are saved (https://s.apache.org/beam-picklers), we only
# dump supported Beam registries (currently the coder and logical type
# registries).
from apache_beam.coders import typecoders

Collaborator:
Can you please explain how the coder registry is related here? Is it a separate issue or the same logical type issue?

I believe all custom coders are fully pickled in the pipeline, and there are currently no problems with custom coders?

Contributor Author:
It's a related issue. I made a minimal example pipeline based on https://github.com/apache/beam/blob/bb340c2f66ac8730334160d6ed5ecd18822d059d/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

In particular, at L67 there it registers a coder:

coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

I checked with dill and save_main_session: this mapping is registered in the coder registry on the worker. But with cloudpickle, it is no longer registered on the worker.

Collaborator:
Does it matter if the coder is registered on the worker though? My understanding is coders.registry.register_coder(JdbcTestRow, coders.RowCoder) just maps any transforms that use JdbcTestRow to use RowCoder during pipeline construction.

Then the RowCoder is pickled as part of the pipeline proto, and the worker doesn't actually need the coder registry; it just uses the pickled coder.

Am I missing something? Can you reproduce some sort of error in the case where coder registry is not correct on worker?

Contributor Author (@Abacn, Sep 30, 2025):
That's true. There is no error, meaning the worker doesn't actually need the coder registry in this use case. The original error is due to the logical type registry; adding the coder registry here is for conceptual completeness, as both are exposed to the user.

from apache_beam.typehints import schemas

with _pickle_lock, open(file_path, 'wb') as file:
coder_reg = typecoders.registry.get_custom_type_coder_tuples()
logical_type_reg = schemas.LogicalType._known_logical_types.copy_custom()

pickler = cloudpickle.CloudPickler(file)
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
# once implemented
pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg})


def load_session(file_path):
# It is possible to load_session with cloudpickle. However, since references
# are saved it should not be necessary. See https://s.apache.org/beam-picklers
pass
from apache_beam.coders import typecoders
from apache_beam.typehints import schemas

with _pickle_lock, open(file_path, 'rb') as file:
registries = cloudpickle.load(file)
if not isinstance(registries, dict):
raise ValueError(
"Failed loading session: expected dict, got {}".format(type(registries)))
if "coder" in registries:
typecoders.registry.load_custom_type_coder_tuples(registries["coder"])
else:
_LOGGER.warning('No coder registry found in saved session')
if "logical_type" in registries:
schemas.LogicalType._known_logical_types.load(registries["logical_type"])
else:
_LOGGER.warning('No logical type registry found in saved session')
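To make the thread above concrete, a hedged sketch of the new session round trip. The file name is arbitrary, `MyRow` stands in for the `JdbcTestRow` case discussed above, and the snippet assumes a Beam build with this change applied (normally the stager calls `dump_session` at submission and the worker calls `load_session`):

```python
import os
import tempfile

from apache_beam.coders import coders, typecoders
from apache_beam.internal import cloudpickle_pickler


class MyRow:
  """Stand-in for a user type registered in __main__ (cf. JdbcTestRow)."""


# Registered during pipeline construction; with cloudpickle as the default
# pickler this mapping previously never reached the worker.
typecoders.registry.register_coder(MyRow, coders.PickleCoder)

with tempfile.TemporaryDirectory() as tmp:
  session_file = os.path.join(tmp, 'pickled_main_session')
  # Writes {"coder": [...], "logical_type": ...} via cloudpickle.
  cloudpickle_pickler.dump_session(session_file)
  # Restores both registries, logging a warning if either key is missing.
  cloudpickle_pickler.load_session(session_file)
```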
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -20,6 +20,7 @@
# pytype: skip-file

import os
import tempfile
import threading
import types
import unittest
@@ -31,6 +32,7 @@
from apache_beam.internal import module_test
from apache_beam.internal.cloudpickle_pickler import dumps
from apache_beam.internal.cloudpickle_pickler import loads
from apache_beam.typehints.schemas import LogicalTypeRegistry
from apache_beam.utils import shared

GLOBAL_DICT_REF = module_test.GLOBAL_DICT
@@ -244,6 +246,24 @@ def sample_func():
unpickled_filename = os.path.abspath(unpickled_code.co_filename)
self.assertEqual(unpickled_filename, original_filename)

@mock.patch(
"apache_beam.coders.typecoders.registry.load_custom_type_coder_tuples")
@mock.patch(
"apache_beam.typehints.schemas.LogicalType._known_logical_types.load")
def test_dump_load_session(self, logicaltype_mock, coder_mock):
session_file = 'pickled'

with tempfile.TemporaryDirectory() as tmp_dirname:
pickled_session_file = os.path.join(tmp_dirname, session_file)
beam_cloudpickle.dump_session(pickled_session_file)
beam_cloudpickle.load_session(pickled_session_file)
load_logical_types = logicaltype_mock.call_args.args
load_coders = coder_mock.call_args.args
self.assertEqual(len(load_logical_types), 1)
self.assertEqual(len(load_coders), 1)
self.assertTrue(isinstance(load_logical_types[0], LogicalTypeRegistry))
self.assertTrue(isinstance(load_coders[0], list))


if __name__ == '__main__':
unittest.main()
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/internal/pickler.py
@@ -91,6 +91,14 @@ def load_session(file_path):
return desired_pickle_lib.load_session(file_path)


def is_currently_dill():
return desired_pickle_lib == dill_pickler


def is_currently_cloudpickle():
return desired_pickle_lib == cloudpickle_pickler


def set_library(selected_library=DEFAULT_PICKLE_LIB):
""" Sets pickle library that will be used. """
global desired_pickle_lib
@@ -108,12 +116,11 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB):
"Pipeline option pickle_library=dill_unsafe is set, but dill is not "
"installed. Install dill in job submission and runtime environments.")

is_currently_dill = (desired_pickle_lib == dill_pickler)
dill_is_requested = (
selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE)

# If switching to or from dill, update the pickler hook overrides.
if is_currently_dill != dill_is_requested:
if is_currently_dill() != dill_is_requested:
dill_pickler.override_pickler_hooks(selected_library == USE_DILL)

if dill_is_requested:
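A small illustrative check of the new helpers; the chosen library value is just an example, and `is_currently_*` reflects whichever library was last set:

```python
from apache_beam.internal import pickler

# 'cloudpickle' is one of the accepted --pickle_library values.
pickler.set_library('cloudpickle')

assert pickler.is_currently_cloudpickle()
assert not pickler.is_currently_dill()
```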
29 changes: 27 additions & 2 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -64,7 +64,10 @@
# Map defined with option names to flag names for boolean options
# that have a destination(dest) in parser.add_argument() different
# from the flag name and whose default value is `None`.
_FLAG_THAT_SETS_FALSE_VALUE = {'use_public_ips': 'no_use_public_ips'}
_FLAG_THAT_SETS_FALSE_VALUE = {
'use_public_ips': 'no_use_public_ips',
'save_main_session': 'no_save_main_session'
}
# Set of options which should not be overridden when applying options from a
# different language. This is relevant when using x-lang transforms where the
# expansion service is started up with some pipeline options, and will
@@ -1672,14 +1675,23 @@ def _add_argparse_args(cls, parser):
choices=['cloudpickle', 'default', 'dill', 'dill_unsafe'])
parser.add_argument(
'--save_main_session',
default=False,
default=None,
action='store_true',
help=(
'Save the main session state so that pickled functions and classes '
'defined in __main__ (e.g. interactive session) can be unpickled. '
'Some workflows do not need the session state if for instance all '
'their functions/classes are defined in proper modules '
'(not __main__) and the modules are importable in the worker. '))
parser.add_argument(
'--no_save_main_session',
default=None,
action='store_false',
dest='save_main_session',
help=(
'Disable saving the main session state. It is enabled/disabled by '
'default for the cloudpickle/dill pickler respectively. See '
'"save_main_session".'))

parser.add_argument(
'--sdk_location',
default='default',
@@ -1780,10 +1792,23 @@ def _add_argparse_args(cls, parser):
'If not specified, the default Maven Central repository will be '
'used.'))

def _handle_load_main_session(self, validator):
save_main_session = getattr(self, 'save_main_session')
if save_main_session is None:
# save_main_session defaults to False for dill and to True for
# cloudpickle (the default pickler).
pickle_library = getattr(self, 'pickle_library')
if pickle_library in ['default', 'cloudpickle']:
setattr(self, 'save_main_session', True)
else:
setattr(self, 'save_main_session', False)
return []

def validate(self, validator):
errors = []
errors.extend(validator.validate_container_prebuilding_options(self))
errors.extend(validator.validate_pickle_library(self))
errors.extend(self._handle_load_main_session(validator))
return errors


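A hedged sketch of how the new flag and default interact. The option values are illustrative, and the unset default is only resolved to True/False inside `validate()` via `_handle_load_main_session`, typically during pipeline construction:

```python
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# No flag: save_main_session stays None until validate() resolves it
# (True for cloudpickle/default, False for dill).
default_opts = PipelineOptions([]).view_as(SetupOptions)

# Explicit opt-out, e.g. when all functions/classes live in importable modules.
no_session = PipelineOptions(['--no_save_main_session']).view_as(SetupOptions)

# With dill the resolved default is False, so opting in is explicit.
dill_opts = PipelineOptions(
    ['--pickle_library=dill', '--save_main_session']).view_as(SetupOptions)
```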
63 changes: 43 additions & 20 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -42,6 +42,7 @@
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.internal import names
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
from apache_beam.testing.test_pipeline import TestPipeline
@@ -243,6 +244,18 @@ def test_create_runner(self):
self.assertTrue(
isinstance(create_runner('TestDataflowRunner'), TestDataflowRunner))

@staticmethod
def dependency_proto_from_main_session_file(serialized_path):
return [
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.FILE.urn,
type_payload=serialized_path,
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
staged_name=names.PICKLED_MAIN_SESSION_FILE).SerializeToString(
))
]

def test_environment_override_translation_legacy_worker_harness_image(self):
self.default_properties.append('--experiments=beam_fn_api')
self.default_properties.append('--worker_harness_container_image=LEGACY')
@@ -256,17 +269,22 @@ def test_environment_override_translation_legacy_worker_harness_image(self):
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())

actual = list(remote_runner.proto_pipeline.components.environments.values())
self.assertEqual(len(actual), 1)
actual = actual[0]
file_path = actual.dependencies[0].type_payload
# The dependency payload contains the main session file staged from a
# transient temp directory, so build the expected value from the actual one.
main_session_dep = self.dependency_proto_from_main_session_file(file_path)
self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities(),
dependencies=environments.python_sdk_dependencies(
options=options))
])
actual,
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities(),
dependencies=environments.python_sdk_dependencies(options=options) +
main_session_dep))

def test_environment_override_translation_sdk_container_image(self):
self.default_properties.append('--experiments=beam_fn_api')
@@ -281,17 +299,22 @@ def test_environment_override_translation_sdk_container_image(self):
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())

actual = list(remote_runner.proto_pipeline.components.environments.values())
self.assertEqual(len(actual), 1)
actual = actual[0]
file_path = actual.dependencies[0].type_payload
# The dependency payload contains the main session file staged from a
# transient temp directory, so build the expected value from the actual one.
main_session_dep = self.dependency_proto_from_main_session_file(file_path)
self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities(),
dependencies=environments.python_sdk_dependencies(
options=options))
])
actual,
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities(),
dependencies=environments.python_sdk_dependencies(options=options) +
main_session_dep))

def test_remote_runner_translation(self):
remote_runner = DataflowRunner()
1 change: 0 additions & 1 deletion sdks/python/apache_beam/runners/portability/stager.py
@@ -376,7 +376,6 @@ def create_job_resources(
pickled_session_file = os.path.join(
temp_dir, names.PICKLED_MAIN_SESSION_FILE)
pickler.dump_session(pickled_session_file)
# for pickle_library: cloudpickle, dump_session is no op
if os.path.exists(pickled_session_file):
resources.append(
Stager._create_file_stage_to_artifact(
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/runners/portability/stager_test.py
@@ -200,7 +200,7 @@ def test_with_main_session(self):
# (https://github.com/apache/beam/issues/21457): Remove the decorator once
# cloudpickle is default pickle library
@pytest.mark.no_xdist
def test_main_session_not_staged_when_using_cloudpickle(self):
def test_main_session_staged_when_using_cloudpickle(self):
staging_dir = self.make_temp_dir()
options = PipelineOptions()

@@ -209,7 +209,10 @@ def test_main_session_not_staged_when_using_cloudpickle(self):
# session is saved when pickle_library==cloudpickle.
options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE
self.update_options(options)
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.assertEqual([
names.PICKLED_MAIN_SESSION_FILE,
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand Down