[SPARK-50520][PySpark] Respect timeout in df.rdd.countApprox()#56060
rishav23 wants to merge 2 commits into
@hvanhovell PTAL when you have a moment. Thanks!
@cloud-fan PTAL. Thanks
cloud-fan left a comment
1 blocking, 1 non-blocking, 0 nits.
The sumApprox -> initialValue change is correct and well-targeted; two gaps below.
Correctness (2)
- python/pyspark/core/rdd.py:4847 [blocking]: fix is incomplete -- meanApprox (rdd.py:4885) still calls getFinalValue() and keeps the same blocking/timeout bug -- see inline
- python/pyspark/tests/test_rdd.py:646 [non-blocking]: test_count_approx_respects_timeout passes on the unfixed code too, so it doesn't guard the regression -- see inline
Verification
Traced the JVM unwrap path: PartialResult.getFinalValue() blocks (this.wait(), PartialResult.scala:33-42) until setFinalValue, which ApproximateActionListener only calls when finishedTasks == totalTasks (ApproximateActionListener.scala:50-52) -- so the old code waited for full completion. initialValue is the value awaitResult() returns at timeout (isFinal=false) or on completion (isFinal=true), so switching to it correctly respects timeout while still returning the exact result when the job finishes in time. countApprox inherits the fix via its delegation to sumApprox; meanApprox does not (still getFinalValue()).
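The difference between the two accessors can be illustrated with a small pure-Python model. This is only a sketch of the semantics described above, not Spark code: `ToyPartialResult`, `toy_sum_approx`, and the snake_case method names are hypothetical stand-ins, and threads with `time.sleep` stand in for Spark tasks.

```python
import threading
import time


class ToyPartialResult:
    """Hypothetical stand-in for the JVM PartialResult (not Spark code)."""

    def __init__(self, initial_value, is_final):
        self._cond = threading.Condition()
        self._initial = initial_value
        self._final = initial_value if is_final else None
        self._has_final = is_final

    def initial_value(self):
        # Never blocks: the value captured at timeout or at completion.
        return self._initial

    def set_final_value(self, value):
        with self._cond:
            self._final = value
            self._has_final = True
            self._cond.notify_all()

    def get_final_value(self):
        # Blocks until every task has finished and the final value is set.
        with self._cond:
            while not self._has_final:
                self._cond.wait()
            return self._final


def toy_sum_approx(tasks, timeout_seconds):
    """tasks is a list of (value, seconds) pairs; each runs in its own thread."""
    lock = threading.Lock()
    state = {"sum": 0, "finished": 0, "result": None}
    all_done = threading.Event()

    def run(value, seconds):
        time.sleep(seconds)  # simulated task duration
        with lock:
            state["sum"] += value
            state["finished"] += 1
            if state["finished"] == len(tasks):
                all_done.set()
                if state["result"] is not None:
                    # The caller already returned at timeout; publish the final value.
                    state["result"].set_final_value(state["sum"])

    for value, seconds in tasks:
        threading.Thread(target=run, args=(value, seconds), daemon=True).start()

    all_done.wait(timeout_seconds)  # return at timeout or completion, whichever is first
    with lock:
        is_final = state["finished"] == len(tasks)
        result = ToyPartialResult(state["sum"], is_final)
        state["result"] = result
    return result
```

In this model, calling `toy_sum_approx` with one task that outlasts the timeout returns promptly, `initial_value()` yields the sum of the tasks finished so far, and `get_final_value()` waits for the slow task. That mirrors why the old `getFinalValue()` call made the timeout ineffective.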
  jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
- r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+ partial = jdrdd.sumApprox(timeout, confidence)
+ r = partial.initialValue()
The fix here is correct, but it's incomplete: meanApprox (line 4885) is the direct peer of sumApprox -- same JavaDoubleRDD approximate action returning a PartialResult[BoundedDouble] -- yet it still calls .getFinalValue(), which blocks until the entire job finishes (PartialResult.getFinalValue waits until setFinalValue, fired only when all tasks complete). So meanApprox(timeout=...) retains the exact bug this PR fixes for sumApprox/countApprox, and the two parallel paths now diverge.
The PR description frames the problem generically ("PySpark approximate RDD actions currently call getFinalValue() ..."), which reads as covering all three actions. Suggest applying the same change to meanApprox:
- r = partial.initialValue()
+ partial = jdrdd.meanApprox(timeout, confidence)
+ r = partial.initialValue()
(this suggestion is for line 4885, shown here for reference). If meanApprox is intentionally out of scope, please narrow the PR description to say so explicitly.
Thanks for the review.
I've updated meanApprox to use the same timeout-aware handling as sumApprox by switching from getFinalValue() to initialValue().
I also updated the regression test to use a deliberately slow workload so that the timeout behavior is observable. The previous version could complete quickly even with the blocking implementation and therefore did not reliably detect the regression.
Additionally, the test now cleans up the background approximate job before proceeding to avoid interference with subsequent tests.
Verified with: python/run-tests.py --testnames pyspark.tests.test_rdd
start = time.time()
result = rdd.countApprox(timeout=100)
elapsed = time.time() - start
self.assertLess(elapsed, 10)
This test doesn't actually guard the regression it's named for. Summing 1M longs across 8 partitions completes in well under 10s in local mode even with the old, blocking getFinalValue() code, so assertLess(elapsed, 10) passes both before and after the fix -- the 10s ceiling is 100x the 100ms timeout against a sub-second job. assertIsNotNone(result) is also tautological since countApprox returns an int, never None.
Consider asserting the elapsed time is on the order of the timeout (not 100x it), or using a workload whose full completion provably exceeds the timeout, so the test distinguishes fixed from broken behavior. (test_count_approx_returns_exact_when_completed below is fine -- it meaningfully exercises the completed path.)
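One way to picture a test that separates fixed from broken behavior is sketched below in plain Python, without Spark. All names here (`slow_count`, `blocking_count_approx`, `timeout_aware_count_approx`, `respects_timeout`) are hypothetical, and the two `*_count_approx` functions only model the old and new behavior. The key assumption is that the elapsed-time budget sits between the timeout and the full job duration.

```python
import threading
import time


def slow_count(n_items, seconds):
    """Stand-in for a job whose full completion takes `seconds`."""
    time.sleep(seconds)
    return n_items


def blocking_count_approx(job, timeout_ms):
    """Models the old behavior: ignores the timeout and waits for the job."""
    return job()


def timeout_aware_count_approx(job, timeout_ms):
    """Models the fixed behavior: returns whatever is available at the timeout."""
    box = {}
    worker = threading.Thread(target=lambda: box.update(value=job()), daemon=True)
    worker.start()
    worker.join(timeout_ms / 1000.0)
    # 0 stands in for "no partial progress yet" in this toy model.
    return box.get("value", 0)


def respects_timeout(count_approx, timeout_ms=100, job_seconds=1.0, slack=5.0):
    """True if the call returns within slack * timeout, well before job_seconds."""
    budget_seconds = slack * timeout_ms / 1000.0
    assert budget_seconds < job_seconds, "budget must be shorter than the job"
    start = time.monotonic()
    count_approx(lambda: slow_count(1000, job_seconds), timeout_ms)
    return time.monotonic() - start < budget_seconds
```

With these defaults the budget is 0.5s against a 1s job, so the check passes for the timeout-aware model and fails for the blocking one. A 10s ceiling against a sub-second job cannot make that distinction.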
c49d15e to ff71586
@cloud-fan PTAL. It's updated now.
ff71586 to 790344d
What changes were proposed in this pull request?
PySpark approximate RDD actions currently call getFinalValue() on the PartialResult returned by Spark approximate job APIs. This introduces blocking behavior and causes APIs such as countApprox(timeout=...), sumApprox(timeout=...), and meanApprox(timeout=...) to wait for full job completion instead of respecting timeout semantics. This PR updates PySpark to use PartialResult.initialValue(), which contains the timeout-aware approximation produced by ApproximateActionListener.awaitResult(). As a result, approximate RDD actions now return the partial result available at the specified timeout while still returning exact results when the computation completes within the timeout.
Additionally, regression tests were added and strengthened to validate:
Why are the changes needed?
Spark approximate actions are designed to return partial results after the specified timeout. Scala APIs already expose this behavior through PartialResult, but PySpark currently forces blocking completion by calling getFinalValue(). As a result, PySpark approximate actions ignore timeout semantics and wait for the entire job to finish, which is inconsistent with the intended behavior of Spark's approximate execution APIs.
Does this PR introduce any user-facing change?
Yes. PySpark approximate RDD actions (countApprox, sumApprox, and meanApprox) now correctly respect timeout semantics and may return timeout-aware approximate results instead of blocking until full job completion.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No