Skip to content

[SPARK-57212][SQL] Track preparation rule timing in QueryPlanningTracker#56275

Open
peter-toth wants to merge 3 commits into
apache:masterfrom
peter-toth:SPARK-57212-track-preparation-rules
Open

[SPARK-57212][SQL] Track preparation rule timing in QueryPlanningTracker#56275
peter-toth wants to merge 3 commits into
apache:masterfrom
peter-toth:SPARK-57212-track-preparation-rules

Conversation

@peter-toth
Copy link
Copy Markdown
Contributor

@peter-toth peter-toth commented Jun 2, 2026

What changes were proposed in this pull request?

Thread an optional QueryPlanningTracker parameter into QueryExecution.prepareForExecution and AdaptiveSparkPlanExec.applyPhysicalRules, and record per-rule timing the same way QueryExecution.normalize already does. The primary call site in lazyExecutedPlan, the five in-class call sites in AdaptiveSparkPlanExec (initial preparations, query post-planner strategy rules, post-stage creation -- both result and intermediate -- and AQE replanning), and the preprocessing call site in InsertAdaptiveSparkPlan all pass the tracker. AdaptiveSparkPlanExec.reOptimize uses optimizer.executeAndTrack instead of optimizer.execute so AQE re-optimizer rule timing is also recorded.

The tracker is only passed when the surrounding AdaptiveSparkPlanExec is the outer (non-subquery) plan: a new mainQueryTracker: Option[QueryPlanningTracker] private val resolves to Some(context.qe.tracker) when !isSubquery and None otherwise, and the InsertAdaptiveSparkPlan call site applies the same gate. Sub-AQE -- the AdaptiveSparkPlanExec that InsertAdaptiveSparkPlan.compileSubquery builds for scalar / IN / DPP subqueries -- shares its outer query's tracker via AdaptiveExecutionContext.qe.tracker and runs its own getFinalPhysicalPlan on SubqueryExec.executionContext / BroadcastExchangeExec.executionContext, so passing the outer tracker into sub-AQE would write to the same tracker from multiple threads. QueryPlanningTracker is documented as single-threaded -- its withTracker mechanism is a ThreadLocal and recordRuleInvocation mutates plain java.util.HashMaps -- and skipping the tracker in sub-AQE preserves that contract without requiring synchronization.

After this change, preparation rules of the main query (EnsureRequirements, CollapseCodegenStages, ReuseExchangeAndSubquery, etc.) and outer-AQE rules (AdjustShuffleExchangePosition, ValidateSparkPlan, OptimizeSkewedJoin, PlanAdaptiveSubqueries, etc.) become visible via QueryPlanningTracker.rules and topRulesByTime.

The following preparation paths keep the default None and are intentionally left as follow-ups:

  • QueryExecution.prepareExecutedPlan(spark, plan) -- subquery preparation called from PlanSubqueries.
  • QueryExecution.prepareExecutedPlan(plan, context) -- AQE dynamic pruning subquery preparation called from PlanAdaptiveDynamicPruningFilters.
  • AdaptiveSparkPlanExec.optimizeQueryStage -- the per-stage queryStageOptimizerRules foldLeft, which has its own AQEShuffleReadRule rollback handling and is structurally different from applyPhysicalRules.
  • Sub-AQE (the AdaptiveSparkPlanExec for AQE-wrapped subqueries) -- per the thread-safety reasoning above. Per-subquery tracking is a larger redesign best done together with the other deferred subquery paths.

Why are the changes needed?

Preparation rules ran through plain foldLeft patterns and bypassed RuleExecutor.execute, so neither RuleExecutor.dumpTimeSpent() nor QueryPlanningTracker.topRulesByTime reported them. Their wall-clock time was only visible as part of the planning phase total. For real workloads, a long-running preparation rule -- e.g. EnsureRequirements over a key-grouped join with many partitions, or AQE rules applied per stage -- was invisible per-rule, which made diagnosing planning-time gaps hard. QueryExecution.normalize already takes an optional tracker and records per-rule timing in exactly this shape; this change extends the same precedent to prepareForExecution and to outer-AQE's applyPhysicalRules, plus switches the outer-AQE re-optimizer call to its tracking variant.

Does this PR introduce any user-facing change?

No. Observability only. tracker.rules and tracker.topRulesByTime(...) now contain entries for outer-query preparation and outer-AQE rules in addition to analyzer/optimizer rules. No plan changes, no public API additions.

How was this patch tested?

New tests in QueryPlanningTrackerEndToEndSuite:

  • SPARK-57212: Track preparation rules -- a non-shuffle query asserts EnsureRequirements and CollapseCodegenStages appear in tracker.rules.
  • SPARK-57212: Track AQE-internal preparation rules -- a shuffle query asserts AQE-only rules AdjustShuffleExchangePosition and ValidateSparkPlan appear, confirming the outer-AQE side of the wiring.

