Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use quickwit_dst::events::merge_pipeline::{MergePipelineEvent, record_merge_pipe
use quickwit_proto::metastore::{
MetastoreService, PublishMetricsSplitsRequest, PublishSketchSplitsRequest,
};
use tracing::{info, instrument};
use tracing::{info, instrument, warn};

use super::ParquetSplitsUpdate;
use crate::actors::publisher::{Publisher, serialize_checkpoint_delta, suggest_truncate};
Expand Down Expand Up @@ -125,18 +125,23 @@ impl Handler<ParquetSplitsUpdate> for Publisher {

suggest_truncate(ctx, &self.source_mailbox_opt, checkpoint_delta_opt).await;

// Feedback loop: notify the merge planner about all newly published
// splits — both ingest outputs and merge outputs — so it can plan
// further compaction. Infinite loops are prevented by the merge
// policy's maturity checks (max_merge_ops, target_split_size_bytes,
// maturation_period), not by filtering here. This matches the Tantivy
// publisher which sends NewSplits unconditionally.
// Feedback loop: notify the merge planner about newly published splits so it can plan
// further compaction. This is BEST-EFFORT and NON-BLOCKING by design: ingest
// publish/truncate should not block on compaction. A blocking send here
// would couple ingest WAL truncation to compaction liveness: if the planner is absent or
// slow (mailbox full, or the merge pipeline failed to spawn), a blocking
// send wedges the publisher and stalls truncation, which can fill the shared WAL and take
// down every index on the ingester.
// Infinite merge loops are prevented by the merge policy's maturity checks
// (max_merge_ops, target_split_size_bytes, maturation_period), not by filtering here.
if let Some(planner_mailbox) = &self.parquet_merge_planner_mailbox_opt
&& !new_splits.is_empty()
&& let Err(error) =
planner_mailbox.try_send_message(super::ParquetNewSplits { new_splits })
{
let _ = ctx
.send_message(planner_mailbox, super::ParquetNewSplits { new_splits })
.await;
// Dropping is "safe", but not efficient: the planner re-seeds its split set from the
// metastore only on respawn, so we may have uncompacted files
warn!(%error, "dropping new-splits feedback to merge planner (best-effort)");
}

if split_ids.is_empty() {
Expand All @@ -156,6 +161,8 @@ impl Handler<ParquetSplitsUpdate> for Publisher {

#[cfg(test)]
mod tests {
use std::time::Duration;

use quickwit_actors::{QueueCapacity, Universe};
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};
Expand All @@ -164,8 +171,10 @@ mod tests {
use tracing::Span;

use super::{METRICS_PUBLISHER_NAME, ParquetSplitsUpdate};
use crate::actors::parquet_pipeline::ParquetMergePlanner;
use crate::actors::publisher::Publisher;
use crate::models::PublishLock;
use crate::source::{SourceActor, SuggestTruncate};

fn create_test_metrics_split_metadata(index_uid: &str, split_id: &str) -> ParquetSplitMetadata {
ParquetSplitMetadata::metrics_builder()
Expand Down Expand Up @@ -324,4 +333,87 @@ mod tests {

universe.assert_quit().await;
}

/// Regression: a failed merge-pipeline spawn leaves the merge-planner MAILBOX created
/// (`ParquetMergePipeline::new`) but its ACTOR never spawned (`spawn_pipeline` fails at
/// `fetch_immature_splits`). The ingest publisher must NOT block publish/truncate on the
/// `Bounded(1)` merge-feedback `send(...).await`, or a dead compaction pipeline wedges WAL
/// truncation and closed shards with data never drain.
///
/// On current code this times out (the publisher wedges on the consumer-less mailbox after
/// its first feedback send); it should pass once the feedback send is made non-blocking.
#[tokio::test]
async fn test_metrics_publisher_not_wedged_when_merge_planner_absent() {
let universe = Universe::with_accelerated_time();

let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_publish_metrics_splits()
.times(1..)
.returning(|_| Ok(EmptyResponse {}));

// Source mailbox is the progress signal (receives SuggestTruncate); unbounded so the
// truncate send never blocks.
let (source_mailbox, source_inbox) = universe.create_test_mailbox::<SourceActor>();

// Merge-planner mailbox with NO consuming actor, Bounded(1): the failed-spawn state.
// The inbox is held (not dropped) so the channel stays open and sends BLOCK when the
// queue is full rather than erroring.
let (planner_mailbox, _planner_inbox_held) = universe
.create_mailbox::<ParquetMergePlanner>(
"ParquetMergePlanner",
QueueCapacity::Bounded(1),
);

let publisher = Publisher::new(
METRICS_PUBLISHER_NAME,
QueueCapacity::Bounded(1),
MetastoreServiceClient::from_mock(mock_metastore),
None,
Some(source_mailbox),
)
.set_parquet_merge_planner_mailbox(planner_mailbox);
let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher);

let observation = tokio::time::timeout(Duration::from_secs(10), async {
for i in 0..3u64 {
let update = ParquetSplitsUpdate {
index_uid: IndexUid::for_test("test-index", 0),
new_splits: vec![create_test_metrics_split_metadata(
"test-index:00000000000000000000000000",
&format!("split-{i}"),
)],
replaced_split_ids: Vec::new(),
checkpoint_delta_opt: Some(IndexCheckpointDelta {
source_id: "test-source".to_string(),
source_delta: SourceCheckpointDelta::from_range((i * 10)..((i + 1) * 10)),
}),
publish_lock: PublishLock::default(),
publish_token_opt: None,
parent_span: Span::none(),
_merge_task_opt: None,
};
publisher_mailbox.send_message(update).await.unwrap();
}
publisher_handle.process_pending_and_observe().await.state
})
.await
.expect(
"publisher wedged: ingest publish/truncate blocked on a consumer-less Bounded(1) \
merge-planner mailbox",
);

assert_eq!(
observation.num_published_splits, 3,
"all splits must publish even when the merge planner never consumes feedback"
);
let truncates = source_inbox.drain_for_test_typed::<SuggestTruncate>();
assert_eq!(
truncates.len(),
3,
"WAL truncation must continue when the merge planner is absent"
);

universe.assert_quit().await;
}
}
Loading