compute: move MV sink persist I/O off Timely thread #35328
Draft
antiguru wants to merge 6 commits into MaterializeInc:main from
teskje (Contributor) reviewed on Mar 9, 2026 and left a comment:
The intention checks out to me. I'll note that for me this makes it noticeably harder to follow the logic and be sure that it's still correct, so we should only make this change if we have measured performance improvements. Aiui, the thinking is that Timely activations are expensive enough to justify the overhead of the additional tokio tasks and channels. It'd be great to have some numbers!
Move persist I/O (opening writers, writing batches, watching frontiers,
appending) from the Timely thread to Tokio tasks. The Timely operators use
`OperatorBuilderRc::build` (sync) and communicate with Tokio tasks via
channels.

Key changes:
* Replace `AsyncOperatorBuilder` with `OperatorBuilderRc` for mint, write,
  and append operators.
* Remove shutdown buttons (unnecessary with `drop_dataflow`).
* Wrap spawned Tokio tasks with `AbortOnDropHandle`.
* Delete batch blobs on failed channel send to avoid leaking.
* Obtain `SinkMetrics` synchronously from `PersistClientCache`, removing
  lazy state init, the init oneshot channel, and all pending buffers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
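
The channel handoff and the "delete batch blobs on failed channel send" rule can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it uses `std::thread` and `std::sync::mpsc` as stand-ins for Tokio tasks and Tokio channels, and `Batch`, `Command`, `io_task`, and `run` are hypothetical names. The point it shows is that a failed `send` hands the value back inside the error, so the sender can clean it up instead of leaking it.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a persist batch whose blobs must be deleted if unused.
#[derive(Debug, PartialEq)]
struct Batch {
    id: u64,
}

/// Hypothetical request sent from the (sync) operator to the background task.
enum Command {
    Write { id: u64 },
}

/// Background task: performs the "I/O" and tries to hand each batch back.
/// Returns the ids of batches it cleaned up because the receiver was gone.
fn io_task(cmd_rx: Receiver<Command>, batch_tx: Sender<Batch>) -> Vec<u64> {
    let mut deleted = Vec::new();
    for cmd in cmd_rx {
        let Command::Write { id } = cmd;
        let batch = Batch { id };
        // A failed `send` returns the unsent value inside the error.
        if let Err(err) = batch_tx.send(batch) {
            // Stand-in for deleting the batch's blobs.
            deleted.push(err.0.id);
        }
    }
    deleted
}

/// Drives the task from the "operator" side.
/// Returns (batch ids received by the operator, batch ids cleaned up by the task).
fn run(drop_receiver_early: bool) -> (Vec<u64>, Vec<u64>) {
    let (cmd_tx, cmd_rx) = channel();
    let (batch_tx, batch_rx) = channel();
    // Simulate the operator going away before any batch is produced.
    let batch_rx = if drop_receiver_early {
        drop(batch_rx);
        None
    } else {
        Some(batch_rx)
    };
    let task = thread::spawn(move || io_task(cmd_rx, batch_tx));
    for id in 0..3 {
        cmd_tx.send(Command::Write { id }).expect("task alive");
    }
    // Closing the command channel lets the task finish.
    drop(cmd_tx);
    let deleted = task.join().expect("task panicked");
    let mut received = Vec::new();
    if let Some(rx) = batch_rx {
        received.extend(rx.try_iter().map(|b| b.id));
    }
    (received, deleted)
}
```

Calling `run(false)` delivers every batch to the operator side, while `run(true)` routes every batch through the cleanup path.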
force-pushed from c6a2b46 to 03d3c19
Move the corrections buffer (OkErr<Correction<Row>, Correction<DataflowError>>)
from the Timely thread to the Tokio write task. This eliminates the
intermediate Vec allocation when writing batches: ok and err correction
iterators are chained directly via peekable() and passed to writer.batch().

The Timely operator sends commands (inserts, frontier advances, write
requests) over an mpsc channel. The Tokio task owns the corrections buffer
and WriteHandle, processes commands, and sends batch results back.

Since Correction uses Rc-based logging (not Send), introduce ChannelLogging,
which sends LoggingEvent over an mpsc channel back to the Timely thread.
CorrectionLogger on the Timely side drains these events each operator
activation.

Add ArcActivator to timely-util: a Send-safe SyncActivator wrapper that
coalesces redundant activations using an AtomicBool. The Tokio task calls
activate() after processing commands; the Timely operator acknowledges via
ActivationAck::ack().

Also removes ActivatorTrait (single implementor) in favor of concrete
RcActivator in replay.rs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
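
The coalescing idea behind ArcActivator can be sketched as below. This is an assumption-laden illustration rather than the code added to timely-util: `CoalescingActivator`, `Ack`, and the `wake` callback (standing in for a `SyncActivator` wake-up) are hypothetical names. It shows one way an `AtomicBool` can suppress redundant wake-ups until the receiving side acknowledges.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical coalescing activator. `wake` stands in for the real wake-up call.
#[derive(Clone)]
struct CoalescingActivator {
    pending: Arc<AtomicBool>,
    wake: Arc<dyn Fn() + Send + Sync>,
}

/// Hypothetical acknowledgement handle held by the operator side.
struct Ack {
    pending: Arc<AtomicBool>,
}

impl CoalescingActivator {
    fn new(wake: impl Fn() + Send + Sync + 'static) -> (Self, Ack) {
        let pending = Arc::new(AtomicBool::new(false));
        let activator = Self {
            pending: Arc::clone(&pending),
            wake: Arc::new(wake),
        };
        (activator, Ack { pending })
    }

    /// Wake the operator unless an activation is already pending.
    fn activate(&self) {
        // `swap` returns the previous value: only the false -> true
        // transition triggers an actual wake-up.
        if !self.pending.swap(true, Ordering::AcqRel) {
            (self.wake)();
        }
    }
}

impl Ack {
    /// Clear the pending flag so the next `activate` wakes again.
    fn ack(&self) {
        self.pending.store(false, Ordering::Release);
    }
}

/// Counts how many wake-ups three `activate` calls and one `ack` produce.
fn count_wakeups() -> usize {
    let wakeups = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&wakeups);
    let (activator, ack) = CoalescingActivator::new(move || {
        counter.fetch_add(1, Ordering::SeqCst);
    });
    activator.activate(); // wakes
    activator.activate(); // coalesced, no wake
    ack.ack();
    activator.activate(); // wakes again
    wakeups.load(Ordering::SeqCst)
}
```

In a scheme like this the operator would presumably call `ack()` before draining its channels, so that a message arriving mid-drain still triggers a fresh wake-up; whether the PR orders it that way is not stated here.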
The Tokio task handles for the write and append operators were captured
in the `build()` constructor closure but not referenced by the inner
schedule closure. Since the schedule closure is `move`, it only captures
what it references, so the task handles were dropped when the constructor
returned, immediately aborting both tasks. The `let _ = cmd_tx.send(...)`
pattern silently discarded the resulting errors, causing a hang.

Fix by referencing the task handles inside the schedule closure so they
are captured and kept alive for the operator's lifetime. Also change
channel sends to `expect()` so similar issues surface as panics.

Additional fixes:
* Send `WriteResponse { batch: None }` for empty corrections instead of
  dropping the response, which left `batch_in_flight` permanently set.
* Move `descs.broadcast()` to the call site to avoid double-broadcast.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
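
The capture behavior described above can be reproduced in isolation. In this sketch, `TaskHandle` is a hypothetical stand-in for an abort-on-drop task handle, and `build` mimics a constructor that returns the schedule closure; neither is the PR's actual code. A `move` closure only takes ownership of the variables it mentions, so an unreferenced handle is dropped when the constructor returns.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for an abort-on-drop task handle: records its own drop.
struct TaskHandle {
    aborted: Rc<Cell<bool>>,
}

impl Drop for TaskHandle {
    fn drop(&mut self) {
        self.aborted.set(true);
    }
}

/// Mimics a constructor closure that returns the schedule closure.
fn build(keep_alive: bool, aborted: Rc<Cell<bool>>) -> Box<dyn FnMut()> {
    let handle = TaskHandle { aborted };
    if keep_alive {
        // Referencing `handle` makes the `move` closure take ownership of it,
        // so it lives as long as the returned closure.
        Box::new(move || {
            let _keep_alive = &handle;
        })
    } else {
        // This closure never mentions `handle`, so it is not captured and is
        // dropped when `build` returns, which "aborts" the task.
        Box::new(move || {})
    }
}

/// Reports whether the task was aborted while the schedule closure is still alive.
fn aborted_after_build(keep_alive: bool) -> bool {
    let aborted = Rc::new(Cell::new(false));
    let mut schedule = build(keep_alive, Rc::clone(&aborted));
    schedule();
    aborted.get()
}
```

`aborted_after_build(false)` reproduces the bug (the handle is gone before the first schedule call), and `aborted_after_build(true)` shows the fix.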
force-pushed from 50d1dda to 418e52b
Same task handle capture bug as write/append: the persist_watch task
handles were dropped at the end of the constructor closure. Move the
reference into the schedule closure.

Also check for `Disconnected` on `persist_rx.try_recv()` instead of
silently ignoring it via `while let Ok(...)`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
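
The difference between `while let Ok(...)` and an explicit `Disconnected` check can be sketched like this. The example uses `std::sync::mpsc`, whose `TryRecvError` has the same `Empty` and `Disconnected` variants as Tokio's; `drain` and `demo` are hypothetical helpers, not functions from the PR.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Drains everything currently buffered and reports whether the sender is gone.
/// A `while let Ok(item) = rx.try_recv()` loop would stop in both error cases
/// and could not tell "nothing yet" apart from "sender dropped".
fn drain<T>(rx: &Receiver<T>) -> (Vec<T>, bool) {
    let mut items = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => items.push(item),
            // Nothing buffered right now; the sender may still produce more.
            Err(TryRecvError::Empty) => return (items, false),
            // All senders are gone; no further items will ever arrive.
            Err(TryRecvError::Disconnected) => return (items, true),
        }
    }
}

/// Drains once while the sender is alive and once after it has been dropped.
fn demo() -> ((Vec<u32>, bool), (Vec<u32>, bool)) {
    let (tx, rx) = channel();
    tx.send(1).unwrap();
    let while_connected = drain(&rx);
    tx.send(2).unwrap();
    drop(tx);
    let after_disconnect = drain(&rx);
    (while_connected, after_disconnect)
}
```

Buffered items are still delivered before the disconnect is reported, so the caller can process them and then decide how to react to the lost task.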
Merge the three `worker_id == active_worker_id` checks into one block that
sets up background tasks and state before `builder.build()`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When the write operator is dropped (e.g., during dataflow cancellation),
the Tokio task holding the Correction buffers is aborted asynchronously.
Its destructors may not have run yet, so ChainDropped events may not
arrive in the logging channel.

Move the channel receiver into CorrectionLogger and track net batch/record
counts. On drop, drain any available events, then emit DropEvents to
retract any outstanding differential logging state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
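
The drain-then-retract pattern on drop might look roughly like the following. All names here are hypothetical (`Event`, `NetLogger`, and the `(batch_diff, record_diff)` output tuple are illustrative, not the PR's `CorrectionLogger` or its event types), and `std::sync::mpsc` stands in for the real channels. The sketch tracks net counts and, in `Drop`, emits whatever retraction is still outstanding.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical logging events produced by the background task.
enum Event {
    BatchAdded { records: i64 },
    BatchDropped { records: i64 },
}

/// Hypothetical logger that owns the event receiver and tracks net counts.
struct NetLogger {
    rx: Receiver<Event>,
    net_batches: i64,
    net_records: i64,
    /// Stand-in for the logging sink; receives (batch_diff, record_diff).
    out: Sender<(i64, i64)>,
}

impl NetLogger {
    /// Applies every event that is currently available without blocking.
    fn drain(&mut self) {
        for event in self.rx.try_iter() {
            match event {
                Event::BatchAdded { records } => {
                    self.net_batches += 1;
                    self.net_records += records;
                }
                Event::BatchDropped { records } => {
                    self.net_batches -= 1;
                    self.net_records -= records;
                }
            }
        }
    }
}

impl Drop for NetLogger {
    fn drop(&mut self) {
        // Pick up whatever arrived, then retract what is still outstanding,
        // since the aborted task may never report its own drops.
        self.drain();
        if self.net_batches != 0 || self.net_records != 0 {
            let _ = self.out.send((-self.net_batches, -self.net_records));
        }
    }
}

/// Returns the retraction emitted when the logger is dropped with state outstanding.
fn retraction_on_drop() -> Option<(i64, i64)> {
    let (event_tx, event_rx) = channel();
    let (out_tx, out_rx) = channel();
    let logger = NetLogger { rx: event_rx, net_batches: 0, net_records: 0, out: out_tx };
    event_tx.send(Event::BatchAdded { records: 5 }).unwrap();
    event_tx.send(Event::BatchAdded { records: 3 }).unwrap();
    event_tx.send(Event::BatchDropped { records: 5 }).unwrap();
    // The matching drop event for the second batch never arrives.
    drop(logger);
    out_rx.try_recv().ok()
}
```

Here one batch with three records is still outstanding at drop time, so the logger emits a single retraction for exactly that remainder.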
Move persist I/O in the materialized view sink operators (mint, write, append) off the Timely thread into spawned Tokio tasks.
The Timely operators use `OperatorBuilderRc::build_reschedule` directly, keeping `!Send` state (corrections, logging, capabilities) on the Timely thread.
Communication with Tokio tasks uses `mpsc::unbounded_channel` and `SyncActivator` for waking.
🤖 Generated with Claude Code