Save Logical Type and Coder Registry on cloudpickle save main session #36271
Conversation
gemini-code-assist left a comment:
Summary of Changes: Hello @Abacn, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request addresses a critical issue in the Apache Beam Python SDK where user-defined logical types and schema registries were not correctly persisted when using cloudpickle with save_main_session.
Force-pushed d358b81 to 5bfae4e.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment …
Force-pushed 18c6524 to dedbdf5.
Force-pushed dedbdf5 to e64910c.
Tested with Dataflow: registered a user logical type and a namedtuple coder in the main session, and checked that the logical type/coder registries had these items in the DoFn process (job id: 2025-09-25_11_50_50-8758466825726433704).
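For illustration, a minimal sketch of that kind of main-session registration (the class, URN, and field choices below are made up; the worker-side DoFn that inspects the registries is omitted):

```python
# Hedged sketch: register a custom logical type in the main module so that,
# with save_main_session, the registration is visible on the worker again.
# Everything named here (Currency, the URN) is illustrative, not from the PR.
from apache_beam.typehints.schemas import LogicalType, NoArgumentLogicalType


class Currency(object):
    """Plain user class carried as a string representation."""
    def __init__(self, code: str):
        self.code = code


class CurrencyLogicalType(NoArgumentLogicalType[Currency, str]):
    @classmethod
    def urn(cls):
        return "user:logical_type:currency:v1"

    @classmethod
    def representation_type(cls):
        return str

    @classmethod
    def language_type(cls):
        return Currency

    def to_representation_type(self, value):
        return value.code

    def to_language_type(self, value):
        return Currency(value)


# Main-session registration; this is what used to get lost under cloudpickle.
LogicalType.register_logical_type(CurrencyLogicalType)
```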
Force-pushed e64910c to 0091b24.
beam_PreCommit_Yaml_Xlang_Direct is failing on master and is not related to this change.
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment …
shunping left a comment:
The PR looks great! I left some minor comments on naming style.
I think after this is merged, we can probably revert some JDBC logical type fixes we had before.
Btw, is this enabled by default for cloudpickle, or do users need to set the save_main_session flag?
These fixes should stay there: previously it relied on io.jdbc getting imported to make those types work, which was error prone.
Not by default. The goal of this PR is to restore the save_main_session behavior pre 2.65.0 (make it effective again). Enabling save_main_session by default for cloudpickle is left for future discussion, as it affects users using default pipeline options.
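For reference, the opt-in looks like the usual options setup (a sketch using the existing save_main_session and pickle_library options only; no new flag is assumed):

```python
# Sketch: keep the pre-2.65.0 behavior effective again by explicitly opting in.
# Both options already exist in the SDK; nothing here is introduced by this PR.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    save_main_session=True,        # also ships the logical type / coder registries
    pickle_library='cloudpickle',  # the default pickler as of 2.65.0
)
```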
CHANGES.md (outdated)
* PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java) ([#36141](https://github.com/apache/beam/issues/36141)).
* (Python) Logical type and coder registry are saved for pipelines with `save_main_session` pipeline option enabled in
I think we should use a new flag for this, or no flag at all (if we can infer that a user is using coders that are not registered in schemas.py). save_main_session is misleading here, since we are not really doing that.
Can we determine if a coder was added from anywhere that is not schemas.py, and then automatically pickle only those coders that are not registered in schemas.py?
A lot of users still have save_main_session set on pipelines that used to run on dill, and now we will pickle the coder registry even if they don't really need this functionality.
I agree it becomes misleading naming; unfortunately this flag was named after an implementation detail instead of what the user wants to achieve (state from the submission-time session environment is preserved at job execution time). Here I want to un-break the existing pipeline option setting. If users need a flag when upgrading their pipeline, they can choose to add --pickle_library=dill. Probably one could add an alias "--save_beam_registries" and recommend it for new users.
Making it enabled at all times is a step further and risks new breakages, though I haven't observed any.
'Save the main session state so that pickled functions and classes '
I am still worried that changing the default behavior for cloudpickle when save_main_session=True, which used to be a no-op (since cloudpickle doesn't need the main session to unpickle types defined in main), can lead to some kind of errors.
I think having an explicit flag that users who run into a logical type issue can use is reasonable, instead of pickling registries for all users who unintentionally set save_main_session=True but don't use custom logical types at all.
If we want to unbreak existing pipelines that use custom logical types, can we detect that someone registered a logical type (from outside schemas.py) and set save_beam_registries=True automatically?
Well, there are two use cases here:

- User set --save_main_session in Beam 2.64.0 and relies on it (for registries). This should still work in newer versions. This PR fixes this case.
- User set --save_main_session in Beam 2.64.0 and no longer relies on it (e.g. for user-defined classes). This change won't break them, as registries are merged on load. Yes, it is no longer a "no-op", but it is expected to have no effect as long as the remote SDK version is in sync. If it unexpectedly errors, it's due to some other unexposed bug, and users should then remove the --save_main_session flag because the flag was "unintentional".

"can we detect that someone registered a logical type"

It's hard. The logical type registry is used not only to add new types but also to overwrite the mapping of types already defined in schemas.py (e.g. the notorious MillisInstant).
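For example, the MillisInstant case is an overwrite rather than an addition, so "registered from outside schemas.py" would not catch it; the usual workaround looks like this (a sketch of the widely used snippet, not code from this PR):

```python
# Both classes live in schemas.py; the user re-registers MillisInstant only to
# change which implementation wins for the millis-instant URN.
from apache_beam.typehints.schemas import LogicalType, MillisInstant

LogicalType.register_logical_type(MillisInstant)
```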
# Since References are saved (https://s.apache.org/beam-picklers), we only
# dump supported Beam Registries (currently only logical type registry)
from apache_beam.typehints import schemas
from apache_beam.coders import typecoders
Can you please explain how the coder registry is related here? Is it a separate issue or the same logical type issue?
I believe all custom coders are fully pickled in the pipeline, and there are currently no problems with custom coders?
It's a related issue. I made a minimum example pipeline based on https://github.com/apache/beam/blob/bb340c2f66ac8730334160d6ed5ecd18822d059d/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
In particular, at L67 there it registers a coder:
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
I checked with dill and save_main_session: this mapping is registered in the coder registry on the worker. But for cloudpickle, it is no longer registered on the worker.
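A trimmed-down sketch of that registration pattern (field names are illustrative, not the actual JdbcTestRow schema):

```python
# A NamedTuple registered against RowCoder in the main module, mirroring the
# linked test. With dill + save_main_session the mapping reaches the worker's
# registry; with cloudpickle (before this change) it did not.
import typing

from apache_beam import coders


class JdbcTestRow(typing.NamedTuple):
    f_id: int
    f_name: str


coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

# Locally this resolves to a RowCoder built from the NamedTuple schema.
print(type(coders.registry.get_coder(JdbcTestRow)).__name__)
```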
Does it matter if the coder is registered on the worker, though? My understanding is coders.registry.register_coder(JdbcTestRow, coders.RowCoder) just maps any transforms that use JdbcTestRow to RowCoder during pipeline construction.
Then the RowCoder is pickled as part of the pipeline proto, and the worker doesn't actually need the coder registry; it just uses the pickled coder.
Am I missing something? Can you reproduce some sort of error in the case where the coder registry is not correct on the worker?
That's true. It doesn't produce an error, meaning the worker doesn't actually need the coder registry in this use case. The original error is due to the logical type registry; adding the coder registry here is for conceptual completeness, as both are exposed to the user.
pickler = cloudpickle.CloudPickler(file)
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
# once implemented
pickler.dump({"coder": coder_reg, "logicaltype": logicaltype_reg})
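For intuition, a hypothetical sketch of the load-side counterpart ("registries are merged on load"); the function name and the payload shape are assumptions for illustration, not the PR's actual code:

```python
# Hypothetical: unpickle the {"coder": ..., "logicaltype": ...} payload written
# above and merge it back through the public registration APIs. The payload is
# assumed to hold (type, coder) tuples and logical type classes respectively.
from apache_beam import coders
from apache_beam.typehints.schemas import LogicalType


def merge_saved_registries(payload):
    for typehint, coder_cls in payload["coder"]:
        coders.registry.register_coder(typehint, coder_cls)
    for logical_type_cls in payload["logicaltype"]:
        LogicalType.register_logical_type(logical_type_cls)
```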
What do you think of only pickling/loading the coders that are not registered in schemas.py? Maybe it does not matter, though, since everything registered in schemas.py will be registered on import of that file anyway.
only pickling/loading the coders that are not registered in schemas.py?
I understand this change already handles that: it uses "get_custom_type_coder_tuples". Standard coders are registered inside typecoders.py, which uses "_register_coder_internal" directly.
Oh I was referring to the LogicalType registry, sorry
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
Force-pushed d195d71 to 8383902.
Force-pushed 8383902 to f2d6d5c.
R: @claudevdm Finally got another iteration.
claudevdm left a comment:
Thanks!
- Can we add an integration test/validatesrunner test?
- Can we dump/load the session by default if we detect that the user has used custom logical types? I don't think it is necessary for a user to control this via a flag if we can detect it, but maybe we can keep an option to explicitly disable it in case there are issues?
Force-pushed c93c7a8 to c3e00ba.
Added a unit test. Tried to add a local integration test (run on the Prism runner) but found that Prism does not save/load sessions; actually only service_runner (DataflowRunner) does. I now prefer to revert #35659 (and other workarounds) in follow-ups, so the path is exercised by the integration tests previously affected.
Done. Load session is now enabled by default for cloudpickle, and a "disable_save_main_session" flag was introduced to explicitly disable it.
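As a usage sketch (the disable_save_main_session name is taken from the comment above; treat the exact flag spelling and semantics as an assumption until the PR lands):

```python
# Hedged sketch: explicitly opting out of the registry save/load that is now
# on by default for cloudpickle. Flag name per the comment above.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--pickle_library=cloudpickle',
    '--disable_save_main_session',
])
```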
/gemini review
Code Review
This pull request addresses issue #35738 by restoring the functionality of user-registered logical types and schema, ensuring that the logical type registry and coder registry are saved and restored when --save_main_session is enabled with cloudpickle. The changes involve refactoring the type coders, enhancing cloudpickle's session management, and adjusting pipeline options and worker behavior to properly handle these registries. The introduction of internal registration mechanisms for logical types helps differentiate between built-in and user-defined types, ensuring only relevant data is serialized. Overall, the changes are well-implemented and directly address the reported bug.
claudevdm left a comment:
Should we run a postcommit workflow to see if it breaks any integration tests?
- PreCommit Python ML tests with ML deps installed were failing on the master branch, not related to this change.
- PostCommit Python Xlang Gcp Dataflow is failing Spanner tests, also on the master branch, not related to this change.
- beam_PostCommit_Python is failing mock API tests, also on the master branch, not related to this change.
Fix #35738
Restore functionality of user-registered logical types and schemas, preserving the behavior that the logical type registry and coder registry are saved to the worker when the pipeline option --save_main_session is explicitly added.