Skip to content

Commit a24daf6

Browse files
timsaucer and claude committed
refactor: address review nits for UDF inlining toggle + sender ctx
* codec: strict refusal routes through `read_framed_payload` so malformed inline bytes surface their own diagnostic; the "inlining is disabled" message now fires only when the payload would have decoded.
* codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining` cross-link for rustdoc rendering.
* expr: hoist `get_sender_ctx` import to module top; note that `__reduce__` also drives `copy.copy` / `copy.deepcopy`.
* context: accept `with_python_udf_inlining` positionally or as kwarg (drop `*,`).
* tests: replace size-ratio heuristic with semantic check for the `DFPYUDF` family prefix; switch single-batch closure test to `pool.apply`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7dd4252 commit a24daf6

5 files changed

Lines changed: 57 additions & 31 deletions

File tree

crates/core/src/codec.rs

Lines changed: 33 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -340,8 +340,8 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
340340
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
341341
return Ok(udf);
342342
}
343-
} else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
344-
return Err(refuse_inline_payload("scalar UDF", name));
343+
} else {
344+
refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
345345
}
346346
self.inner.try_decode_udf(name, buf)
347347
}
@@ -358,8 +358,8 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
358358
if let Some(udaf) = try_decode_python_udaf(buf)? {
359359
return Ok(udaf);
360360
}
361-
} else if buf.starts_with(PY_AGG_UDF_FAMILY) {
362-
return Err(refuse_inline_payload("aggregate UDF", name));
361+
} else {
362+
refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
363363
}
364364
self.inner.try_decode_udaf(name, buf)
365365
}
@@ -376,13 +376,30 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
376376
if let Some(udwf) = try_decode_python_udwf(buf)? {
377377
return Ok(udwf);
378378
}
379-
} else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
380-
return Err(refuse_inline_payload("window UDF", name));
379+
} else {
380+
refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
381381
}
382382
self.inner.try_decode_udwf(name, buf)
383383
}
384384
}
385385

386+
/// Strict-mode gate: if `buf` is a well-framed inline payload for
387+
/// `family`, return the strict-refusal error; otherwise return
388+
/// `Ok(())` so the caller can delegate to its `inner` codec.
389+
///
390+
/// Routing through [`read_framed_payload`] (rather than a bare
391+
/// `starts_with` probe) means malformed inline bytes — wrong
392+
/// wire-format version, mismatched Python version, truncated header —
393+
/// surface *their* diagnostic instead of the strict-mode message.
394+
/// The strict message implies sender intent ("inlining is disabled"),
395+
/// so it should fire only when the bytes really would have decoded.
396+
fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> {
397+
Python::attach(|py| match read_framed_payload(py, buf, family, kind)? {
398+
Some(_) => Err(refuse_inline_payload(kind, name)),
399+
None => Ok(()),
400+
})
401+
}
402+
386403
/// Build the error returned by a strict codec when it receives an
387404
/// inline Python-UDF payload it has been told not to deserialize.
388405
fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
@@ -429,7 +446,10 @@ impl PythonPhysicalCodec {
429446
&self.inner
430447
}
431448

432-
/// See [`PythonLogicalCodec::with_python_udf_inlining`].
449+
/// Toggle inline encoding of Python UDFs on this physical codec.
450+
///
451+
/// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see
452+
/// that method for the full security and portability discussion.
433453
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
434454
self.python_udf_inlining = enabled;
435455
self
@@ -472,8 +492,8 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
472492
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
473493
return Ok(udf);
474494
}
475-
} else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
476-
return Err(refuse_inline_payload("scalar UDF", name));
495+
} else {
496+
refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
477497
}
478498
self.inner.try_decode_udf(name, buf)
479499
}
@@ -502,8 +522,8 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
502522
if let Some(udaf) = try_decode_python_udaf(buf)? {
503523
return Ok(udaf);
504524
}
505-
} else if buf.starts_with(PY_AGG_UDF_FAMILY) {
506-
return Err(refuse_inline_payload("aggregate UDF", name));
525+
} else {
526+
refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
507527
}
508528
self.inner.try_decode_udaf(name, buf)
509529
}
@@ -520,8 +540,8 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
520540
if let Some(udwf) = try_decode_python_udwf(buf)? {
521541
return Ok(udwf);
522542
}
523-
} else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
524-
return Err(refuse_inline_payload("window UDF", name));
543+
} else {
544+
refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
525545
}
526546
self.inner.try_decode_udwf(name, buf)
527547
}

python/datafusion/context.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1770,7 +1770,7 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
17701770
new.ctx = new_internal
17711771
return new
17721772

1773-
def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
1773+
def with_python_udf_inlining(self, enabled: bool) -> SessionContext:
17741774
"""Control whether Python UDFs are embedded in serialized expressions.
17751775
17761776
When ``enabled=True`` (the default), serialized expressions carry

python/datafusion/expr.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@
5353

5454
from ._internal import expr as expr_internal
5555
from ._internal import functions as functions_internal
56+
from .ipc import get_sender_ctx
5657

5758
if TYPE_CHECKING:
5859
from collections.abc import Sequence
@@ -594,10 +595,10 @@ def __reduce__(self) -> tuple[Callable[[bytes], Expr], tuple[bytes]]:
594595
The encoding side honors a driver-side sender context installed
595596
via :func:`datafusion.ipc.set_sender_ctx` — that is how
596597
:meth:`SessionContext.with_python_udf_inlining` propagates
597-
through ``pickle.dumps``.
598+
through ``pickle.dumps``. The sender context is read by
599+
``__reduce__``, so :func:`copy.copy` and :func:`copy.deepcopy`
600+
— which also go through ``__reduce__`` — pick it up too.
598601
"""
599-
from datafusion.ipc import get_sender_ctx
600-
601602
return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),))
602603

603604
@classmethod

python/tests/test_pickle_expr.py

Lines changed: 17 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -337,9 +337,13 @@ def _build_double_udf(self):
337337
name="double",
338338
)
339339

340-
def test_strict_encoder_emits_smaller_blob(self):
341-
"""Strict mode skips cloudpickle of the Python callable, so the
342-
encoded bytes are dramatically smaller than the inline form."""
340+
def test_strict_encoder_omits_inline_payload(self):
341+
"""Strict mode emits the by-name wire form: no `DFPYUDF` magic
342+
in the blob, no cloudpickled callable. Semantic check is
343+
sharper than a size-ratio heuristic — a renamed UDF or a
344+
smaller-than-expected closure would still flip the magic
345+
bytes, but might not move the size by 4x.
346+
"""
343347
ctx_inline = SessionContext()
344348
ctx_strict = ctx_inline.with_python_udf_inlining(enabled=False)
345349
u = self._build_double_udf()
@@ -348,7 +352,10 @@ def test_strict_encoder_emits_smaller_blob(self):
348352
blob_inline = e.to_bytes(ctx_inline)
349353
blob_strict = e.to_bytes(ctx_strict)
350354

351-
assert len(blob_strict) < len(blob_inline) // 4
355+
# `DFPYUDF` is the scalar Python-UDF family prefix; see
356+
# `PY_SCALAR_UDF_FAMILY` in crates/core/src/codec.rs.
357+
assert b"DFPYUDF" in blob_inline
358+
assert b"DFPYUDF" not in blob_strict
352359

353360
def test_toggle_off_then_on_restores_inline_encoding(self):
354361
"""`with_python_udf_inlining` is per-call clone semantics:
@@ -417,10 +424,10 @@ def test_strict_decoder_refuses_inline_payload(self):
417424
def test_sender_ctx_propagates_through_pickle(self):
418425
"""`set_sender_ctx` makes `pickle.dumps` use a strict codec.
419426
420-
Without a sender context, pickle defaults to the inline codec and
421-
the blob is large. With a strict sender context installed, the
422-
blob shrinks because the Python callable is encoded by name
423-
instead of cloudpickled.
427+
Without a sender context, pickle defaults to the inline codec
428+
and the blob contains the `DFPYUDF` family prefix. With a
429+
strict sender context installed, the callable encodes by name
430+
and the prefix is absent.
424431
"""
425432
u = self._build_double_udf()
426433
e = u(col("a"))
@@ -434,7 +441,8 @@ def test_sender_ctx_propagates_through_pickle(self):
434441
finally:
435442
clear_sender_ctx()
436443

437-
assert len(blob_strict) < len(blob_default) // 4
444+
assert b"DFPYUDF" in blob_default
445+
assert b"DFPYUDF" not in blob_strict
438446

439447
def test_sender_ctx_strict_roundtrip_via_pickle(self):
440448
"""End-to-end pickle round-trip with strict mode on both sides.

python/tests/test_pickle_multiprocessing.py

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -123,9 +123,6 @@ def test_closure_capturing_udf_via_pool(start_method):
123123

124124
ctx = mp.get_context(start_method)
125125
with ctx.Pool(processes=2) as pool:
126-
results = pool.starmap(
127-
helpers.unpickle_and_evaluate,
128-
[(blob, [1, 2, 3])],
129-
)
126+
result = pool.apply(helpers.unpickle_and_evaluate, (blob, [1, 2, 3]))
130127

131-
assert results[0] == [7, 14, 21]
128+
assert result == [7, 14, 21]

0 commit comments

Comments
 (0)