fix(topology): Fix for issue causing stalling on shutdown for sinks configured w/ disk buffers #24949
base: master
Changes from all commits: b217efc, 8aebef5, 79c27c3, 0850fec
```diff
@@ -0,0 +1,4 @@
+Fixed issue during in-situ reload of a sink with disk buffer configured where
+component would stall for batch.timeout_sec before fully gracefully reloading.
+
+authors: graphcareful
```
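For context on the stall: a sink's batcher holds a partial batch open until the batch fills or the batch timeout elapses, but it can flush immediately once its input ends. A rough std-only sketch of that behavior (the `run_batcher` function and channel-based model here are hypothetical illustrations, not Vector's actual sink code):

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

// Toy batching loop: collect items until the batch is full, the timeout
// elapses, or the input channel closes.
fn run_batcher(rx: Receiver<u32>, timeout: Duration) -> Vec<Vec<u32>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut deadline = Instant::now() + timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => {
                current.push(item);
                if current.len() >= 3 {
                    batches.push(std::mem::take(&mut current));
                    deadline = Instant::now() + timeout;
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                if !current.is_empty() {
                    batches.push(std::mem::take(&mut current));
                }
                deadline = Instant::now() + timeout;
            }
            // Input closed: flush the partial batch immediately instead of
            // waiting out the timeout -- the "natural shutdown" path.
            Err(RecvTimeoutError::Disconnected) => {
                if !current.is_empty() {
                    batches.push(current);
                }
                return batches;
            }
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    for i in 0..4 {
        tx.send(i).unwrap();
    }
    drop(tx); // simulate the topology removing the sink's input
    let start = Instant::now();
    let batches = run_batcher(rx, Duration::from_secs(5));
    // Partial second batch flushed without waiting the full 5s timeout.
    assert!(start.elapsed() < Duration::from_secs(1));
    assert_eq!(batches, vec![vec![0, 1, 2], vec![3]]);
    println!("{batches:?}");
}
```

In this model, a sink whose input is never closed would instead sit in the `Timeout` arm for the full batch timeout before flushing, which is the stalling symptom the changelog describes.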
```diff
@@ -590,10 +590,26 @@ impl RunningTopology {
             .collect::<HashSet<_>>();

-        // For any existing sink that has a conflicting resource dependency with a changed/added
-        // sink, or for any sink that we want to reuse their buffer, we need to explicit wait for
-        // them to finish processing so we can reclaim ownership of those resources/buffers.
+        // For any existing sink that has a conflicting resource dependency with a changed/added
+        // sink, for any sink that we want to reuse their buffer, or for any changed sink with
+        // a disk buffer that is not being reused, we need to explicitly wait for them to finish
+        // processing so we can reclaim ownership of those resources/buffers.
+        let changed_disk_buffer_sinks = diff
+            .sinks
+            .to_change
+            .iter()
+            .filter(|key| {
+                !reuse_buffers.contains(*key)
+                    && self
+                        .config
+                        .sink(key)
+                        .is_some_and(|s| s.buffer.has_disk_stage())
+            })
+            .cloned()
+            .collect::<HashSet<_>>();

         let wait_for_sinks = conflicting_sinks
             .chain(reuse_buffers.iter().cloned())
+            .chain(changed_disk_buffer_sinks.iter().cloned())
             .collect::<HashSet<_>>();

         // First, we remove any inputs to removed sinks so they can naturally shut down.
```
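The selection logic in the hunk above can be illustrated in isolation. A minimal sketch with stand-in types (`SinkKey`, `select_changed_disk_buffer_sinks`, and the example sets are all hypothetical; Vector's real code uses `ComponentKey` and its config types):

```rust
use std::collections::HashSet;

// Hypothetical stand-in for Vector's ComponentKey.
type SinkKey = &'static str;

// Changed sinks with a disk buffer that is NOT having its buffer reused must
// be waited on, so the replacement sink can reclaim the on-disk buffer files.
fn select_changed_disk_buffer_sinks(
    to_change: &[SinkKey],
    reuse_buffers: &HashSet<SinkKey>,
    has_disk_buffer: &HashSet<SinkKey>,
) -> HashSet<SinkKey> {
    to_change
        .iter()
        .filter(|key| !reuse_buffers.contains(*key) && has_disk_buffer.contains(*key))
        .cloned()
        .collect()
}

fn main() {
    let to_change = ["s3", "kafka", "http"];
    let reuse_buffers: HashSet<SinkKey> = ["kafka"].into_iter().collect();
    let conflicting_sinks: HashSet<SinkKey> = ["http"].into_iter().collect();
    let has_disk_buffer: HashSet<SinkKey> = ["s3", "kafka"].into_iter().collect();

    let changed_disk =
        select_changed_disk_buffer_sinks(&to_change, &reuse_buffers, &has_disk_buffer);
    assert_eq!(changed_disk, ["s3"].into_iter().collect::<HashSet<_>>());

    // Union of everything we must wait for, mirroring `wait_for_sinks`.
    let wait_for_sinks: HashSet<SinkKey> = conflicting_sinks
        .iter()
        .cloned()
        .chain(reuse_buffers.iter().cloned())
        .chain(changed_disk.iter().cloned())
        .collect();
    assert_eq!(
        wait_for_sinks,
        ["s3", "kafka", "http"].into_iter().collect::<HashSet<_>>()
    );
    println!("wait for: {wait_for_sinks:?}");
}
```

Note the `!reuse_buffers.contains(..)` guard: a sink already in `reuse_buffers` is handled by the buffer-reuse path and would otherwise be double-counted, since `wait_for_sinks` already includes it via the second `chain`.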
```diff
@@ -635,24 +651,26 @@ impl RunningTopology {

         for key in &sinks_to_change {
             debug!(component_id = %key, "Changing sink.");
-            if reuse_buffers.contains(key) {
+            if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
                 self.detach_triggers
                     .remove(key)
                     .unwrap()
                     .into_inner()
                     .cancel();
```
Comment on lines +654 to 659:

Reviewer: This unconditionally calls

Contributor (Author): This I'm not so sure about, will require more research to confirm/deny.

Contributor (Author): Here's my thinking on this:

- Yes, consumption is stopped, but not abruptly; the stream will flush partial batches.
- In this specific case, where we are calling cancel on a disk-buffer sink, it is guaranteed that all queued records are gracefully drained, because the stream will not end until the buffer has been emptied.

Member: I think the key distinction here is between records already pulled into the sink's batching pipeline and records still unread in the disk buffer. The replacement sink recreates the disk buffer under the same component key and data directory, so unread records remain durable on disk and are picked up by the new buffer instance, and unacknowledged in-flight reads are replayed via the ledger rather than lost. The guarantee comes from disk buffer durability, not from the old reader draining to completion.

pront marked this conversation as resolved.
```diff
-                // We explicitly clone the input side of the buffer and store it so we don't lose
-                // it when we remove the inputs below.
-                //
-                // We clone instead of removing here because otherwise the input will be missing for
-                // the rest of the reload process, which violates the assumption that all previous
-                // inputs for components not being removed are still available. It's simpler to
-                // allow the "old" input to stick around and be replaced (even though that's
-                // basically a no-op since we're reusing the same buffer) than it is to pass around
-                // info about which sinks are having their buffers reused and treat them differently
-                // at other stages.
-                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
+                if reuse_buffers.contains(key) {
+                    // We explicitly clone the input side of the buffer and store it so we don't lose
+                    // it when we remove the inputs below.
+                    //
+                    // We clone instead of removing here because otherwise the input will be missing for
+                    // the rest of the reload process, which violates the assumption that all previous
+                    // inputs for components not being removed are still available. It's simpler to
+                    // allow the "old" input to stick around and be replaced (even though that's
+                    // basically a no-op since we're reusing the same buffer) than it is to pass around
+                    // info about which sinks are having their buffers reused and treat them differently
+                    // at other stages.
+                    buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
+                }
             }
             self.remove_inputs(key, diff, new_config).await;
         }
```
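The durability argument made in the review thread above can be sketched with a toy model (all types here are hypothetical illustrations; Vector's real disk buffer and ledger are far more involved). The point is that reads are only consumed permanently once acknowledged, so a replacement reader resuming from the ledger's acked offset sees both unread records and unacknowledged in-flight ones:

```rust
// Toy model of a ledger-backed buffer: records live in a durable log, and the
// ledger tracks the last *acknowledged* offset. A reader that is dropped
// before acking leaves its in-flight reads to be replayed by the next reader.
struct DiskBufferModel {
    log: Vec<String>, // durable records, in write order
    acked: usize,     // ledger: number of records fully acknowledged
}

impl DiskBufferModel {
    // A new reader instance always resumes from the acked offset, not from
    // wherever the previous reader had read to.
    fn replay_from_ledger(&self) -> Vec<String> {
        self.log[self.acked..].to_vec()
    }

    fn ack(&mut self, n: usize) {
        self.acked += n;
    }
}

fn main() {
    let mut buf = DiskBufferModel {
        log: vec!["a".into(), "b".into(), "c".into(), "d".into()],
        acked: 0,
    };

    // The old reader pulls "a" and "b" into its batching pipeline, but only
    // "a" is acknowledged before the reader is cancelled mid-reload.
    buf.ack(1);

    // The replacement sink's reader: "b" (in-flight, unacked) is replayed,
    // and "c"/"d" (never read) are still on disk. Nothing is lost.
    let replayed = buf.replay_from_ledger();
    assert_eq!(replayed, vec!["b", "c", "d"]);
    println!("replayed: {replayed:?}");
}
```

This mirrors the Member's point: cancelling the old reader is safe because the guarantee comes from on-disk durability plus ledger replay, not from the old reader draining to completion.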
Comment on the changelog entry: There are a few typos here; please revise this changelog. Worth mentioning the shutdown fix here as well.