[WIP] Add operation metrics for UPDATE queries in DSv2 #55141
ZiyaZa wants to merge 10 commits into apache:master
Conversation
andreaschat-db left a comment
Thanks @ZiyaZa. Left a couple of comments.
```java
public interface UpdateSummary extends WriteSummary {

  /**
   * Returns the number of rows updated.
```
Shall we match the descriptions with other summaries such as MergeSummary?
E.g. Returns the number of rows updated. It returns -1 if not found. Or is the behavior different here (which it shouldn't be)?
I was actually confused here, as I'm not sure what "not found" means. I guess for MERGE, if you don't have certain instructions like WHEN MATCHED, then some metrics won't make sense, but then my expectation would be to set them to 0 instead of -1. Here numUpdatedRows must always be set; it must be present. And I set numCopiedRows to 0 for WriteDelta plans because we don't copy anything; otherwise it's set to the actual number of copied rows (which can also be 0) in ReplaceData plans. I need more explanation of what -1 means in MERGE metrics to decide if the same applies here or not.
Hi, to give some background: we chose to do the 'walk the graph' approach for the existing MERGE metrics (given the huge number of metrics and the convenient presence of MergeRowsExec). But the optimizer sometimes optimizes away MergeRowsExec, and then we don't have metrics yet (though it should be possible to infer the values, we just didn't do it yet), hence -1 if not found.
Can we always guarantee this metric exists? If so, I guess the comment is not needed.
For these metrics, yes, they should not be removed from the plan as we calculate them via Nondeterministic expressions in a Filter node.
Regarding MERGE, just to clarify, -1 is a temporary solution, right? Once we fix all the cases to calculate metrics correctly, then we should be able to remove that default value of -1.
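As a plain-Scala illustration of the counting mechanism described above (Metric and IncrementMetric below are simplified stand-ins, not Spark's SQLMetric or Expression API), a metric-bumping predicate placed in a filter always returns true, so it observes every row without changing the result; in Spark, marking such an expression Nondeterministic is what stops the optimizer from pruning it:

```scala
import java.util.concurrent.atomic.AtomicLong

// Simplified stand-in for SQLMetric: a named, thread-safe counter.
final class Metric(val name: String) {
  private val counter = new AtomicLong(0L)
  def add(v: Long): Unit = { counter.addAndGet(v); () }
  def get: Long = counter.get
}

// Simplified stand-in for an IncrementMetric expression: increments the
// metric on every evaluation and lets every row through unchanged.
final class IncrementMetric(metric: Metric) extends (Any => Boolean) {
  def apply(row: Any): Boolean = { metric.add(1L); true }
}
```

Filtering a sequence through such a predicate leaves the data untouched while the counter ends up equal to the row count.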
```scala
 * This is the unresolved form - resolved into IncrementMetricIf by a preparation rule.
 *
 * Marked as Nondeterministic to prevent the optimizer from pruning or reordering it.
 * Cannot mix in [[Unevaluable]] because both [[Unevaluable]] and [[Nondeterministic]] declare
```
Why is Unevaluable required? What is the consequence of not mixing it in?
Because Unresolved... expressions cannot be evaluated, and evaluation should throw for them. The consequence of not mixing it in is needing to duplicate the logic of Unevaluable here, which means implementing evalInternal and doGenCode to always throw.
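A toy sketch of the trait clash being described (these are simplified stand-ins, not Spark's actual Unevaluable and Nondeterministic, which differ in detail): when both traits define eval as final, a class cannot mix in both, so the unresolved expression mixes in only Nondeterministic and duplicates the always-throw behavior itself:

```scala
trait Expression { def eval(input: Any): Any }

// Stand-in for Nondeterministic: pins eval down and delegates to evalInternal.
trait Nondeterministic extends Expression {
  final override def eval(input: Any): Any = evalInternal(input)
  protected def evalInternal(input: Any): Any
}

// Stand-in for Unevaluable: pins eval down to always throw.
trait Unevaluable extends Expression {
  final override def eval(input: Any): Any =
    throw new UnsupportedOperationException("Cannot evaluate expression")
}

// class Broken extends Nondeterministic with Unevaluable
//   => does not compile: both parents define eval as final.

// Workaround: mix in Nondeterministic only and duplicate Unevaluable's
// behavior by making evalInternal (and, in Spark, doGenCode) throw.
final class UnresolvedIncrementMetricSketch extends Nondeterministic {
  protected def evalInternal(input: Any): Any =
    throw new UnsupportedOperationException("Cannot evaluate unresolved expression")
}
```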
```scala
ReplaceDataExec(planLater(query), refreshCache(r), projections, write,
  Some(rd.operation.command())) :: Nil
```
I am thinking that ReplaceDataExec/WriteDataExec are by design command agnostic. Passing down the command is a bit intrusive. I do see that we already blurred the boundaries in getWriteSummary by checking for MergeRowsExec.
What if in ReplaceDataExec/WriteDataExec we just pass a writeSummaryBuilder callback. DataSourceV2Strategy knows which command is being executed and can orchestrate this. Would something like that make sense?
This will have some repercussions at ResolveIncrementMetrics below (Perhaps it will simplify it?).
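The callback shape being proposed could look roughly like this standalone sketch (all names here, including writeSummaryBuilder and the *Sketch classes, are illustrative, not Spark's API): the strategy knows which command it is planning and supplies the function that turns collected metric values into a summary, so the exec node itself stays command-agnostic. The -1 defaults merely mirror the "metric not found" convention discussed earlier:

```scala
sealed trait WriteSummary
final case class UpdateSummarySketch(numUpdatedRows: Long, numCopiedRows: Long)
  extends WriteSummary

// The exec node only carries an opaque builder; it does not know the command.
final case class ReplaceDataExecSketch(
    writeSummaryBuilder: Map[String, Long] => WriteSummary) {
  def finishWrite(metricValues: Map[String, Long]): WriteSummary =
    writeSummaryBuilder(metricValues)
}

object StrategySketch {
  // The strategy picks the builder because it knows an UPDATE is being planned.
  def planUpdate(): ReplaceDataExecSketch =
    ReplaceDataExecSketch { m =>
      UpdateSummarySketch(
        numUpdatedRows = m.getOrElse("numUpdatedRows", -1L),
        numCopiedRows = m.getOrElse("numCopiedRows", -1L))
    }
}
```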
> ReplaceDataExec/WriteDataExec are by design command agnostic

I'm not so sure about this. It might be that we didn't need to know the executed command before, but now we do. To me, it makes sense that the physical plan nodes know what command they are performing.

> What if in ReplaceDataExec/WriteDataExec we just pass a writeSummaryBuilder callback.

I'm not a huge fan of callbacks, as they make the code flow complex to understand. It would move the command-handling logic to DataSourceV2Strategy, and maybe that is also supposed to be command-agnostic by design.

> This will have some repercussions at ResolveIncrementMetrics below (Perhaps it will simplify it?).
I think if we do this, then we'll move the code that creates SQLMetric instances to DataSourceV2Strategy. We can also do the initial expression resolution there when we build Exec nodes. So we could remove ResolveIncrementMetrics rule entirely by moving its code to DataSourceV2Strategy, and only keep ResolveIncrementMetric rule for AQE.
Either option seems fine here. I slightly prefer the current approach because it has a separate rule for metric resolution and keeps DataSourceV2Strategy clean from metric logic, and V2Write nodes already have metrics field and logic for Merge metrics. I'll keep this open for input from one of the committers in case they have opinions about how V2Write nodes should be designed.
```scala
  condition = Not(EqualNullSafe(cond, Literal.TrueLiteral)),
  returnExpr = updatedRowsMetric,
  metricName = "numCopiedRows")
val readRelationWithMetrics = Filter(copiedRowsMetric, readRelation)
```
My 2 cents here is that I am not sure if this is better than having two separate filters (or a conjunction of 2 metrics). It looks quite complex to me. It is the fact that there are two different conditions in a single metric that makes it convoluted.
Would something like this work:
IncrementMetricsIf(condition, trueMetricName, falseMetricName)
I changed it to add two separate filters. Having falseMetricName would require it to be optional, as we don't always have another metric to increment. And then it makes things more complicated in the body of IncrementMetric.
I was thinking to have two metric expressions: one IncrementMetricsIf(condition, trueMetricName) and one IncrementMetricsIfElse(condition, trueMetricName, falseMetricName). So if there is no falseMetric we just use the first.
That would give us 4 expressions then, right? IncrementMetricIf, IncrementMetricIfThenReturn, IncrementMetricIfElse, IncrementMetricIfElseThenReturn. And then each one with an Unresolved version.
I think we technically only need IncrementMetricIf and IncrementMetricIfElseThenReturn at the moment, but one of them supporting only one metric and the other two seemed weird.
Having IfElse would look cleaner in the plan, but we might be overdoing it here by having too many different expressions.
Don't we need just 2, i.e. IncrementMetricIf/IncrementMetricIfElse?
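For reference, the two shapes under discussion can be sketched outside Spark's expression API (SQLMetricSketch and the wrappers below are simplified stand-ins) as side-effecting predicates that bump a counter and return the condition's value, so they can sit directly in a Filter:

```scala
import java.util.concurrent.atomic.AtomicLong

// Simplified stand-in for SQLMetric.
final class SQLMetricSketch {
  private val counter = new AtomicLong(0L)
  def add(v: Long): Unit = { counter.addAndGet(v); () }
  def value: Long = counter.get
}

// Bumps one metric when the condition holds.
final class IncrementMetricIf[A](cond: A => Boolean, trueMetric: SQLMetricSketch)
  extends (A => Boolean) {
  def apply(row: A): Boolean = {
    val r = cond(row)
    if (r) trueMetric.add(1L)
    r
  }
}

// Bumps one of two metrics depending on the outcome.
final class IncrementMetricIfElse[A](
    cond: A => Boolean,
    trueMetric: SQLMetricSketch,
    falseMetric: SQLMetricSketch) extends (A => Boolean) {
  def apply(row: A): Boolean = {
    val r = cond(row)
    if (r) trueMetric.add(1L) else falseMetric.add(1L)
    r
  }
}
```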
I think this logic is a bit brittle (trying to save the IncrementMetric through AQE). I think we should do it this way: add an attribute per row that indicates "updated" vs "copied", then count at the exec level.
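A minimal standalone sketch of this flag-per-row idea (TaggedRow and WriterSketch are hypothetical names, not classes from the PR): the plan tags each row as updated or copied, and the exec-level writer counts the flags and strips them before handing rows to the writer:

```scala
// A row as produced by the rewritten plan: payload plus an "is updated" flag.
final case class TaggedRow(values: Seq[Any], isUpdated: Boolean)

object WriterSketch {
  // Counts updated vs copied rows at the exec level and strips the flag.
  def write(rows: Seq[TaggedRow]): (Seq[Seq[Any]], Long, Long) = {
    var updated = 0L
    var copied = 0L
    val written = rows.map { row =>
      if (row.isUpdated) updated += 1 else copied += 1
      row.values // the flag column never reaches the data source writer
    }
    (written, updated, copied)
  }
}
```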
```scala
 * @param condition the boolean expression to evaluate.
 * @param metricName the name of the metric to increment.
 */
case class UnresolvedIncrementMetricIf(
```
The name here is somewhat confusing as it is actually resolved as long as the condition is resolved? Am I right that this acts like a placeholder for a metric but the expression is resolved from the Catalyst perspective if I call expr.resolved? That's why it doesn't fail analysis?
Do we need this because SQLMetric is an execution concept and we have to assign it to Exec node?
Here "unresolved" entirely refers to the metric, not the condition. I believe condition is resolved by this point.
> Do we need this because SQLMetric is an execution concept and we have to assign it to the Exec node?

Yes. We cannot create SQLMetric instances in RewriteUpdateTable.
```scala
  "numUpdatedRows",
  "numCopiedRows"
).map { name =>
  name -> SQLMetrics.createMetric(sc, name)
```
Can names conflict here? What if we have concurrency in the session?
I am not a big fan of having unresolved / resolved metric expressions, and the way we inject these seems fragile; the AQE part in particular is very involved. I think @szehon-ho's idea is quite clever. @ZiyaZa, do you think it will work? Any concerns or cases where it won't work?

Another alternative to explore is to have a single expression type (without unresolved) and attempt to resolve the accumulator / SQL metric by name. I don't know if it is possible.

I like @szehon-ho's proposal here, this makes it simpler and cleaner indeed.

Updated the PR to apply @szehon-ho's approach.
```scala
val remainingRowsPlan = Filter(remainingRowFilter, readRelation)
val remainingRowsPlanWithFlag = Project(
  remainingRowsPlan.output :+ Alias(FalseLiteral, IS_UPDATED_COLUMN)(),
  remainingRowsPlan)
```
nit: I think it's warranted to use a var remainingRowsPlan here; having such cascading vals fooWithBar, fooWithBarAndBaz, fooWithBarBazAndEgg etc. is prone to bugs later.
Doing it with var looked weird. I combined these two lines into one, and I think it looks better now.
```scala
// When representUpdateAsDeleteAndInsert is true, each logical update is split
// into a DELETE and a REINSERT. Count the DELETE as one updated row.
case DELETE_OPERATION =>
  numUpdatedRows.foreach(_.add(1L))
```
nit: is this code used by DELETE as well? Maybe putting it on the REINSERT operation instead would more clearly guarantee that it's used by UPDATE only.
It's used by DELETE as well, but numUpdatedRows will be None in that case because DELETE doesn't have such a metric.
I'll check if REINSERT is used for UPDATE only. If yes, then I agree it's better to move there.
MERGE uses REINSERT as well, but I moved it there because it might be less ambiguous, since MERGE has its own plan node.
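The per-operation counting could be sketched like this (the operation codes and names are illustrative stand-ins, not RowDeltaUtils' actual constants): since an UPDATE is represented as DELETE + REINSERT here, counting REINSERT yields the number of updated rows, and passing None for a metric that does not apply makes the foreach a safe no-op:

```scala
import java.util.concurrent.atomic.AtomicLong

object OpSketch {
  // Illustrative operation codes, not Spark's actual values.
  val DELETE: Int = 2
  val REINSERT: Int = 4

  def countRow(
      op: Int,
      numUpdatedRows: Option[AtomicLong],
      numDeletedRows: Option[AtomicLong]): Unit = op match {
    // An UPDATE represented as DELETE + REINSERT: count the REINSERT as one update.
    case REINSERT => numUpdatedRows.foreach(_.addAndGet(1L))
    case DELETE   => numDeletedRows.foreach(_.addAndGet(1L))
    case _        => ()
  }
}
```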
```diff
 case class DataAndMetadataWritingSparkTask(
     dataProj: ProjectingInternalRow,
-    metadataProj: ProjectingInternalRow) extends WritingSparkTask[DataWriter[InternalRow]] {
+    metadataProj: ProjectingInternalRow,
+    metricCounter: Option[BooleanMetricCounter] = None)
```
Should we have a more specific name for the new field?
Changes to the writing tasks would require another look; they are hard to navigate and understand. We should attempt to simplify them.
The new approach seems much better, let me know if you want to proceed @ZiyaZa. We will need to polish and add more tests.
```scala
 * The writing task reads this column to increment operation metrics and strip it before passing
 * data to the writer.
 */
private[sql] final val IS_UPDATED_COLUMN: String = "__is_updated"
```
Actually, we may consider reusing the operation type but make it more specific. For example, you can add the following to RowDeltaUtils:

```scala
final val WRITE_UPDATED_OPERATION: Int = 7 // Updated row in group-based write
final val WRITE_COPIED_OPERATION: Int = 8  // Copied row in group-based write
```

Instead of WRITE_WITH_METADATA_OPERATION, which is generic. Then use it like this:

```scala
// BEFORE
val remainingRowsPlan = Project(
  Alias(FalseLiteral, IS_UPDATED_COLUMN)() +: readRelation.output,
  Filter(remainingRowFilter, readRelation))

// AFTER
val remainingRowsPlan = Project(
  Alias(Literal(WRITE_COPIED_OPERATION), OPERATION_COLUMN)() +: readRelation.output,
  Filter(remainingRowFilter, readRelation))
```
Then you can use the operation in writing tasks to compute metrics...
WRITE_UPDATED_OPERATION / WRITE_COPIED_OPERATION would also need to contain WITH_METADATA to show that there are metadata columns that need to be handled in writing tasks. So it becomes WRITE_UPDATED_WITH_METADATA_OPERATION and similar for copy.
Then in writing tasks we need to handle all three (the old one + 2 new ones), where they'll mostly do the same thing, the only difference being metric counting.
Regarding its usage, we also need a change inside buildReplaceDataUpdateProjection. Instead of

```scala
val isUpdatedCol = Alias(EqualNullSafe(cond, TrueLiteral), IS_UPDATED_COLUMN)()
```

we need a case-when here.
Not sure if it's better than what we have at the moment. To me it seems like both approaches have some complexity, so I'm leaning towards keeping it as it is.
What changes were proposed in this pull request?
Added numUpdatedRows and numCopiedRows metrics for UPDATE operations in DSv2. These metrics are calculated in the WritingSparkTask. For ReplaceData plans, we add a new attribute to each row to represent whether it's updated or copied. Example simplified ReplaceData plan (notice __is_updated):

Why are the changes needed?
For better visibility into what happened as a result of an UPDATE query.

Does this PR introduce any user-facing change?
Yes.

How was this patch tested?
Added metric value validation to most UPDATE unit tests.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6