build/sbt 'sql/testOnly *QueryPlanningTrackerEndToEndSuite'

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

### What changes were proposed in this pull request?

Thread an optional `QueryPlanningTracker` parameter into `QueryExecution.prepareForExecution` and `AdaptiveSparkPlanExec.applyPhysicalRules`, and record per-rule timing the same way `QueryExecution.normalize` already does. The primary call site in `lazyExecutedPlan`, the five in-class call sites in `AdaptiveSparkPlanExec` (initial preparations, query post-planner strategy rules, post-stage creation -- both result and intermediate -- and AQE replanning), and the preprocessing call site in `InsertAdaptiveSparkPlan` all pass `Some(context.qe.tracker)`. `AdaptiveSparkPlanExec.reOptimize` now uses `optimizer.executeAndTrack` instead of `optimizer.execute` so AQE re-optimizer rule timing is also recorded.

After this change, preparation rules of the main query (`EnsureRequirements`, `CollapseCodegenStages`, `ReuseExchangeAndSubquery`, etc.) and AQE-only rules (`AdjustShuffleExchangePosition`, `ValidateSparkPlan`, `OptimizeSkewedJoin`, `PlanAdaptiveSubqueries`, etc.) become visible via `QueryPlanningTracker.rules` and `topRulesByTime`.

The following preparation paths keep the default `None` and are intentionally left as follow-ups:
- `QueryExecution.prepareExecutedPlan(spark, plan)` -- subquery preparation called from `PlanSubqueries`.
- `QueryExecution.prepareExecutedPlan(plan, context)` -- AQE dynamic pruning subquery preparation called from `PlanAdaptiveDynamicPruningFilters`.
- `AdaptiveSparkPlanExec.optimizeQueryStage` -- the per-stage `queryStageOptimizerRules` foldLeft, which has its own `AQEShuffleReadRule` rollback handling and is structurally different from `applyPhysicalRules`.

### Why are the changes needed?

Preparation rules ran through plain `foldLeft` patterns and bypassed `RuleExecutor.execute`, so neither `RuleExecutor.dumpTimeSpent()` nor `QueryPlanningTracker.topRulesByTime` reported them. Their wall-clock time was only visible as part of the `planning` phase total. For real workloads, a long-running preparation rule -- e.g. `EnsureRequirements` over a key-grouped join with many partitions, or AQE rules applied per stage -- was invisible per-rule, which made diagnosing planning-time gaps hard. `QueryExecution.normalize` already takes an optional tracker and records per-rule timing in exactly this shape; this change extends the same precedent to `prepareForExecution` and to `AdaptiveSparkPlanExec.applyPhysicalRules`, plus switches the AQE re-optimizer call to its tracking variant.

### Does this PR introduce _any_ user-facing change?

No. Observability only. `tracker.rules` and `tracker.topRulesByTime(...)` now contain entries for preparation and AQE rules in addition to analyzer/optimizer rules. No plan changes, no public API additions.

### How was this patch tested?

New tests in `QueryPlanningTrackerEndToEndSuite`:
- `SPARK-57212: Track preparation rules` -- a non-shuffle query asserts `EnsureRequirements` and `CollapseCodegenStages` appear in `tracker.rules`.
- `SPARK-57212: Track AQE-internal preparation rules` -- a shuffle query asserts AQE-only rules `AdjustShuffleExchangePosition` and `ValidateSparkPlan` appear, confirming the AQE side of the wiring.

`build/sbt 'sql/testOnly *QueryPlanningTrackerEndToEndSuite'`

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryPlanningTracker.recordRuleInvocation is not thread-safe because of HashMap. Could you wrap it with synchronized?

def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {
var s = rulesMap.get(rule)
if (s eq null) {
s = new RuleSummary
rulesMap.put(rule, s)
}
s.totalTimeNs += timeNs
s.numInvocations += 1
s.numEffectiveInvocations += (if (effective) 1 else 0)
}

### What changes were proposed in this pull request?

Thread an optional `QueryPlanningTracker` parameter into `QueryExecution.prepareForExecution` and `AdaptiveSparkPlanExec.applyPhysicalRules`, and record per-rule timing the same way `QueryExecution.normalize` already does. The primary call site in `lazyExecutedPlan`, the five in-class call sites in `AdaptiveSparkPlanExec` (initial preparations, query post-planner strategy rules, post-stage creation -- both result and intermediate -- and AQE replanning), and the preprocessing call site in `InsertAdaptiveSparkPlan` all pass the tracker. `AdaptiveSparkPlanExec.reOptimize` uses `optimizer.executeAndTrack` instead of `optimizer.execute` so AQE re-optimizer rule timing is also recorded.

