compute: move MV sink persist I/O off Timely thread #35328

Draft
antiguru wants to merge 6 commits into MaterializeInc:main from antiguru:deasync

antiguru (Member) commented Mar 5, 2026

Move persist I/O in the materialized view sink operators (mint, write, append) off the Timely thread into spawned Tokio tasks.

The Timely operators use OperatorBuilderRc::build_reschedule directly, keeping !Send state (corrections, logging, capabilities) on the Timely thread.
Communication with Tokio tasks uses mpsc::unbounded_channel and SyncActivator for waking.
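
A minimal sketch of the command/response shape described above, not the PR's code. It assumes `std::thread` and `std::sync::mpsc` as stand-ins for the Tokio task and `tokio::sync::mpsc::unbounded_channel`, and a plain `wake` callback in place of `SyncActivator::activate`. `Command`, `Response`, `spawn_worker`, and `drain_responses` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical command sent from the operator to the background worker.
enum Command {
    Write(Vec<u64>),
}

/// Hypothetical response: how many records the worker "wrote".
struct Response {
    written: usize,
}

/// Spawns the worker that stands in for the Tokio task doing persist I/O.
/// `wake` stands in for waking the operator after a response is queued.
fn spawn_worker(
    wake: impl Fn() + Send + 'static,
) -> (Sender<Command>, Receiver<Response>, JoinHandle<()>) {
    let (cmd_tx, cmd_rx) = channel::<Command>();
    let (resp_tx, resp_rx) = channel::<Response>();
    let handle = thread::spawn(move || {
        // The loop ends once the operator side drops `cmd_tx`.
        for cmd in cmd_rx {
            match cmd {
                Command::Write(data) => {
                    // Slow I/O would happen here, off the operator thread.
                    if resp_tx.send(Response { written: data.len() }).is_err() {
                        break; // operator is gone, nothing left to do
                    }
                    wake();
                }
            }
        }
    });
    (cmd_tx, resp_rx, handle)
}

/// One operator activation: collect pending responses without blocking.
fn drain_responses(resp_rx: &Receiver<Response>) -> usize {
    resp_rx.try_iter().map(|resp| resp.written).sum()
}
```

The operator side never blocks: it sends commands, returns, and picks up results on a later activation triggered by `wake`.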

🤖 Generated with Claude Code

github-actions bot commented Mar 5, 2026

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

teskje (Contributor) left a comment

The intention checks out to me. I'll note that, for me, this makes the logic noticeably harder to follow and to verify as still correct, so we should only make this change if we have measured performance improvements. AIUI, the thinking is that Timely activations are expensive enough to justify the overhead of the additional Tokio tasks and channels. It'd be great to have some numbers!

Move persist I/O (opening writers, writing batches, watching frontiers,
appending) from the Timely thread to Tokio tasks. The Timely operators
use `OperatorBuilderRc::build` (sync) and communicate with Tokio tasks
via channels.

Key changes:
* Replace `AsyncOperatorBuilder` with `OperatorBuilderRc` for mint,
  write, and append operators.
* Remove shutdown buttons (unnecessary with `drop_dataflow`).
* Wrap spawned Tokio tasks with `AbortOnDropHandle`.
* Delete batch blobs on failed channel send to avoid leaking.
* Obtain `SinkMetrics` synchronously from `PersistClientCache`, removing
  lazy state init, the init oneshot channel, and all pending buffers.
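
The "delete batch blobs on failed channel send" item relies on the channel handing the value back when the receiver is gone. A hedged sketch of that idea, assuming `std::sync::mpsc` in place of the Tokio channel; `Batch`, `delete`, and `send_or_delete` are hypothetical stand-ins, not persist APIs:

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for a persist batch that owns blobs in external storage.
struct Batch {
    id: u32,
}

impl Batch {
    /// Stand-in for deleting the batch's blobs; records the id instead.
    fn delete(self, deleted: &mut Vec<u32>) {
        deleted.push(self.id);
    }
}

/// Sends a batch to the operator. If the receiver is gone, `send` returns the
/// batch inside the error, so it can be cleaned up instead of leaked.
/// Returns `true` if the batch was delivered.
fn send_or_delete(tx: &Sender<Batch>, batch: Batch, deleted: &mut Vec<u32>) -> bool {
    match tx.send(batch) {
        Ok(()) => true,
        Err(err) => {
            // `SendError<T>` wraps the unsent value as its only field.
            err.0.delete(deleted);
            false
        }
    }
}
```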

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
antiguru force-pushed the deasync branch 2 times, most recently from c6a2b46 to 03d3c19, March 10, 2026 10:05
antiguru and others added 2 commits March 10, 2026 11:23
Move the corrections buffer (OkErr<Correction<Row>, Correction<DataflowError>>)
from the Timely thread to the Tokio write task. This eliminates the
intermediate Vec allocation when writing batches: ok and err correction
iterators are chained directly via peekable() and passed to
writer.batch().
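
For illustration only, the chain-and-peek idea could look roughly like this. The `Update` type and `chained_updates` are hypothetical and use plain `String`s in place of `Row` and `DataflowError`; the real code passes the iterator on to `writer.batch()`, which is not modeled here.

```rust
/// Hypothetical update triple: (data, time, diff).
type Update = (Result<String, String>, u64, i64);

/// Chains ok and err updates into one iterator without collecting into a Vec.
/// Returns `None` when there is nothing to write.
fn chained_updates<'a>(
    oks: &'a [(String, u64, i64)],
    errs: &'a [(String, u64, i64)],
) -> Option<impl Iterator<Item = Update> + 'a> {
    let ok_iter = oks
        .iter()
        .map(|(d, t, r)| -> Update { (Ok(d.clone()), *t, *r) });
    let err_iter = errs
        .iter()
        .map(|(d, t, r)| -> Update { (Err(d.clone()), *t, *r) });
    let mut updates = ok_iter.chain(err_iter).peekable();
    // Peeking detects the empty case without consuming the first element.
    if updates.peek().is_none() {
        None
    } else {
        Some(updates)
    }
}
```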

The Timely operator sends commands (inserts, frontier advances,
write requests) over an mpsc channel. The Tokio task owns the
corrections buffer and WriteHandle, processes commands, and sends
batch results back.

Since Correction uses Rc-based logging (not Send), introduce
ChannelLogging which sends LoggingEvent over an mpsc channel back to
the Timely thread. CorrectionLogger on the Timely side drains these
events each operator activation.

Add ArcActivator to timely-util: a Send-safe SyncActivator wrapper
that coalesces redundant activations using an AtomicBool. The Tokio
task calls activate() after processing commands; the Timely operator
acknowledges via ActivationAck::ack(). Also removes ActivatorTrait
(single implementor) in favor of concrete RcActivator in replay.rs.
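
A rough sketch of the coalescing idea, assuming a generic `wake` callback in place of the wrapped `SyncActivator`. `CoalescingActivator`, `Ack`, and `coalescing_activator` are hypothetical names and do not reflect the actual `ArcActivator` or `ActivationAck` definitions in timely-util.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Activator that skips wake-ups while an earlier one is still unacknowledged.
struct CoalescingActivator<F: Fn()> {
    pending: Arc<AtomicBool>,
    /// Stand-in for the underlying (comparatively expensive) activation.
    wake: F,
}

/// Handle held by the operator to acknowledge an activation.
struct Ack {
    pending: Arc<AtomicBool>,
}

fn coalescing_activator<F: Fn()>(wake: F) -> (CoalescingActivator<F>, Ack) {
    let pending = Arc::new(AtomicBool::new(false));
    let activator = CoalescingActivator { pending: Arc::clone(&pending), wake };
    (activator, Ack { pending })
}

impl<F: Fn()> CoalescingActivator<F> {
    fn activate(&self) {
        // `swap` returns the previous value, so only the false -> true
        // transition triggers the underlying wake-up.
        if !self.pending.swap(true, Ordering::SeqCst) {
            (self.wake)();
        }
    }
}

impl Ack {
    /// Clears the pending flag so the next `activate` wakes the operator again.
    fn ack(&self) {
        self.pending.store(false, Ordering::SeqCst);
    }
}
```

In a scheme like this the operator would presumably acknowledge before draining its channels, so that a message arriving mid-drain still produces a fresh activation.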

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Tokio task handles for the write and append operators were captured
in the `build()` constructor closure but not referenced by the inner
schedule closure. A `move` closure captures only the variables it
references, so the task handles stayed in the constructor closure and
were dropped when it returned, immediately aborting both tasks. The
`let _ = cmd_tx.send(...)` pattern silently discarded the resulting send
errors, causing a hang.

Fix by referencing the task handles inside the schedule closure so they
are captured and kept alive for the operator's lifetime. Also change
channel sends to `expect()` so similar issues surface as panics.
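
The capture behavior can be reproduced in isolation. In this sketch `TaskGuard` is a hypothetical stand-in for an abort-on-drop task handle, and the `build_*` functions stand in for the constructor closure that returns the schedule closure:

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Stand-in for an abort-on-drop task handle: sets `aborted` when dropped.
struct TaskGuard {
    aborted: Rc<Cell<bool>>,
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.aborted.set(true);
    }
}

/// Buggy: the returned closure never mentions `_guard`, so it is not captured
/// and is dropped as soon as this function returns.
fn build_buggy(_guard: TaskGuard) -> impl FnMut() -> u32 {
    let mut activations = 0;
    move || {
        activations += 1;
        activations
    }
}

/// Fixed: referencing `guard` inside the `move` closure makes the closure take
/// ownership, so the guard lives exactly as long as the closure.
fn build_fixed(guard: TaskGuard) -> impl FnMut() -> u32 {
    let mut activations = 0;
    move || {
        let _keep_alive = &guard;
        activations += 1;
        activations
    }
}
```

With `build_buggy` the guard's `Drop` runs before the first activation; with `build_fixed` it runs only when the closure itself is dropped.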

Additional fixes:
* Send `WriteResponse { batch: None }` for empty corrections instead of
  dropping the response, which left `batch_in_flight` permanently set.
* Move `descs.broadcast()` to the call site to avoid double-broadcast.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
antiguru force-pushed the deasync branch 2 times, most recently from 50d1dda to 418e52b, March 10, 2026 13:33
Same task handle capture bug as write/append: the persist_watch task
handles were dropped at the end of the constructor closure. Move the
reference into the schedule closure.

Also check for `Disconnected` on `persist_rx.try_recv()` instead of
silently ignoring it via `while let Ok(...)`.
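
A small sketch of the difference, assuming `std::sync::mpsc`, whose `TryRecvError` has the same `Empty` and `Disconnected` variants as the Tokio one. `Drained` and `drain` are hypothetical names. A `while let Ok(...)` loop stops on either error and cannot tell them apart; matching explicitly surfaces the hang-up.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Outcome of draining a channel during one operator activation.
#[derive(Debug, PartialEq)]
enum Drained<T> {
    /// The sender is still alive; these items were pending.
    Open(Vec<T>),
    /// The sender is gone; these items were queued before it hung up.
    Disconnected(Vec<T>),
}

/// Drains all currently available items and reports whether the sender is gone.
fn drain<T>(rx: &Receiver<T>) -> Drained<T> {
    let mut items = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => items.push(item),
            Err(TryRecvError::Empty) => return Drained::Open(items),
            Err(TryRecvError::Disconnected) => return Drained::Disconnected(items),
        }
    }
}
```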

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
antiguru and others added 2 commits March 10, 2026 15:19
Merge the three `worker_id == active_worker_id` checks into one block
that sets up background tasks and state before `builder.build()`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When the write operator is dropped (e.g., during dataflow cancellation),
the Tokio task holding the Correction buffers is aborted asynchronously.
Its destructors may not have run yet, so ChainDropped events may not
arrive in the logging channel.

Move the channel receiver into CorrectionLogger and track net
batch/record counts. On drop, drain any available events, then emit
DropEvents to retract any outstanding differential logging state.
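
One way to picture the drain-then-retract behavior, as a sketch under assumptions: `Event`, `NetLogger`, and the `emit` callback are hypothetical, and `std::sync::mpsc` replaces the Tokio channel. The real `CorrectionLogger` emits differential logging events rather than calling a closure.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical logging events sent from the background task.
enum Event {
    BatchAdded { records: i64 },
    BatchDropped { records: i64 },
}

/// Forwards events to `emit` as (batch_diff, record_diff) and tracks net totals.
struct NetLogger<F: FnMut(i64, i64)> {
    rx: Receiver<Event>,
    batches: i64,
    records: i64,
    emit: F,
}

impl<F: FnMut(i64, i64)> NetLogger<F> {
    fn new(rx: Receiver<Event>, emit: F) -> Self {
        NetLogger { rx, batches: 0, records: 0, emit }
    }

    /// Forwards every event that is available right now.
    fn drain(&mut self) {
        // Stopping on any error is intended here: during drop the sender may
        // already be gone, and only the events that did arrive matter.
        while let Ok(event) = self.rx.try_recv() {
            let (b, r) = match event {
                Event::BatchAdded { records } => (1, records),
                Event::BatchDropped { records } => (-1, -records),
            };
            self.batches += b;
            self.records += r;
            (self.emit)(b, r);
        }
    }
}

impl<F: FnMut(i64, i64)> Drop for NetLogger<F> {
    fn drop(&mut self) {
        self.drain();
        // Retract whatever is still outstanding so the logged totals net to zero.
        if self.batches != 0 || self.records != 0 {
            (self.emit)(-self.batches, -self.records);
        }
    }
}
```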

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>