[SPARK-57212][SQL] Track preparation rule timing in QueryPlanningTracker#56275
Open
peter-toth wants to merge 3 commits into
Open
[SPARK-57212][SQL] Track preparation rule timing in QueryPlanningTracker#56275peter-toth wants to merge 3 commits into
peter-toth wants to merge 3 commits into
Conversation
### What changes were proposed in this pull request? Thread an optional `QueryPlanningTracker` parameter into `QueryExecution.prepareForExecution` and `AdaptiveSparkPlanExec.applyPhysicalRules`, and record per-rule timing the same way `QueryExecution.normalize` already does. The primary call site in `lazyExecutedPlan`, the five in-class call sites in `AdaptiveSparkPlanExec` (initial preparations, query post-planner strategy rules, post-stage creation -- both result and intermediate -- and AQE replanning), and the preprocessing call site in `InsertAdaptiveSparkPlan` all pass `Some(context.qe.tracker)`. `AdaptiveSparkPlanExec.reOptimize` now uses `optimizer.executeAndTrack` instead of `optimizer.execute` so AQE re-optimizer rule timing is also recorded. After this change, preparation rules of the main query (`EnsureRequirements`, `CollapseCodegenStages`, `ReuseExchangeAndSubquery`, etc.) and AQE-only rules (`AdjustShuffleExchangePosition`, `ValidateSparkPlan`, `OptimizeSkewedJoin`, `PlanAdaptiveSubqueries`, etc.) become visible via `QueryPlanningTracker.rules` and `topRulesByTime`. The following preparation paths keep the default `None` and are intentionally left as follow-ups: - `QueryExecution.prepareExecutedPlan(spark, plan)` -- subquery preparation called from `PlanSubqueries`. - `QueryExecution.prepareExecutedPlan(plan, context)` -- AQE dynamic pruning subquery preparation called from `PlanAdaptiveDynamicPruningFilters`. - `AdaptiveSparkPlanExec.optimizeQueryStage` -- the per-stage `queryStageOptimizerRules` foldLeft, which has its own `AQEShuffleReadRule` rollback handling and is structurally different from `applyPhysicalRules`. ### Why are the changes needed? Preparation rules ran through plain `foldLeft` patterns and bypassed `RuleExecutor.execute`, so neither `RuleExecutor.dumpTimeSpent()` nor `QueryPlanningTracker.topRulesByTime` reported them. Their wall-clock time was only visible as part of the `planning` phase total. For real workloads, a long-running preparation rule -- e.g. `EnsureRequirements` over a key-grouped join with many partitions, or AQE rules applied per stage -- was invisible per-rule, which made diagnosing planning-time gaps hard. `QueryExecution.normalize` already takes an optional tracker and records per-rule timing in exactly this shape; this change extends the same precedent to `prepareForExecution` and to `AdaptiveSparkPlanExec.applyPhysicalRules`, plus switches the AQE re-optimizer call to its tracking variant. ### Does this PR introduce _any_ user-facing change? No. Observability only. `tracker.rules` and `tracker.topRulesByTime(...)` now contain entries for preparation and AQE rules in addition to analyzer/optimizer rules. No plan changes, no public API additions. ### How was this patch tested? New tests in `QueryPlanningTrackerEndToEndSuite`: - `SPARK-57212: Track preparation rules` -- a non-shuffle query asserts `EnsureRequirements` and `CollapseCodegenStages` appear in `tracker.rules`. - `SPARK-57212: Track AQE-internal preparation rules` -- a shuffle query asserts AQE-only rules `AdjustShuffleExchangePosition` and `ValidateSparkPlan` appear, confirming the AQE side of the wiring. `build/sbt 'sql/testOnly *QueryPlanningTrackerEndToEndSuite'` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.7
dongjoon-hyun
previously requested changes
Jun 2, 2026
Member
dongjoon-hyun
left a comment
There was a problem hiding this comment.
QueryPlanningTracker.recordRuleInvocation is not thread-safe because of HashMap. Could you wrap it with synchronized?
### What changes were proposed in this pull request? Thread an optional `QueryPlanningTracker` parameter into `QueryExecution.prepareForExecution` and `AdaptiveSparkPlanExec.applyPhysicalRules`, and record per-rule timing the same way `QueryExecution.normalize` already does. The primary call site in `lazyExecutedPlan`, the five in-class call sites in `AdaptiveSparkPlanExec` (initial preparations, query post-planner strategy rules, post-stage creation -- both result and intermediate -- and AQE replanning), and the preprocessing call site in `InsertAdaptiveSparkPlan` all pass the tracker. `AdaptiveSparkPlanExec.reOptimize` uses `optimizer.executeAndTrack` instead of `optimizer.execute` so AQE re-optimizer rule timing is also recorded. The tracker is only passed when the surrounding `AdaptiveSparkPlanExec` is the outer (non-subquery) plan: a new `mainQueryTracker: Option[QueryPlanningTracker]` private val resolves to `Some(context.qe.tracker)` when `!isSubquery` and `None` otherwise, and the `InsertAdaptiveSparkPlan` call site applies the same gate. Sub-AQE -- the `AdaptiveSparkPlanExec` that `InsertAdaptiveSparkPlan.compileSubquery` builds for scalar / IN / DPP subqueries -- shares its outer query's tracker via `AdaptiveExecutionContext.qe.tracker` and runs its own `getFinalPhysicalPlan` on `SubqueryExec.executionContext` / `BroadcastExchangeExec.executionContext`, so passing the outer tracker into sub-AQE would write to the same tracker from multiple threads. `QueryPlanningTracker` is documented as single-threaded -- its `withTracker` mechanism is a `ThreadLocal` and `recordRuleInvocation` mutates plain `java.util.HashMap`s -- and skipping the tracker in sub-AQE preserves that contract without requiring synchronization. After this change, preparation rules of the main query (`EnsureRequirements`, `CollapseCodegenStages`, `ReuseExchangeAndSubquery`, etc.) and outer-AQE rules (`AdjustShuffleExchangePosition`, `ValidateSparkPlan`, `OptimizeSkewedJoin`, `PlanAdaptiveSubqueries`, etc.) become visible via `QueryPlanningTracker.rules` and `topRulesByTime`. The following preparation paths keep the default `None` and are intentionally left as follow-ups: - `QueryExecution.prepareExecutedPlan(spark, plan)` -- subquery preparation called from `PlanSubqueries`. - `QueryExecution.prepareExecutedPlan(plan, context)` -- AQE dynamic pruning subquery preparation called from `PlanAdaptiveDynamicPruningFilters`. - `AdaptiveSparkPlanExec.optimizeQueryStage` -- the per-stage `queryStageOptimizerRules` foldLeft, which has its own `AQEShuffleReadRule` rollback handling and is structurally different from `applyPhysicalRules`. - Sub-AQE (the `AdaptiveSparkPlanExec` for AQE-wrapped subqueries) -- per the thread-safety reasoning above. Per-subquery tracking is a larger redesign best done together with the other deferred subquery paths. ### Why are the changes needed? Preparation rules ran through plain `foldLeft` patterns and bypassed `RuleExecutor.execute`, so neither `RuleExecutor.dumpTimeSpent()` nor `QueryPlanningTracker.topRulesByTime` reported them. Their wall-clock time was only visible as part of the `planning` phase total. For real workloads, a long-running preparation rule -- e.g. `EnsureRequirements` over a key-grouped join with many partitions, or AQE rules applied per stage -- was invisible per-rule, which made diagnosing planning-time gaps hard. `QueryExecution.normalize` already takes an optional tracker and records per-rule timing in exactly this shape; this change extends the same precedent to `prepareForExecution` and to outer-AQE's `applyPhysicalRules`, plus switches the outer-AQE re-optimizer call to its tracking variant. ### Does this PR introduce _any_ user-facing change? No. Observability only. `tracker.rules` and `tracker.topRulesByTime(...)` now contain entries for outer-query preparation and outer-AQE rules in addition to analyzer/optimizer rules. No plan changes, no public API additions. ### How was this patch tested? New tests in `QueryPlanningTrackerEndToEndSuite`: - `SPARK-57212: Track preparation rules` -- a non-shuffle query asserts `EnsureRequirements` and `CollapseCodegenStages` appear in `tracker.rules`. - `SPARK-57212: Track AQE-internal preparation rules` -- a shuffle query asserts AQE-only rules `AdjustShuffleExchangePosition` and `ValidateSparkPlan` appear, confirming the outer-AQE side of the wiring. `build/sbt 'sql/testOnly *QueryPlanningTrackerEndToEndSuite'` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.7
Contributor
Author
Yeah, good catch! Actually I didn't want to deal with subqueries yet. Adjusted with 27602b5 so currently no concurrency issue should occur. |
27602b5 to
7f64aba
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Thread an optional
QueryPlanningTrackerparameter intoQueryExecution.prepareForExecutionandAdaptiveSparkPlanExec.applyPhysicalRules, and record per-rule timing the same wayQueryExecution.normalizealready does. The primary call site inlazyExecutedPlan, the five in-class call sites inAdaptiveSparkPlanExec(initial preparations, query post-planner strategy rules, post-stage creation -- both result and intermediate -- and AQE replanning), and the preprocessing call site inInsertAdaptiveSparkPlanall pass the tracker.AdaptiveSparkPlanExec.reOptimizeusesoptimizer.executeAndTrackinstead ofoptimizer.executeso AQE re-optimizer rule timing is also recorded.The tracker is only passed when the surrounding
AdaptiveSparkPlanExecis the outer (non-subquery) plan: a newmainQueryTracker: Option[QueryPlanningTracker]private val resolves toSome(context.qe.tracker)when!isSubqueryandNoneotherwise, and theInsertAdaptiveSparkPlancall site applies the same gate. Sub-AQE -- theAdaptiveSparkPlanExecthatInsertAdaptiveSparkPlan.compileSubquerybuilds for scalar / IN / DPP subqueries -- shares its outer query's tracker viaAdaptiveExecutionContext.qe.trackerand runs its owngetFinalPhysicalPlanonSubqueryExec.executionContext/BroadcastExchangeExec.executionContext, so passing the outer tracker into sub-AQE would write to the same tracker from multiple threads.QueryPlanningTrackeris documented as single-threaded -- itswithTrackermechanism is aThreadLocalandrecordRuleInvocationmutates plainjava.util.HashMaps -- and skipping the tracker in sub-AQE preserves that contract without requiring synchronization.After this change, preparation rules of the main query (
EnsureRequirements,CollapseCodegenStages,ReuseExchangeAndSubquery, etc.) and outer-AQE rules (AdjustShuffleExchangePosition,ValidateSparkPlan,OptimizeSkewedJoin,PlanAdaptiveSubqueries, etc.) become visible viaQueryPlanningTracker.rulesandtopRulesByTime.The following preparation paths keep the default
Noneand are intentionally left as follow-ups:QueryExecution.prepareExecutedPlan(spark, plan)-- subquery preparation called fromPlanSubqueries.QueryExecution.prepareExecutedPlan(plan, context)-- AQE dynamic pruning subquery preparation called fromPlanAdaptiveDynamicPruningFilters.AdaptiveSparkPlanExec.optimizeQueryStage-- the per-stagequeryStageOptimizerRulesfoldLeft, which has its ownAQEShuffleReadRulerollback handling and is structurally different fromapplyPhysicalRules.AdaptiveSparkPlanExecfor AQE-wrapped subqueries) -- per the thread-safety reasoning above. Per-subquery tracking is a larger redesign best done together with the other deferred subquery paths.Why are the changes needed?
Preparation rules ran through plain
foldLeftpatterns and bypassedRuleExecutor.execute, so neitherRuleExecutor.dumpTimeSpent()norQueryPlanningTracker.topRulesByTimereported them. Their wall-clock time was only visible as part of theplanningphase total. For real workloads, a long-running preparation rule -- e.g.EnsureRequirementsover a key-grouped join with many partitions, or AQE rules applied per stage -- was invisible per-rule, which made diagnosing planning-time gaps hard.QueryExecution.normalizealready takes an optional tracker and records per-rule timing in exactly this shape; this change extends the same precedent toprepareForExecutionand to outer-AQE'sapplyPhysicalRules, plus switches the outer-AQE re-optimizer call to its tracking variant.Does this PR introduce any user-facing change?
No. Observability only.
tracker.rulesandtracker.topRulesByTime(...)now contain entries for outer-query preparation and outer-AQE rules in addition to analyzer/optimizer rules. No plan changes, no public API additions.How was this patch tested?
New tests in
QueryPlanningTrackerEndToEndSuite:SPARK-57212: Track preparation rules-- a non-shuffle query assertsEnsureRequirementsandCollapseCodegenStagesappear intracker.rules.SPARK-57212: Track AQE-internal preparation rules-- a shuffle query asserts AQE-only rulesAdjustShuffleExchangePositionandValidateSparkPlanappear, confirming the outer-AQE side of the wiring.build/sbt 'sql/testOnly *QueryPlanningTrackerEndToEndSuite'Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7