The tracker is only passed when the surrounding `AdaptiveSparkPlanExec` is the outer (non-subquery) plan: a new `mainQueryTracker: Option[QueryPlanningTracker]` private val resolves to `Some(context.qe.tracker)` when `!isSubquery` and `None` otherwise, and the `InsertAdaptiveSparkPlan` call site applies the same gate. Sub-AQE -- the `AdaptiveSparkPlanExec` that `InsertAdaptiveSparkPlan.compileSubquery` builds for scalar / IN / DPP subqueries -- shares its outer query's tracker via `AdaptiveExecutionContext.qe.tracker` and runs its own `getFinalPhysicalPlan` on `SubqueryExec.executionContext` / `BroadcastExchangeExec.executionContext`, so passing the outer tracker into sub-AQE would write to the same tracker from multiple threads. `QueryPlanningTracker` is documented as single-threaded -- its `withTracker` mechanism is a `ThreadLocal` and `recordRuleInvocation` mutates plain `java.util.HashMap`s -- and skipping the tracker in sub-AQE preserves that contract without requiring synchronization.

After this change, preparation rules of the main query (`EnsureRequirements`, `CollapseCodegenStages`, `ReuseExchangeAndSubquery`, etc.) and outer-AQE rules (`AdjustShuffleExchangePosition`, `ValidateSparkPlan`, `OptimizeSkewedJoin`, `PlanAdaptiveSubqueries`, etc.) become visible via `QueryPlanningTracker.rules` and `topRulesByTime`.

The following preparation paths keep the default `None` and are intentionally left as follow-ups:
- `QueryExecution.prepareExecutedPlan(spark, plan)` -- subquery preparation called from `PlanSubqueries`.
- `QueryExecution.prepareExecutedPlan(plan, context)` -- AQE dynamic pruning subquery preparation called from `PlanAdaptiveDynamicPruningFilters`.
- `AdaptiveSparkPlanExec.optimizeQueryStage` -- the per-stage `queryStageOptimizerRules` foldLeft, which has its own `AQEShuffleReadRule` rollback handling and is structurally different from `applyPhysicalRules`.
- Sub-AQE (the `AdaptiveSparkPlanExec` for AQE-wrapped subqueries) -- per the thread-safety reasoning above. Per-subquery tracking is a larger redesign best done together with the other deferred subquery paths.

### Why are the changes needed?

Preparation rules ran through plain `foldLeft` patterns and bypassed `RuleExecutor.execute`, so neither `RuleExecutor.dumpTimeSpent()` nor `QueryPlanningTracker.topRulesByTime` reported them. Their wall-clock time was only visible as part of the `planning` phase total. For real workloads, a long-running preparation rule -- e.g. `EnsureRequirements` over a key-grouped join with many partitions, or AQE rules applied per stage -- was invisible per-rule, which made diagnosing planning-time gaps hard. `QueryExecution.normalize` already takes an optional tracker and records per-rule timing in exactly this shape; this change extends the same precedent to `prepareForExecution` and to outer-AQE's `applyPhysicalRules`, plus switches the outer-AQE re-optimizer call to its tracking variant.

### Does this PR introduce _any_ user-facing change?

No. Observability only. `tracker.rules` and `tracker.topRulesByTime(...)` now contain entries for outer-query preparation and outer-AQE rules in addition to analyzer/optimizer rules. No plan changes, no public API additions.

### How was this patch tested?

New tests in `QueryPlanningTrackerEndToEndSuite`:
- `SPARK-57212: Track preparation rules` -- a non-shuffle query asserts `EnsureRequirements` and `CollapseCodegenStages` appear in `tracker.rules`.
- `SPARK-57212: Track AQE-internal preparation rules` -- a shuffle query asserts AQE-only rules `AdjustShuffleExchangePosition` and `ValidateSparkPlan` appear, confirming the outer-AQE side of the wiring.

`build/sbt 'sql/testOnly *QueryPlanningTrackerEndToEndSuite'`

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
@dongjoon-hyun dongjoon-hyun dismissed their stale review June 2, 2026 16:11

Stale review.

@peter-toth
Copy link
Copy Markdown
Contributor Author

peter-toth commented Jun 2, 2026

QueryPlanningTracker.recordRuleInvocation is not thread-safe because of HashMap. Could you wrap it with synchronized?

def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {
var s = rulesMap.get(rule)
if (s eq null) {
s = new RuleSummary
rulesMap.put(rule, s)
}
s.totalTimeNs += timeNs
s.numInvocations += 1
s.numEffectiveInvocations += (if (effective) 1 else 0)
}

Yeah, good catch! Actually I didn't want to deal with subqueries yet. Adjusted with 27602b5 so currently no concurrency issue should occur.

@peter-toth peter-toth force-pushed the SPARK-57212-track-preparation-rules branch from 27602b5 to 7f64aba Compare June 3, 2026 10:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants