
feat: Python UDFs: per-session inlining toggle and strict refusal setting#1546

Open
timsaucer wants to merge 14 commits into apache:main from timsaucer:pr3-toggle-sender-strict

Conversation


@timsaucer timsaucer commented May 15, 2026

Which issue does this PR close?

Addresses part of #1517

This is PR 3 of 4. The subsequent PRs target this branch's tip until it merges.

Follow up PR:

Rationale for this change

PRs 1 and 2 ship Python UDFs inline through the codec. That leaves a follow-on need: untrusted-input decoding. A receiver that may read Expr.from_bytes input from an untrusted source must refuse to invoke cloudpickle.loads on the inline payload. (pickle.loads on untrusted input is still unsafe regardless of this toggle; see the security note in the docstrings.)

We resolve this with an on/off switch at the codec level. Every session already carries a codec, so the toggle is naturally per-session.
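
As a rough illustration of the switch described above, here is a minimal standard-library sketch. `ToyCodec`, `InliningDisabledError`, and the bare `MAGIC` prefix are hypothetical stand-ins, not the Rust codec in this PR, and stdlib `pickle` substitutes for `cloudpickle`. The only point it makes is that a strict codec recognizes an inline payload by its prefix and refuses it before anything is unpickled.

```python
import pickle
from dataclasses import dataclass

# Hypothetical prefix for illustration; the real DFPY* wire layout is not
# described in this PR text.
MAGIC = b"DFPYUDF"


class InliningDisabledError(Exception):
    """Raised when a strict codec is handed an inline payload."""


@dataclass(frozen=True)
class ToyCodec:
    python_udf_inlining: bool = True

    def encode(self, name, func):
        if not self.python_udf_inlining:
            # Toggle off: send only the name; the receiver must already
            # have a UDF registered under it.
            return name.encode()
        return MAGIC + pickle.dumps((name, func))

    def decode(self, payload, registry):
        if payload.startswith(MAGIC):
            if not self.python_udf_inlining:
                # Refuse before any unpickling takes place.
                raise InliningDisabledError(
                    "inline Python UDF payload refused: inlining is disabled"
                )
            _name, func = pickle.loads(payload[len(MAGIC):])
            return func
        # By-name payload: resolve against locally registered UDFs.
        return registry[payload.decode()]
```

With the toggle off on both sides, the sender emits a by-name reference and the receiver resolves it from its own registry, so no pickled callable crosses the wire.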

What changes are included in this PR?

  • Add a toggle at the SessionContext level to enable or disable inlining of Python UDFs; the setting is passed through to the codec layer.
  • Add a sender-side context setting to match the worker-side context setting.
  • Add unit tests, including a pickle multiprocessing example that demonstrates the need for the sender-side context.
  • Add a timeout to the unit tests, because bad behavior could cause a multiprocessing hang (courtesy of @ntjohnson1).

Are there any user-facing changes?

Yes, but they are purely additive:

  • SessionContext.with_python_udf_inlining is a new method.
  • datafusion.ipc.set_sender_ctx / get_sender_ctx / clear_sender_ctx are new functions for propagating a configured session through pickle.dumps.

The user-guide page documenting the full pattern (and the multiprocessing / Ray runnable examples) lands in PR 4 of this series.

timsaucer and others added 2 commits May 21, 2026 07:45
…fusal

Adds a per-session toggle that turns inline Python UDF encoding on or
off, plus the supporting plumbing to make it usable through
pickle.dumps.

Codec layer:
  * PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining
    bool (default true) and a with_python_udf_inlining(enabled) builder.
    Each try_encode_udf{,af,wf} short-circuits to inner when the toggle
    is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic
    on a strict codec returns a clean Execution error instead of
    invoking cloudpickle.loads. The refusal message names the UDF and
    the wire family so an operator can see at a glance whether to
    re-encode the bytes or register the UDF on the receiver.

Session layer:
  * PySessionContext::with_python_udf_inlining(enabled) returns a new
    session whose stacked logical + physical codecs both carry the
    toggle. The Arc<SessionState> is cloned (cheap), only the codec
    pair is rebuilt, so registrations and config stay attached.
  * SessionContext.with_python_udf_inlining(*, enabled) is the Python
    wrapper. enabled is keyword-only because positional booleans at
    the call site read as opaque.
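
A small sketch of the builder shape described for the session layer, under the assumption that a frozen dataclass is an adequate stand-in for the real session. `ToySession` and its fields are hypothetical; only the method name and the keyword-only `enabled` argument come from the commit message.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class ToySession:
    # Stand-in for registrations and config that must stay attached.
    registrations: dict = field(default_factory=dict)
    python_udf_inlining: bool = True

    def with_python_udf_inlining(self, *, enabled: bool) -> "ToySession":
        # replace() returns a new instance and leaves `self` untouched;
        # fields that are not overridden are shared, not copied.
        return replace(self, python_udf_inlining=enabled)
```

Because `enabled` is keyword-only, a call such as `ctx.with_python_udf_inlining(False)` raises `TypeError` in this sketch, which forces the more readable `enabled=False` at the call site.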

Sender-side context:
  * datafusion.ipc gains set_sender_ctx / get_sender_ctx /
    clear_sender_ctx thread-locals. Expr.__reduce__ now consults
    get_sender_ctx() to pick the codec for outbound pickles, which is
    the only path through which a strict session affects pickle.dumps
    (the protocol calls __reduce__ with no arguments). Without a
    sender context the default codec is used.
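
The mechanism above can be modeled with the standard library alone. This is a sketch under stated assumptions, not the `datafusion.ipc` implementation: the three function names mirror the ones in this PR, while `ToySession`, `ToyExpr`, and `copy_with_sender_ctx` are hypothetical. It uses `copy.copy`, which goes through the same `__reduce__` hook that `pickle.dumps` calls with no arguments.

```python
import copy
import threading

_sender = threading.local()  # one slot per thread


def set_sender_ctx(ctx):
    _sender.ctx = ctx  # holds a strong reference until cleared


def get_sender_ctx():
    return getattr(_sender, "ctx", None)


def clear_sender_ctx():
    if hasattr(_sender, "ctx"):
        del _sender.ctx


class ToySession:
    def __init__(self, python_udf_inlining=True):
        self.python_udf_inlining = python_udf_inlining


class ToyExpr:
    def __init__(self, name, inlined=True):
        self.name = name
        self.inlined = inlined

    def __reduce__(self):
        # The protocol passes no arguments, so the only way to learn about
        # a configured session is to look it up out of band.
        ctx = get_sender_ctx()
        inlined = True if ctx is None else ctx.python_udf_inlining
        return (ToyExpr, (self.name, inlined))


def copy_with_sender_ctx(ctx, expr):
    set_sender_ctx(ctx)
    try:
        return copy.copy(expr)
    finally:
        clear_sender_ctx()  # do not leak the session into later calls
```

The `try`/`finally` wrapper matters because the thread-local keeps the session alive, and keeps affecting later serialization on that thread, until it is cleared.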

Tests:
  * test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers
    both directions of the toggle plus the explicit-ctx fast path),
    TestWorkerCtxLifecycle (set/clear/threading), and
    TestSenderCtxLifecycle.
  * New test_pickle_multiprocessing.py + helpers exercise the full
    driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx
    installed in the worker initializer.
  * CI workflow gets a 30-minute timeout-minutes backstop so a hung
    pickle worker can't block the matrix indefinitely.

User-guide docs and the runnable examples land in PR4 of this series.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@timsaucer timsaucer force-pushed the pr3-toggle-sender-strict branch from d680e12 to 14178db May 21, 2026 11:52
timsaucer and others added 4 commits May 21, 2026 08:19
Rewrite with_python_udf_inlining docstring for readability and remove
references to /user-guide/io/distributing_work, which does not exist
yet. Keep security warning inline as a .. warning:: Security block,
matching the existing pattern in Expr.to_bytes / from_bytes /
__reduce__. The central doc will land in a follow-on PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per CLAUDE.md, every Python function needs a docstring example.
Adds examples to with_python_udf_inlining, set_sender_ctx,
clear_sender_ctx, and get_sender_ctx. Also clarifies that
with_python_udf_inlining returns a new SessionContext and leaves
the original unchanged, matching the with_logical_extension_codec
pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* codec: strict refusal routes through `read_framed_payload` so
  malformed inline bytes surface their own diagnostic; the
  "inlining is disabled" message now fires only when the payload
  would have decoded.
* codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining`
  cross-link for rustdoc rendering.
* expr: hoist `get_sender_ctx` import to module top; note that
  `__reduce__` also drives `copy.copy` / `copy.deepcopy`.
* context: accept `with_python_udf_inlining` positionally or as
  kwarg (drop `*,`).
* tests: replace size-ratio heuristic with semantic check for the
  `DFPYUDF` family prefix; switch single-batch closure test to
  `pool.apply`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
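
The ordering in the first codec bullet of this commit (validate the frame first, refuse second) can be sketched as follows. The framing used here, a family prefix followed by a 4-byte big-endian length, is an assumption made purely for illustration; the PR text does not describe the real layout, and both exception types are hypothetical. Only the `DFPYUDF` prefix and the `read_framed_payload` name are taken from the commit message.

```python
import struct

FAMILY = b"DFPYUDF"


class MalformedPayloadError(ValueError):
    pass


class InliningDisabledError(RuntimeError):
    pass


def read_framed_payload(buf):
    """Return the body of a well-formed frame, or None if the prefix differs."""
    if not buf.startswith(FAMILY):
        return None
    header_end = len(FAMILY) + 4
    if len(buf) < header_end:
        raise MalformedPayloadError("truncated length header")
    (length,) = struct.unpack(">I", buf[len(FAMILY):header_end])
    body = buf[header_end:]
    if len(body) != length:
        raise MalformedPayloadError(
            f"expected {length} body bytes, got {len(body)}"
        )
    return body


def decode(buf, *, inlining_enabled):
    # Framing is checked first, so malformed bytes report their own error.
    body = read_framed_payload(buf)
    if body is None:
        return None  # not an inline payload; the caller falls through
    if not inlining_enabled:
        # Reached only for payloads that would otherwise have decoded.
        raise InliningDisabledError("inlining is disabled")
    return body
```

In this sketch a truncated frame raises `MalformedPayloadError` even on a strict codec, so an operator is never told to enable inlining for bytes that could not have decoded anyway.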
- `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`)
  to match the documented call style and the existing doctests/tests.
- `refuse_if_inline` and the three `try_decode_python_*` decoders short-
  circuit on a `starts_with(family)` check before `Python::attach`, so
  plans whose UDFs are not Python-defined no longer pay a GIL acquisition
  per decode call. Semantics preserved: `strip_wire_header` already
  returns `Ok(None)` when the prefix does not match.
- `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in
  `try`/`finally` and notes that the thread-local holds a strong
  reference to the installed `SessionContext` until cleared.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@timsaucer timsaucer changed the title feat: per-session Python UDF inlining toggle + sender ctx + strict refusal (3/4) feat: per-session Python UDF inlining toggle and strict refusal setting May 21, 2026
@timsaucer timsaucer changed the title feat: per-session Python UDF inlining toggle and strict refusal setting feat: Python UDFs: per-session inlining toggle and strict refusal setting May 21, 2026
@timsaucer timsaucer marked this pull request as ready for review May 21, 2026 13:35
timsaucer and others added 4 commits May 21, 2026 17:05
Multiprocessing forkserver/spawn hang was diagnosed and fixed: workers
could not import `tests._pickle_multiprocessing_helpers` because
`pytest --import-mode=importlib` does not add the test parent dir to
`sys.path`. The fix (appending the parent dir to `sys.path` so it is
inherited by mp workers without shadowing the installed `datafusion`
wheel) is retained. This commit drops the diagnostic scaffolding that
was added to identify the hang point:

- `_diag` + per-import / per-task log writes to /tmp
- `snapshot_processes` and the `threading.Timer` that captured worker
  state mid-hang
- `diag_init` Pool initializer
- "Dump multiprocessing diagnostic log" CI step

Pre-existing infrastructure is kept: per-test `@pytest.mark.timeout(120)`
(backed by `pytest-timeout` dev dep) and the job-level
`timeout-minutes: 30` backstop on the test matrix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
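
For readers who hit the same import failure, the retained `sys.path` fix might look roughly like this. The helper name and the assumption that the test file lives one level below the directory to add are hypothetical; the actual test module may do this differently.

```python
import sys
from pathlib import Path


def append_tests_parent_to_sys_path(test_file):
    """Make the directory that contains the tests package importable."""
    parent = str(Path(test_file).resolve().parent.parent)
    if parent not in sys.path:
        # append() rather than insert(0, ...): earlier entries keep
        # priority, so an installed wheel is not shadowed by a source tree.
        sys.path.append(parent)
    return parent
```

A test module would call it once at import time, for example `append_tests_parent_to_sys_path(__file__)`, before creating any pool.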
@timsaucer timsaucer marked this pull request as ready for review May 22, 2026 16:55

Copilot AI left a comment


Pull request overview

Adds a per-session switch to control whether Python UDF definitions are inlined into serialized expressions, enabling “strict” decode behavior that refuses inline (cloudpickled) payloads, and introduces a driver-side sender context so pickle.dumps(Expr) can honor that session configuration.

Changes:

  • Introduce SessionContext.with_python_udf_inlining(enabled=...) and plumb the toggle through the Rust logical/physical codecs to gate inline encode/decode (and refuse inline payloads when disabled).
  • Add datafusion.ipc sender-context APIs (set_sender_ctx/get_sender_ctx/clear_sender_ctx) and wire Expr.__reduce__ to use the sender context for pickling.
  • Add unit tests (including multiprocessing coverage) plus CI/test harness guardrails (pytest-timeout, workflow timeout).

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.

Show a summary per file
| File | Description |
| --- | --- |
| crates/core/src/codec.rs | Adds the inlining toggle to Python codecs and strict refusal logic for inline UDF payloads. |
| crates/core/src/context.rs | Exposes a Rust PySessionContext.with_python_udf_inlining constructor-style method. |
| python/datafusion/context.py | Adds public Python API SessionContext.with_python_udf_inlining. |
| python/datafusion/expr.py | Makes pickling honor a driver-side sender context via get_sender_ctx() and updates serialization docs. |
| python/datafusion/ipc.py | Adds sender-context thread-local APIs and expands driver/worker distribution docs. |
| python/tests/test_pickle_expr.py | Adds coverage for strict inlining behavior and sender/worker context lifecycle semantics. |
| python/tests/test_pickle_multiprocessing.py | Adds cross-process pickle tests across multiprocessing start methods with timeouts. |
| python/tests/_pickle_multiprocessing_helpers.py | Provides importable worker helpers for multiprocessing tests (not pytest-collected). |
| .github/workflows/test.yml | Adds a job-level timeout to prevent hung multiprocessing runs from stalling CI. |
| pyproject.toml | Adds pytest-timeout to dev dependencies. |
| uv.lock | Locks pytest-timeout and updates the resolved dev dependency set. |


Comment thread crates/core/src/codec.rs Outdated
Comment thread python/datafusion/expr.py Outdated
Address PR review feedback:

- codec.rs: rewrite strict-refusal error to present the two real
  remediations (sender re-encode by-name + receiver register; or
  receiver enables inlining, accepting cloudpickle risk) instead of
  bundling registration with both-side inlining.
- expr.py: qualify to_bytes docstring so Python UDF self-contained
  behavior is conditional on with_python_udf_inlining being enabled.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>