Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/24929_fix_stall_on_disk_shutdown.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed an issue during in-situ reload of a sink with a disk buffer configured where
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few typos here, please revise this changelog. Worth mentioning the shutdown fix here as well.

the component would stall for `batch.timeout_secs` before fully completing a graceful reload.

authors: graphcareful
7 changes: 7 additions & 0 deletions lib/vector-buffers/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ impl Default for BufferConfig {
}

impl BufferConfig {
/// Returns true if any stage in this buffer configuration uses disk-based storage.
pub fn has_disk_stage(&self) -> bool {
self.stages()
.iter()
.any(|stage| matches!(stage, BufferType::DiskV2 { .. }))
}

/// Gets all of the configured stages for this buffer.
pub fn stages(&self) -> &[BufferType] {
match self {
Expand Down
14 changes: 14 additions & 0 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ impl Fanout {
}
}

/// Waits for the next control message and applies it.
///
/// Returns `true` if a message was processed, `false` if the control
/// channel was closed.
pub async fn recv_control_message(&mut self) -> bool {
match self.control_channel.recv().await {
Some(msg) => {
self.apply_control_message(msg);
true
}
None => false,
}
}

/// Apply a control message directly against this instance.
///
/// This method should not be used if there is an active `SendGroup` being processed.
Expand Down
64 changes: 40 additions & 24 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,30 +884,46 @@ async fn run_source_output_pump(
) -> TaskResult {
debug!("Source pump starting.");

while let Some(SourceSenderItem {
events: mut array,
send_reference,
}) = rx.next().await
{
// Even though we have a `send_reference` timestamp above, that reference time is when
// the events were enqueued in the `SourceSender`, not when they were pulled out of the
// `rx` stream on this end. Since those times can be quite different (due to blocking
// inherent to the fanout send operation), we set the `last_transform_timestamp` to the
// current time instead to get an accurate reference for when the events started waiting
// for the first transform.
let now = Instant::now();
array.for_each_metadata_mut(|metadata| {
metadata.set_source_id(Arc::clone(&source));
metadata.set_source_type(source_type);
metadata.set_last_transform_timestamp(now);
});
fanout
.send(array, Some(send_reference))
.await
.map_err(|e| {
debug!("Source pump finished with an error.");
TaskError::wrapped(e)
})?;
let mut control_channel_open = true;
loop {
tokio::select! {
biased;
// Process control messages (e.g. Remove/Pause) even when the source
// is idle, so that config reloads can proceed without waiting for the
// next event.
alive = fanout.recv_control_message(), if control_channel_open => {
if !alive {
control_channel_open = false;
}
}
item = rx.next() => {
match item {
Some(SourceSenderItem { events: mut array, send_reference }) => {
// Even though we have a `send_reference` timestamp above, that reference
// time is when the events were enqueued in the `SourceSender`, not when
// they were pulled out of the `rx` stream on this end. Since those times
// can be quite different (due to blocking inherent to the fanout send
// operation), we set the `last_transform_timestamp` to the current time
// instead to get an accurate reference for when the events started
// waiting for the first transform.
let now = Instant::now();
array.for_each_metadata_mut(|metadata| {
metadata.set_source_id(Arc::clone(&source));
metadata.set_source_type(source_type);
metadata.set_last_transform_timestamp(now);
});
fanout
.send(array, Some(send_reference))
.await
.map_err(|e| {
debug!("Source pump finished with an error.");
TaskError::wrapped(e)
})?;
}
None => break,
}
}
}
}

debug!("Source pump finished normally.");
Expand Down
46 changes: 32 additions & 14 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,26 @@ impl RunningTopology {
.collect::<HashSet<_>>();

// For any existing sink that has a conflicting resource dependency with a changed/added
// sink, or for any sink that we want to reuse their buffer, we need to explicit wait for
// them to finish processing so we can reclaim ownership of those resources/buffers.
// sink, for any sink that we want to reuse their buffer, or for any changed sink with
// a disk buffer that is not being reused, we need to explicitly wait for them to finish
// processing so we can reclaim ownership of those resources/buffers.
let changed_disk_buffer_sinks = diff
.sinks
.to_change
.iter()
.filter(|key| {
!reuse_buffers.contains(*key)
&& self
.config
.sink(key)
.is_some_and(|s| s.buffer.has_disk_stage())
})
.cloned()
.collect::<HashSet<_>>();

let wait_for_sinks = conflicting_sinks
.chain(reuse_buffers.iter().cloned())
.chain(changed_disk_buffer_sinks.iter().cloned())
.collect::<HashSet<_>>();

// First, we remove any inputs to removed sinks so they can naturally shut down.
Expand Down Expand Up @@ -635,24 +651,26 @@ impl RunningTopology {

for key in &sinks_to_change {
debug!(component_id = %key, "Changing sink.");
if reuse_buffers.contains(key) {
if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
self.detach_triggers
.remove(key)
.unwrap()
.into_inner()
.cancel();
Comment on lines +654 to 659
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restrict sink cancel to avoid dropping buffered events

This unconditionally calls cancel() for every changed sink with a prior disk stage, even when this is a normal reload path and the buffer is not being reused. Because sink input is wrapped in take_until_if(tripwire), cancellation stops consumption immediately; later, when reuse_buffers is false, the old receiver is dropped instead of handed off, so queued records are not gracefully drained. In practical terms, changing a sink from disk-backed buffering to a different topology (for example disk→memory) can lose in-flight buffered data that previously continued draining in the detached old sink.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This I'm not so sure about, will require more research to confirm/deny

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my thoughts on this:

Because sink input is wrapped in take_until_if(tripwire), cancellation stops consumption immediately

Yes consumption is stopped but not abruptly, the stream will flush partial batches

so queued records are not gracefully drained

In this specific case where we are calling cancel on a disk buffer sink, it is guaranteed that all queued records are gracefully drained because the stream will not end until the buffer has been emptied

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the key distinction here is between records already pulled into the sink's batching pipeline and records still unread in the disk buffer.

The replacement sink recreates the disk buffer under the same component key and data directory, unread records remain durable on disk and are picked up by the new buffer instance. And unacknowledged in-flight reads are replayed via the ledger rather than lost. The guarantee comes from disk buffer durability, not from the old reader draining to completion.


// We explicitly clone the input side of the buffer and store it so we don't lose
// it when we remove the inputs below.
//
// We clone instead of removing here because otherwise the input will be missing for
// the rest of the reload process, which violates the assumption that all previous
// inputs for components not being removed are still available. It's simpler to
// allow the "old" input to stick around and be replaced (even though that's
// basically a no-op since we're reusing the same buffer) than it is to pass around
// info about which sinks are having their buffers reused and treat them differently
// at other stages.
buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
if reuse_buffers.contains(key) {
// We explicitly clone the input side of the buffer and store it so we don't lose
// it when we remove the inputs below.
//
// We clone instead of removing here because otherwise the input will be missing for
// the rest of the reload process, which violates the assumption that all previous
// inputs for components not being removed are still available. It's simpler to
// allow the "old" input to stick around and be replaced (even though that's
// basically a no-op since we're reusing the same buffer) than it is to pass around
// info about which sinks are having their buffers reused and treat them differently
// at other stages.
buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
}
}
self.remove_inputs(key, diff, new_config).await;
}
Expand Down
137 changes: 134 additions & 3 deletions src/topology/test/reload.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{
collections::HashSet,
net::{SocketAddr, TcpListener},
num::NonZeroU64,
num::{NonZeroU64, NonZeroUsize},
time::Duration,
};

use futures::StreamExt;
use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;
use vector_lib::{
buffers::{BufferConfig, BufferType, WhenFull},
buffers::{BufferConfig, BufferType, MemoryBufferSize, WhenFull},
config::ComponentKey,
};

Expand Down Expand Up @@ -293,7 +293,6 @@ async fn topology_readd_input() {
#[tokio::test]
async fn topology_reload_component() {
test_util::trace_init();

let (_guard, address_0) = next_addr();

let mut old_config = Config::builder();
Expand All @@ -320,6 +319,138 @@ async fn topology_reload_component() {
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn topology_disk_buffer_config_change_does_not_stall() {
    // An in-situ config edit that alters a running sink's disk buffer settings
    // must not hang the reload. Previously the detach trigger was cancelled only
    // for sinks whose buffers were being reused, so a sink whose disk buffer
    // config changed kept its input stream alive forever and the reload blocked
    // indefinitely.
    test_util::trace_init();

    let (_guard, address) = next_addr();

    let buffer_dir = temp_dir();
    std::fs::create_dir(&buffer_dir).unwrap();

    let sink_key = ComponentKey::from("out");

    // Original topology: one source feeding one sink with a single disk buffer.
    let mut initial = Config::builder();
    initial.global.data_dir = Some(buffer_dir);
    initial.add_source("in", internal_metrics_source());
    initial.add_sink("out", &["in"], prom_exporter_sink(address, 1));
    initial.sinks[&sink_key].buffer = BufferConfig::Single(BufferType::DiskV2 {
        max_size: NonZeroU64::new(268435488).unwrap(),
        when_full: WhenFull::Block,
    });

    // The updated config differs only in the disk buffer's max_size.
    let mut updated = initial.clone();
    updated.sinks[&sink_key].buffer = BufferConfig::Single(BufferType::DiskV2 {
        max_size: NonZeroU64::new(536870912).unwrap(),
        when_full: WhenFull::Block,
    });

    let (mut topology, crash) = start_topology(initial.build().unwrap(), true).await;
    let mut crash_stream = UnboundedReceiverStream::new(crash);

    tokio::select! {
        _ = wait_for_tcp(address) => {},
        _ = crash_stream.next() => panic!("topology crashed before reload"),
    }

    // Mimic an in-situ config edit: the config watcher places the changed sink
    // into components_to_reload, which keeps it out of reuse_buffers.
    topology.extend_reload_set(HashSet::from_iter(vec![sink_key]));

    let reload_result = tokio::time::timeout(
        Duration::from_secs(5),
        topology.reload_config_and_respawn(updated.build().unwrap(), Default::default()),
    )
    .await;

    assert!(
        reload_result.is_ok(),
        "Reload stalled: changing a disk buffer config should not cause the reload to hang"
    );
    reload_result.unwrap().unwrap();

    // Confirm the replacement sink actually came up after the reload.
    tokio::select! {
        _ = wait_for_tcp(address) => {},
        _ = crash_stream.next() => panic!("topology crashed after reload"),
    }
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn topology_disk_buffer_config_change_chained_does_not_stall() {
    // Same scenario as the single-stage test, but with a chained
    // memory → disk overflow buffer, proving the writer-drop notification is
    // also collected from overflow stages.
    test_util::trace_init();

    let (_guard, address) = next_addr();

    let buffer_dir = temp_dir();
    std::fs::create_dir(&buffer_dir).unwrap();

    // First stage: bounded in-memory buffer that overflows into the disk stage.
    let memory_stage = BufferType::Memory {
        size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
        when_full: WhenFull::Overflow,
    };

    let sink_key = ComponentKey::from("out");

    let mut initial = Config::builder();
    initial.global.data_dir = Some(buffer_dir);
    initial.add_source("in", internal_metrics_source());
    initial.add_sink("out", &["in"], prom_exporter_sink(address, 1));
    initial.sinks[&sink_key].buffer = BufferConfig::Chained(vec![
        memory_stage,
        BufferType::DiskV2 {
            max_size: NonZeroU64::new(268435488).unwrap(),
            when_full: WhenFull::Block,
        },
    ]);

    // The updated config changes only the disk overflow stage's max_size.
    let mut updated = initial.clone();
    updated.sinks[&sink_key].buffer = BufferConfig::Chained(vec![
        memory_stage,
        BufferType::DiskV2 {
            max_size: NonZeroU64::new(536870912).unwrap(),
            when_full: WhenFull::Block,
        },
    ]);

    let (mut topology, crash) = start_topology(initial.build().unwrap(), true).await;
    let mut crash_stream = UnboundedReceiverStream::new(crash);

    tokio::select! {
        _ = wait_for_tcp(address) => {},
        _ = crash_stream.next() => panic!("topology crashed before reload"),
    }

    // Force the changed sink through the reload path so its buffer is rebuilt
    // rather than reused.
    topology.extend_reload_set(HashSet::from_iter(vec![sink_key]));

    let reload_result = tokio::time::timeout(
        Duration::from_secs(5),
        topology.reload_config_and_respawn(updated.build().unwrap(), Default::default()),
    )
    .await;

    assert!(
        reload_result.is_ok(),
        "Reload stalled: changing a chained disk buffer config should not cause the reload to hang"
    );
    reload_result.unwrap().unwrap();

    // Confirm the replacement sink actually came up after the reload.
    tokio::select! {
        _ = wait_for_tcp(address) => {},
        _ = crash_stream.next() => panic!("topology crashed after reload"),
    }
}

async fn reload_sink_test(
old_config: Config,
new_config: Config,
Expand Down
Loading