
[SPARK-50520][PySpark] Respect timeout in df.rdd.countApprox()#56060

Open
rishav23 wants to merge 2 commits into apache:master from rishav23:fix-spark-50520-countapprox-timeout

Conversation

Contributor

@rishav23 rishav23 commented May 22, 2026

What changes were proposed in this pull request?

PySpark approximate RDD actions currently call getFinalValue() on the PartialResult returned by Spark's approximate job APIs. That call blocks until the job finishes, so countApprox(timeout=...), sumApprox(timeout=...), and meanApprox(timeout=...) wait for full job completion instead of honoring the timeout. This PR updates PySpark to use PartialResult.initialValue(), which holds the timeout-aware approximation produced by ApproximateActionListener.awaitResult(). Approximate RDD actions now return the partial result available at the specified timeout, and still return exact results when the computation completes within the timeout.
Additionally, regression tests were added and strengthened to validate:

  • timeout-aware approximate behavior using a deliberately slow workload,
  • exact results when computation completes successfully,
  • cleanup of background approximate jobs to avoid interference with subsequent tests.

Why are the changes needed?

Spark approximate actions are designed to return partial results after the specified timeout. Scala APIs already expose this behavior through PartialResult, but PySpark currently forces blocking completion by calling getFinalValue(). As a result, PySpark approximate actions ignore timeout semantics and wait for the entire job to finish, which is inconsistent with the intended behavior of Spark's approximate execution APIs.

Does this PR introduce any user-facing change?

Yes. PySpark approximate RDD actions (countApprox, sumApprox, and meanApprox) now correctly respect timeout semantics and may return timeout-aware approximate results instead of blocking until full job completion.

How was this patch tested?

  • Reproduced the issue locally using a slow-running RDD workload.
  • Verified timeout behavior before and after the fix.
  • Verified exact results are still returned when the computation completes within the timeout.
  • Added regression tests in python/pyspark/tests/test_rdd.py.
  • Ran: python/run-tests.py --testnames pyspark.tests.test_rdd
  • Manually validated timeout and completed-result behavior for countApprox, sumApprox, and meanApprox.

Was this patch authored or co-authored using generative AI tooling?

No

@rishav23
Contributor Author

@hvanhovell PTAL when you have a moment. Thanks!

@rishav23
Contributor Author

rishav23 commented Jun 2, 2026

@cloud-fan PTAL. Thanks.

Contributor

@cloud-fan cloud-fan left a comment


1 blocking, 1 non-blocking, 0 nits.
The sumApprox -> initialValue change is correct and well-targeted; two gaps below.

Correctness (2)

  • python/pyspark/core/rdd.py:4847 [blocking]: fix is incomplete -- meanApprox (rdd.py:4885) still calls getFinalValue() and keeps the same blocking/timeout bug -- see inline
  • python/pyspark/tests/test_rdd.py:646 [non-blocking]: test_count_approx_respects_timeout passes on the unfixed code too, so it doesn't guard the regression -- see inline

Verification

Traced the JVM unwrap path: PartialResult.getFinalValue() blocks (this.wait(), PartialResult.scala:33-42) until setFinalValue, which ApproximateActionListener only calls when finishedTasks == totalTasks (ApproximateActionListener.scala:50-52) -- so the old code waited for full completion. initialValue is the value awaitResult() returns at timeout (isFinal=false) or on completion (isFinal=true), so switching to it correctly respects timeout while still returning the exact result when the job finishes in time. countApprox inherits the fix via its delegation to sumApprox; meanApprox does not (still getFinalValue()).
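The difference between the two accessors can be illustrated with a toy model in plain Python. This is only a sketch under stated assumptions: ToyPartialResult and toy_sum_approx are hypothetical names, not Spark classes, and the toy returns a raw running sum at the timeout, whereas Spark's evaluators return a bounded estimate.

```python
import threading
import time


class ToyPartialResult:
    """Toy stand-in for a PartialResult-like object (not the Spark class)."""

    def __init__(self, initial_value, is_final):
        self._initial_value = initial_value
        self._final_value = initial_value if is_final else None
        self._done = threading.Event()
        if is_final:
            self._done.set()

    def initial_value(self):
        # Available immediately: whatever was known when the wait ended.
        return self._initial_value

    def set_final_value(self, value):
        # Called once, after every task has finished.
        self._final_value = value
        self._done.set()

    def get_final_value(self):
        # Blocks until set_final_value() has been called.
        self._done.wait()
        return self._final_value


def toy_sum_approx(task_values, task_seconds, timeout_seconds):
    """Sum task_values in a background thread, one 'task' at a time, and
    return a ToyPartialResult after at most timeout_seconds."""
    lock = threading.Lock()
    state = {"sum": 0}
    all_done = threading.Event()

    def run_tasks():
        for value in task_values:
            time.sleep(task_seconds)  # simulate a slow task
            with lock:
                state["sum"] += value
        all_done.set()

    threading.Thread(target=run_tasks, daemon=True).start()

    # Wait for completion, but never longer than the timeout.
    finished_in_time = all_done.wait(timeout_seconds)
    with lock:
        snapshot = state["sum"]
    result = ToyPartialResult(snapshot, is_final=finished_in_time)

    if not finished_in_time:
        def publish_final():
            all_done.wait()
            result.set_final_value(state["sum"])

        threading.Thread(target=publish_final, daemon=True).start()
    return result
```

In this model, reading initial_value() returns as soon as the timeout elapses, while get_final_value() keeps the caller waiting until the last task completes, which mirrors the blocking behavior described above.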

  jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
- r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+ partial = jdrdd.sumApprox(timeout, confidence)
+ r = partial.initialValue()
Contributor


The fix here is correct, but it's incomplete: meanApprox (line 4885) is the direct peer of sumApprox -- same JavaDoubleRDD approximate action returning a PartialResult[BoundedDouble] -- yet it still calls .getFinalValue(), which blocks until the entire job finishes (PartialResult.getFinalValue waits until setFinalValue, fired only when all tasks complete). So meanApprox(timeout=...) retains the exact bug this PR fixes for sumApprox/countApprox, and the two parallel paths now diverge.

The PR description frames the problem generically ("PySpark approximate RDD actions currently call getFinalValue() ..."), which reads as covering all three actions. Suggest applying the same change to meanApprox:

Suggested change
- r = partial.initialValue()
+ partial = jdrdd.meanApprox(timeout, confidence)
+ r = partial.initialValue()

(this suggestion is for line 4885, shown here for reference). If meanApprox is intentionally out of scope, please narrow the PR description to say so explicitly.
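One way to keep the two paths from diverging again would be to route both through a single unwrap helper. The sketch below is hypothetical: unwrap_partial, Bounded, FakeBoundedDouble, and FakePartial are illustrative names, not part of PySpark, and the fakes stand in for the JVM objects so the example runs without Spark. It assumes the wrapped value exposes mean(), confidence(), low(), and high().

```python
from collections import namedtuple

# Simplified stand-in for the bounded value handed back to callers.
Bounded = namedtuple("Bounded", ["mean", "confidence", "low", "high"])


def unwrap_partial(partial):
    """Hypothetical helper: read the timeout-aware value from a
    PartialResult-like object and copy its fields into a Bounded."""
    r = partial.initialValue()  # does not wait for the remaining tasks
    return Bounded(r.mean(), r.confidence(), r.low(), r.high())


class FakeBoundedDouble:
    """Test double exposing the four accessors the helper reads."""

    def __init__(self, mean, confidence, low, high):
        self._fields = (mean, confidence, low, high)

    def mean(self):
        return self._fields[0]

    def confidence(self):
        return self._fields[1]

    def low(self):
        return self._fields[2]

    def high(self):
        return self._fields[3]


class FakePartial:
    """Test double that fails loudly if the blocking accessor is used."""

    def __init__(self, value):
        self._value = value

    def initialValue(self):
        return self._value

    def getFinalValue(self):
        raise AssertionError("blocking accessor must not be called")
```

With a helper like this, sumApprox and meanApprox would each pass their PartialResult to the same function, so a future change to the unwrap logic applies to both.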

Contributor Author


Thanks for the review.
I've updated meanApprox to use the same timeout-aware handling as sumApprox by switching from getFinalValue() to initialValue().
I also updated the regression test to use a deliberately slow workload so that the timeout behavior is observable. The previous version could complete quickly even with the blocking implementation and therefore did not reliably detect the regression.
Additionally, the test now cleans up the background approximate job before proceeding to avoid interference with subsequent tests.
Verified with: python/run-tests.py --testnames pyspark.tests.test_rdd

Comment thread python/pyspark/tests/test_rdd.py Outdated
start = time.time()
result = rdd.countApprox(timeout=100)
elapsed = time.time() - start
self.assertLess(elapsed, 10)
Contributor


This test doesn't actually guard the regression it's named for. Summing 1M longs across 8 partitions completes in well under 10s in local mode even with the old, blocking getFinalValue() code, so assertLess(elapsed, 10) passes both before and after the fix -- the 10s ceiling is 100x the 100ms timeout against a sub-second job. assertIsNotNone(result) is also tautological since countApprox returns an int, never None.

Consider asserting the elapsed time is on the order of the timeout (not 100x it), or using a workload whose full completion provably exceeds the timeout, so the test distinguishes fixed from broken behavior. (test_count_approx_returns_exact_when_completed below is fine -- it meaningfully exercises the completed path.)
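The shape of a test that does separate the two behaviors can be sketched without Spark. Everything here is a stand-in: blocking_call and timeout_aware_call are hypothetical models of the old and fixed code paths, and the durations are arbitrary values chosen so that full completion clearly exceeds the timeout.

```python
import threading
import time

JOB_SECONDS = 1.0      # full completion provably exceeds the timeout
TIMEOUT_SECONDS = 0.1


def slow_job(done):
    # Simulated workload that takes JOB_SECONDS to finish.
    time.sleep(JOB_SECONDS)
    done.set()


def blocking_call():
    """Stand-in for the old behavior: wait for the whole job."""
    done = threading.Event()
    threading.Thread(target=slow_job, args=(done,), daemon=True).start()
    done.wait()


def timeout_aware_call():
    """Stand-in for the fixed behavior: stop waiting at the timeout."""
    done = threading.Event()
    threading.Thread(target=slow_job, args=(done,), daemon=True).start()
    done.wait(TIMEOUT_SECONDS)


def elapsed_seconds(call):
    start = time.monotonic()
    call()
    return time.monotonic() - start


def returns_near_timeout(call):
    # The ceiling sits between the timeout and the full job duration,
    # so a blocking implementation cannot pass this check.
    return elapsed_seconds(call) < JOB_SECONDS / 2
```

The key design choice is that the ceiling is below the job's full duration rather than far above it; with a ceiling of 100x the timeout against a sub-second job, both implementations pass.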

rishav23 force-pushed the fix-spark-50520-countapprox-timeout branch from c49d15e to ff71586 on June 3, 2026 09:07
@rishav23
Contributor Author

rishav23 commented Jun 3, 2026

@cloud-fan PTAL. It's updated now

rishav23 force-pushed the fix-spark-50520-countapprox-timeout branch from ff71586 to 790344d on June 3, 2026 09:45