Skip to content

Conversation

@Abacn
Copy link
Contributor

@Abacn Abacn commented Sep 24, 2025

Fix #35738

Restore functionality of user registered logical type and schema, thus preserves the behavior that logical type registry and coder registry are saved to worker when pipeline option --save_main_session explicitly added.

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@Abacn Abacn marked this pull request as draft September 24, 2025 20:47
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @Abacn, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue in the Apache Beam Python SDK where user-defined logical types and schema registries were not correctly persisted when using cloudpickle in conjunction with the save_main_session pipeline option. This oversight, which became prominent after cloudpickle was made the default pickler, meant that custom type definitions were lost across pipeline stages. The changes explicitly implement the saving and loading of these registries within cloudpickle, ensuring that the save_main_session option now functions as expected for users relying on custom type definitions.

Highlights

  • Cloudpickle Main Session Saving: The cloudpickle pickler now correctly saves and loads user-registered Logical Type and Schema Registries when the save_main_session pipeline option is enabled. This resolves an issue where these custom type definitions were not persisted across pipeline stages, particularly after cloudpickle became the default pickler in Beam 2.65.0.
  • Pickler-Agnostic Session Loading: The main session loading logic in sdk_worker_main.py has been refactored to be more pickler-agnostic, allowing _load_main_session to be called universally, with internal logic handling dill and cloudpickle specific behaviors.
  • Improved Warning/Error Messages: Warning and error messages related to main session loading have been enhanced to provide more specific feedback based on whether dill or cloudpickle is the active pickler, improving diagnostic clarity.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@Abacn Abacn force-pushed the save-main-session-registries branch 4 times, most recently from d358b81 to 5bfae4e Compare September 25, 2025 01:52
@Abacn Abacn marked this pull request as ready for review September 25, 2025 01:53
@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Abacn Abacn force-pushed the save-main-session-registries branch from 18c6524 to dedbdf5 Compare September 25, 2025 04:10
@Abacn Abacn changed the title Save Logical Type Registry and Schema Registry on cloudpickle save main session Save Logical Type Registry on cloudpickle save main session Sep 25, 2025
@Abacn Abacn force-pushed the save-main-session-registries branch from dedbdf5 to e64910c Compare September 25, 2025 18:59
@Abacn Abacn changed the title Save Logical Type Registry on cloudpickle save main session Save Logical Type and Coder Registry on cloudpickle save main session Sep 25, 2025
@Abacn
Copy link
Contributor Author

Abacn commented Sep 25, 2025

Tested with Dataflow, registered a user logical type and a namedtuple coder in main session, and checked logical type/coder registry had these items in DoFn process (job id): 2025-09-25_11_50_50-8758466825726433704

@Abacn Abacn force-pushed the save-main-session-registries branch from e64910c to 0091b24 Compare September 25, 2025 19:08
@Abacn
Copy link
Contributor Author

Abacn commented Sep 25, 2025

PreCommit Whitespace failing on master branch, not related to this change (fixed on master)

beam_PreCommit_Yaml_Xlang_Direct failing on master, not related to this change

@Abacn
Copy link
Contributor Author

Abacn commented Sep 25, 2025

R: @shunping @claudevdm

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Copy link
Collaborator

@shunping shunping left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks great! I left some minor comments for naming style.

I think after this is merged, we can probably revert some JDBC logical type fixes we had before.

@shunping
Copy link
Collaborator

Btw, is this enabled by default for cloudpickle or do users need to set the --save_main_sesson flag?

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2025

we can probably revert some JDBC logical type fixes we had before

These fixes stay there as previously it relied on io.jdbc gets imported to make those type work, which was error prone

Btw, is this enabled by default for cloudpickle or do users need to set the --save_main_sesson flag?

Not by default. The goal of this PR is to restore the save_main_sesson behavior pre 2.65.0 (make it effective again). Enable "save_main_sesson" by default for cloudpickle is left for future discussion as it affects users using default pipeline options.

