
[WIP] Add operation metrics for UPDATE queries in DSv2 #55141

Draft
ZiyaZa wants to merge 10 commits into apache:master from ZiyaZa:dsv2-update-metrics

Conversation

Contributor

@ZiyaZa ZiyaZa commented Apr 1, 2026

What changes were proposed in this pull request?

Added numUpdatedRows and numCopiedRows metrics for UPDATE operations in DSv2. These metrics are calculated in the WritingSparkTask. For ReplaceData plans, we add a new attribute to each row to represent whether it's updated or copied.

Example simplified ReplaceData plan (notice __is_updated):

ReplaceData ...
      +- *(2) Sort [_partition#19 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(_partition#19, 5), REPARTITION_BY_COL, [plan_id=73]
                  +- *(1) Project [6 AS __row_operation#25, ((salary#15 <= 100) <=> true) AS __is_updated#24, pk#14, salary#15, if ((salary#15 <= 100)) invalid else dep#16 AS dep#22, _partition#19, if ((salary#15 <= 100)) null else index#18 AS index#23]
                     +- BatchScan ...
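A rough sketch of the per-row handling this implies in the writing task, with illustrative names (not Spark's actual classes): read the `__is_updated` flag, bump the matching metric, and strip the flag column before handing the row to the writer.

```java
// Sketch only: names are hypothetical, not Spark's WritingSparkTask API.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class UpdateMetricsSketch {
    static final AtomicLong numUpdatedRows = new AtomicLong();
    static final AtomicLong numCopiedRows = new AtomicLong();

    /** Row layout: [__is_updated, data...]; returns the row without the flag. */
    static List<Object> processRow(List<Object> rowWithFlag) {
        boolean isUpdated = (Boolean) rowWithFlag.get(0);
        if (isUpdated) {
            numUpdatedRows.incrementAndGet();
        } else {
            numCopiedRows.incrementAndGet();
        }
        // Strip the metadata column before the row reaches the data writer.
        return new ArrayList<>(rowWithFlag.subList(1, rowWithFlag.size()));
    }
}
```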

Why are the changes needed?

For better visibility into what happened as a result of an UPDATE query.

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Added metric value validation to most UPDATE unit tests.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.6


@andreaschat-db andreaschat-db left a comment


Thanks @ZiyaZa. Left a couple of comments.

public interface UpdateSummary extends WriteSummary {

/**
* Returns the number of rows updated.

Shall we match the descriptions with other summaries such as MergeSummary?
E.g. "Returns the number of rows updated. It returns -1 if not found." Or is the behavior different here (which it shouldn't be)?

Contributor Author


I was actually confused here, as I'm not sure what "not found" means. I guess for MERGE, if you don't have certain clauses like WHEN MATCHED, then some metrics won't make sense, but then my expectation would be to set them to 0 instead of -1. Here numUpdatedRows must always be set; it must be present. And I set numCopiedRows to 0 for WriteDelta plans because we don't copy anything; otherwise it's set to the actual number of copied rows (which can also be 0) in ReplaceData plans. I need more explanation of what -1 means in MERGE metrics to decide if the same applies here or not.

Member

@szehon-ho szehon-ho Apr 3, 2026


Hi, to give some background: we chose the 'walk the graph' approach for the existing MERGE metrics (given the huge number of metrics and the convenient presence of MergeRowsExec). But the optimizer sometimes optimizes MergeRowsExec away, and in that case we don't have metrics yet (it should be possible to infer the values, we just didn't do it yet), hence the -1 if not found.

Can we always guarantee this metric exists? If so, I guess the comment is not needed.

Contributor Author


For these metrics, yes, they should not be removed from the plan as we calculate them via Nondeterministic expressions in a Filter node.

Regarding MERGE, just to clarify, -1 is a temporary solution, right? Once we fix all the cases to calculate metrics correctly, then we should be able to remove that default value of -1.

Member


Yes, that's right.

* This is the unresolved form - resolved into IncrementMetricIf by a preparation rule.
*
* Marked as Nondeterministic to prevent the optimizer from pruning or reordering it.
* Cannot mix in [[Unevaluable]] because both [[Unevaluable]] and [[Nondeterministic]] declare

Why is Unevaluable required? What is the consequence of not mixing it in?

Contributor Author


Because Unresolved* expressions cannot be evaluated, and it should throw when someone tries to evaluate them. The consequence of not mixing it in is having to duplicate the logic of Unevaluable here, i.e. implementing evalInternal and doGenCode to always throw.
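For illustration, replicating that Unevaluable contract boils down to a placeholder whose evaluation entry points always throw. A minimal sketch with hypothetical names (this is not the actual Catalyst API):

```java
// Hypothetical placeholder class, not Spark's real expression hierarchy.
class UnresolvedMetricPlaceholder {
    final String metricName;

    UnresolvedMetricPlaceholder(String metricName) {
        this.metricName = metricName;
    }

    /** Interpreted evaluation must never run on an unresolved placeholder. */
    Object evalInternal(Object input) {
        throw new UnsupportedOperationException(
            "Cannot evaluate unresolved metric expression: " + metricName);
    }

    /** Codegen must likewise fail fast. */
    String doGenCode() {
        throw new UnsupportedOperationException(
            "Cannot generate code for unresolved metric expression: " + metricName);
    }
}
```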

Comment on lines +364 to +365
ReplaceDataExec(planLater(query), refreshCache(r), projections, write,
Some(rd.operation.command())) :: Nil


I am thinking that ReplaceDataExec/WriteDataExec are by design command agnostic. Passing down the command is a bit intrusive. I do see that we already blurred the boundaries in getWriteSummary by checking for MergeRowsExec.

What if in ReplaceDataExec/WriteDataExec we just pass a writeSummaryBuilder callback. DataSourceV2Strategy knows which command is being executed and can orchestrate this. Would something like that make sense?

This will have some repercussions at ResolveIncrementMetrics below (Perhaps it will simplify it?).

Contributor Author


ReplaceDataExec/WriteDataExec are by design command agnostic

I'm not so sure about this. It might be that we didn't need to know the executed command before, but now we do. To me, it makes sense that the physical plan nodes know what command they are performing.

What if in ReplaceDataExec/WriteDataExec we just pass a writeSummaryBuilder callback.

I'm not a huge fan of callbacks, as they make the code flow harder to understand. That would just move the command-handling logic to DataSourceV2Strategy, which is perhaps also supposed to be command-agnostic by design.

This will have some repercussions at ResolveIncrementMetrics below (Perhaps it will simplify it?).

I think if we do this, then we'll move the code that creates SQLMetric instances to DataSourceV2Strategy. We can also do the initial expression resolution there when we build Exec nodes. So we could remove ResolveIncrementMetrics rule entirely by moving its code to DataSourceV2Strategy, and only keep ResolveIncrementMetric rule for AQE.

Either option seems fine here. I slightly prefer the current approach because it has a separate rule for metric resolution and keeps DataSourceV2Strategy clean from metric logic, and V2Write nodes already have metrics field and logic for Merge metrics. I'll keep this open for input from one of the committers in case they have opinions about how V2Write nodes should be designed.

condition = Not(EqualNullSafe(cond, Literal.TrueLiteral)),
returnExpr = updatedRowsMetric,
metricName = "numCopiedRows")
val readRelationWithMetrics = Filter(copiedRowsMetric, readRelation)


My 2 cents here: I am not sure this is better than having two separate filters (or a conjunction of 2 metrics). It looks quite complex to me. It is the fact that there are two different conditions in a single metric that makes it convoluted.

Would something like that work:
IncrementMetricsIf(condition, trueMetricName, falseMetricName)

Contributor Author


I changed it to add two separate filters. Having falseMetricName would require it to be optional, as we don't always have another metric to increment. And then it makes things more complicated in the body of IncrementMetric.


I was thinking of having two metric expressions: an IncrementMetricsIf(condition, trueMetricName) and an IncrementMetricsIfElse(condition, trueMetricName, falseMetricName). So if there is no falseMetric, we just use the first.

Contributor Author

@ZiyaZa ZiyaZa Apr 2, 2026


That would give us 4 expressions then, right? IncrementMetricIf, IncrementMetricIfThenReturn, IncrementMetricIfElse, IncrementMetricIfElseThenReturn. And then each one with an Unresolved version.

I think we technically only need IncrementMetricIf and IncrementMetricIfElseThenReturn at the moment, but having one of them support a single metric and the other two seemed weird.

Having IfElse would look cleaner in the plan, but we might be overdoing it here by having too many different expressions.


Don't we need just 2, i.e. IncrementMetricIf/IncrementMetricIfElse?

@szehon-ho
Member

I think this logic is a bit brittle (trying to preserve the IncrementMetric through AQE). I think we should do it this way: add an attribute per row that indicates "updated" vs "copied", then count at the exec level:

  • In RewriteUpdateTable, add cond AS _is_updated boolean column to every row (for ReplaceData path)
  • The column flows through the plan as regular data -- no special expressions, no AQE issues
  • At ReplaceDataExec, the writing task reads the boolean, increments SQLMetric, and strips the column before passing to the writer
  • For WriteDelta: all rows passing the filter are "updated," so just count total rows (no extra attribute needed)

* @param condition the boolean expression to evaluate.
* @param metricName the name of the metric to increment.
*/
case class UnresolvedIncrementMetricIf(
Contributor


The name here is somewhat confusing, as the expression is actually resolved as long as the condition is resolved? Am I right that this acts as a placeholder for a metric, but the expression is resolved from the Catalyst perspective if I call expr.resolved? That's why it doesn't fail analysis?

Contributor

@aokolnychyi aokolnychyi Apr 6, 2026


Do we need this because SQLMetric is an execution concept and we have to assign it to Exec node?

Contributor Author


Here "unresolved" entirely refers to the metric, not the condition. I believe condition is resolved by this point.

Do we need this because SQLMetric is an execution concept and we have to assign it to Exec node?

Yes. We cannot create SQLMetric instances in RewriteUpdateTable.

"numUpdatedRows",
"numCopiedRows"
).map { name =>
name -> SQLMetrics.createMetric(sc, name)
Contributor

@aokolnychyi aokolnychyi Apr 6, 2026


Can names conflict here? What if we have concurrency in the session?

@aokolnychyi
Contributor

I am not a big fan of having unresolved / resolved metric expressions and the way we inject these seems fragile, especially the AQE part is very involved. Also, using a new Filter node doesn't seem like the best option. Ideally, I'd like to avoid both.

I think @szehon-ho idea is quite clever. @ZiyaZa, do you think it will work? Any concerns or cases when it won't work?

@aokolnychyi
Contributor

Another alternative to explore is to have a single expression type (without unresolved) and attempt to resolve the accumulator / SQL metric by name. I don't know if it is possible.

@juliuszsompolski
Contributor

I like @szehon-ho 's proposal here, this makes it simpler and cleaner indeed.
It still leaves open the question of how to count the number of source rows.

@ZiyaZa
Contributor Author

ZiyaZa commented Apr 7, 2026

Updated the PR to apply @szehon-ho's approach.

Comment on lines +110 to +113
val remainingRowsPlan = Filter(remainingRowFilter, readRelation)
val remainingRowsPlanWithFlag = Project(
remainingRowsPlan.output :+ Alias(FalseLiteral, IS_UPDATED_COLUMN)(),
remainingRowsPlan)
Contributor


nit: I think it's warranted to use a var remainingRowsPlan here; cascading vals like fooWithBar, fooWithBarAndBaz, fooWithBarBazAndEgg, etc. are prone to bugs later.

Contributor Author


Doing it with var looked weird. I combined these two lines into one, and I think it looks better now.

Comment on lines +728 to +731
// When representUpdateAsDeleteAndInsert is true, each logical update is split
// into a DELETE and a REINSERT. Count the DELETE as one updated row.
case DELETE_OPERATION =>
numUpdatedRows.foreach(_.add(1L))
Contributor


nit: is this code used by DELETE as well? Maybe putting it on the REINSERT operation instead would make it unambiguous that it's used by UPDATE only.

Contributor Author


It's used by DELETE as well, but numUpdatedRows will be None in that case because DELETE doesn't have such a metric.
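A small sketch of the "metric may be absent" pattern described here, with illustrative names: the DELETE branch is shared with plain DELETE, which carries no numUpdatedRows metric, so the counter is an Optional and incrementing a missing metric is simply a no-op.

```java
// Hypothetical helper, not Spark's actual writing-task code.
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

class DeleteAsUpdateSketch {
    /** Count the DELETE half of a split UPDATE as one updated row, if tracked. */
    static void onDeleteOperation(Optional<AtomicLong> numUpdatedRows) {
        // Plain DELETE passes Optional.empty(), so nothing is counted there.
        numUpdatedRows.ifPresent(m -> m.addAndGet(1L));
    }
}
```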

Contributor Author


I'll check if REINSERT is used for UPDATE only. If yes, then I agree it's better to move there.

Contributor Author


MERGE uses REINSERT as well, but I moved it there anyway because it's less ambiguous, since MERGE has its own plan node.

@dongjoon-hyun dongjoon-hyun marked this pull request as draft April 8, 2026 21:50
case class DataAndMetadataWritingSparkTask(
    dataProj: ProjectingInternalRow,
    metadataProj: ProjectingInternalRow,
    metricCounter: Option[BooleanMetricCounter] = None)
  extends WritingSparkTask[DataWriter[InternalRow]] {
Contributor


Should we have a more specific name for the new field?

Contributor


Changes to the writing tasks will require another look; the code is hard to navigate and understand. We should attempt to simplify it.

@aokolnychyi
Contributor

The new approach seems much better; let me know if you want to proceed, @ZiyaZa. We will need to polish and add more tests.

* The writing task reads this column to increment operation metrics and strip it before passing
* data to the writer.
*/
private[sql] final val IS_UPDATED_COLUMN: String = "__is_updated"
Contributor


Actually, we may consider reusing the operation type but making it more specific.

For example, you can add the following to RowDeltaUtils:

  final val WRITE_UPDATED_OPERATION: Int = 7  // Updated row in group-based write
  final val WRITE_COPIED_OPERATION: Int = 8   // Copied row in group-based write

These would replace the generic WRITE_WITH_METADATA_OPERATION. Then use it like this:

// BEFORE
val remainingRowsPlan = Project(
  Alias(FalseLiteral, IS_UPDATED_COLUMN)() +: readRelation.output,
  Filter(remainingRowFilter, readRelation))

// AFTER
val remainingRowsPlan = Project(
  Alias(Literal(WRITE_COPIED_OPERATION), OPERATION_COLUMN)() +: readRelation.output,
  Filter(remainingRowFilter, readRelation))

Contributor


Then you can use the operation in writing tasks to compute metrics...
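A rough sketch of the per-row dispatch a writing task could do under this alternative. The constant values mirror the suggestion above; the class and method names are hypothetical, not Spark's actual code.

```java
// Hypothetical dispatch on a per-row operation column (sketch only).
import java.util.concurrent.atomic.AtomicLong;

class OperationMetricsSketch {
    static final int WRITE_UPDATED_OPERATION = 7; // updated row in group-based write
    static final int WRITE_COPIED_OPERATION = 8;  // copied row in group-based write

    final AtomicLong numUpdatedRows = new AtomicLong();
    final AtomicLong numCopiedRows = new AtomicLong();

    /** Bump the metric matching the row's operation column, if any. */
    void countRow(int operation) {
        switch (operation) {
            case WRITE_UPDATED_OPERATION:
                numUpdatedRows.incrementAndGet();
                break;
            case WRITE_COPIED_OPERATION:
                numCopiedRows.incrementAndGet();
                break;
            default:
                // Other operations carry no UPDATE metric.
                break;
        }
    }
}
```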

Contributor Author


WRITE_UPDATED_OPERATION / WRITE_COPIED_OPERATION would also need to contain WITH_METADATA to show that there are metadata columns that need to be handled in writing tasks. So it becomes WRITE_UPDATED_WITH_METADATA_OPERATION and similar for copy.

Then in writing tasks we'd need to handle all three (the old one + 2 new ones), which would mostly do the same thing, the only difference being metric counting.

Regarding its usage, we'd also need a change inside buildReplaceDataUpdateProjection. Instead of:

val isUpdatedCol = Alias(EqualNullSafe(cond, TrueLiteral), IS_UPDATED_COLUMN)()

we need a case-when here.

Not sure it's better than what we have at the moment. Both approaches have some complexity, so I'm leaning towards keeping it as it is.