CHANGES.md Outdated
* PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally
function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java)
([#36141](https://github.com/apache/beam/issues/36141)).
* (Python) Logical type and coder registry are saved for pipelines with `save_main_session` pipeline option enabled in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use a new flag for this, or no flag at all (if we can infer that a user is using coders that are not registered in schemas.py). save_main_session is misleading, we are not really doing that.

Can we determine if a coder was added from anywhere that is not schemas.py, and then automatically pickle only those coders that are not registered in schemas.py?

A lot of users still have save_main_session set on their pipelines that used to run on dill and now we will pickle coder registry even if they don't really need this functionality.

Copy link
Contributor Author

@Abacn Abacn Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it becomes misleading naming, unfortunately this flag was named by implementation detail instead of what user want to achieve (states at submission session env are preserved job execution time). Here I want to un-break existing pipeline option setting. If user needs a new flag on upgrading their pipeline they can choose to add --pickle_library=dill. Probably one can add an alias "--save_beam_registries" and recommend for use for new users.

Making it enabled all times is a step further and has risk of new breakages, though I haven't observe

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'Save the main session state so that pickled functions and classes '

I am still worried that changing the default behavior for cloudpickle when save_main_session=True which used to be a no-op (since cloudpickle doesnt need main session to unpickle types defined in main) can lead to some kind of errors.

I think having an explicit flag so that users who run into a logical type issue can use is reasonable instead of pickling registries for all users that unintentionally set the save_main_session=True flag but don't use custom logical types at all.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to unbreak existing pipelines that use custom logical types, can we detect that someone registered a logical type (from outside schemas.py) and set the save_beam_registries=True?

Copy link
Contributor Author

@Abacn Abacn Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well there are two use case here

  • User set --save_main_session in Beam 2.64.0 and relies on it (for registries). This should still work in newer version. This PR fixes this case.

  • User set --save_main_session in Beam 2.64.0 and no longer relies on it (e.g. for user defined classes). This change won't break them, as registries are merged on load. Yes it is no longer "no-op" but expected to stay no effect as long as remote SDK versions are expected to be in sync. If it unexpectedly having error it's due to some other unexposed bugs, and users should then remove --save_main_session flag because this flag was "unintentional"


can we detect that someone registered a logical type

It's hard. logical type registry is used to not only adding new types but also to overwrite the mapping of those defined in schema.py (e.g. the notorious MillisInstant)

# Since References are saved (https://s.apache.org/beam-picklers), we only
# dump supported Beam Registries (currently only logical type registry)
from apache_beam.typehints import schemas
from apache_beam.coders import typecoders
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain how coder registry is related here? Is it a separate issue or the same logical type issue?

I believe all custom coders are fully pickled in the pipeline, and there are currently no problems with custom coders?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a related issue. I made a minimum example pipeline based on https://github.com/apache/beam/blob/bb340c2f66ac8730334160d6ed5ecd18822d059d/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

in particular, in L67 there it registered a coder

coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

I checked with dill and save main session, this mapping is registered in coder registry on worker. But for cloudpickle, it is no longer registered on worker.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it matter if the coder is registered on the worker though? My understanding is coders.registry.register_coder(JdbcTestRow, coders.RowCoder) just maps any transforms that use JdbcTestRow to use RowCoder during pipeline construction.

Then the RowCoder is pickled as part of the pipeline proto, and the worker doesnt actually need the coder registry, it just uses the pickled coder.

Am I missing something? Can you reproduce some sort of error in the case where coder registry is not correct on worker?

Copy link
Contributor Author

@Abacn Abacn Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. It doesn't have error, meaning "worker doesnt actually need the coder registry" in this use case. The original error is due to logical type registry, adding coder registry here is for completeness conceptually as both are exposed to user.

pickler = cloudpickle.CloudPickler(file)
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
# once implemented
pickler.dump({"coder": coder_reg, "logicaltype": logicaltype_reg})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of only pickling/loading the coders that are not registered in schemas.py? Maybe it does not matter though, but everything registered in schemas.py will be registered on import of that file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only pickling/loading the coders that are not registered in schemas.py?

I understand this change already handles this. It uses "get_custom_type_coder_tuples". Standard coders are registered inside typecoders.py, which uses "_register_coder_internal" directly

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I was referring to the LogicalType registry, sorry

@Abacn Abacn marked this pull request as draft September 30, 2025 20:04
@github-actions
Copy link
Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Nov 30, 2025
@Abacn Abacn force-pushed the save-main-session-registries branch from d195d71 to 8383902 Compare December 1, 2025 19:28
@Abacn Abacn force-pushed the save-main-session-registries branch from 8383902 to f2d6d5c Compare December 1, 2025 19:56
@Abacn Abacn marked this pull request as ready for review December 1, 2025 20:57
@Abacn
Copy link
Contributor Author

Abacn commented Dec 1, 2025

R: @claudevdm

Finally got another iteration

@github-actions github-actions bot removed the stale label Dec 2, 2025
Copy link
Collaborator

@claudevdm claudevdm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

  1. Can we add an integration test/validatesrunner test?
  2. Can we dump/load session by default if we detect that the user has used custom logical types? I dont think it is necessary for a user to control this via flag if we can detect, but maybe we can keep an option to explicitly disable it in case there are issues?

@Abacn Abacn force-pushed the save-main-session-registries branch from c93c7a8 to c3e00ba Compare December 3, 2025 19:01
@Abacn
Copy link
Contributor Author

Abacn commented Dec 3, 2025

Can we add an integration test/validatesrunner test?

Added a unit test. Tried to add a local integration test (run on prism runner) but find that Prism does not save/load sessions. Actually only service_runner (DataflowRunner) do.

Now prefer to revert #35659 (and other workarounds) in follow ups, then the path is exercised by integration tests previously affected

Can we dump/load session by default if we detect that the user has used custom logical types? I dont think it is necessary for a user to control this via flag if we can detect, but maybe we can keep an option to explicitly disable it in case there are issues?

Done. load session is now enabled by default for cloudpickle and introduced a "disable_save_main_session" flag to explicitly disable it

@claudevdm
Copy link
Collaborator

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses issue #35738 by restoring the functionality of user-registered logical types and schema, ensuring that the logical type registry and coder registry are saved and restored when --save_main_session is enabled with cloudpickle. The changes involve refactoring the type coders, enhancing cloudpickle's session management, and adjusting pipeline options and worker behavior to properly handle these registries. The introduction of internal registration mechanisms for logical types helps differentiate between built-in and user-defined types, ensuring only relevant data is serialized. Overall, the changes are well-implemented and directly address the reported bug.

Copy link
Collaborator

@claudevdm claudevdm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we run a postcommit workflow to see if it breaks any integration tests?

@Abacn
Copy link
Contributor Author

Abacn commented Dec 5, 2025

PreCommit Python ML tests with ML deps installed was failing on master branch, not related to this change

PostCommit Python Xlang Gcp Dataflow failing spanner tests also on master branch, not related to this change

beam_PostCommit_Python failing mock API tests also on master branch, not related to this change

@Abacn Abacn merged commit f22d2bc into apache:master Dec 5, 2025
110 of 113 checks passed
@Abacn Abacn deleted the save-main-session-registries branch December 5, 2025 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: xlang JDBC (versions 2.65.0, 2.66.65)- No logical type registered for URN 'beam:logical_type:javasdk:v1'

3 participants