diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs index 8ae01f06c68..16206c0bdc3 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs @@ -22,7 +22,7 @@ use quickwit_dst::events::merge_pipeline::{MergePipelineEvent, record_merge_pipe use quickwit_proto::metastore::{ MetastoreService, PublishMetricsSplitsRequest, PublishSketchSplitsRequest, }; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use super::ParquetSplitsUpdate; use crate::actors::publisher::{Publisher, serialize_checkpoint_delta, suggest_truncate}; @@ -125,18 +125,23 @@ impl Handler for Publisher { suggest_truncate(ctx, &self.source_mailbox_opt, checkpoint_delta_opt).await; - // Feedback loop: notify the merge planner about all newly published - // splits — both ingest outputs and merge outputs — so it can plan - // further compaction. Infinite loops are prevented by the merge - // policy's maturity checks (max_merge_ops, target_split_size_bytes, - // maturation_period), not by filtering here. This matches the Tantivy - // publisher which sends NewSplits unconditionally. + // Feedback loop: notify the merge planner about newly published splits so it can plan + // further compaction. This is BEST-EFFORT and NON-BLOCKING by design: ingest + // publish/truncate should not block on compaction. A blocking send here + // would couple ingest WAL truncation to compaction liveness: if the planner is absent or + // slow (mailbox full, or the merge pipeline failed to spawn), a blocking + // send wedges the publisher and stalls truncation, which can fill the shared WAL and take + // down every index on the ingester. + // Infinite merge loops are prevented by the merge policy's maturity checks + // (max_merge_ops, target_split_size_bytes, maturation_period), not by filtering here. if let Some(planner_mailbox) = &self.parquet_merge_planner_mailbox_opt && !new_splits.is_empty() + && let Err(error) = + planner_mailbox.try_send_message(super::ParquetNewSplits { new_splits }) { - let _ = ctx - .send_message(planner_mailbox, super::ParquetNewSplits { new_splits }) - .await; + // Dropping is "safe", but not efficient: the planner re-seeds its split set from the + // metastore only on respawn, so we may have uncompacted files + warn!(%error, "dropping new-splits feedback to merge planner (best-effort)"); } if split_ids.is_empty() { @@ -156,6 +161,8 @@ impl Handler for Publisher { #[cfg(test)] mod tests { + use std::time::Duration; + use quickwit_actors::{QueueCapacity, Universe}; use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta}; use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange}; @@ -164,8 +171,10 @@ mod tests { use tracing::Span; use super::{METRICS_PUBLISHER_NAME, ParquetSplitsUpdate}; + use crate::actors::parquet_pipeline::ParquetMergePlanner; use crate::actors::publisher::Publisher; use crate::models::PublishLock; + use crate::source::{SourceActor, SuggestTruncate}; fn create_test_metrics_split_metadata(index_uid: &str, split_id: &str) -> ParquetSplitMetadata { ParquetSplitMetadata::metrics_builder() @@ -324,4 +333,87 @@ mod tests { universe.assert_quit().await; } + + /// Regression: a failed merge-pipeline spawn leaves the merge-planner MAILBOX created + /// (`ParquetMergePipeline::new`) but its ACTOR never spawned (`spawn_pipeline` fails at + /// `fetch_immature_splits`). The ingest publisher must NOT block publish/truncate on the + /// `Bounded(1)` merge-feedback `send(...).await`, or a dead compaction pipeline wedges WAL + /// truncation and closed shards with data never drain. + /// + /// On current code this times out (the publisher wedges on the consumer-less mailbox after + /// its first feedback send); it should pass once the feedback send is made non-blocking. + #[tokio::test] + async fn test_metrics_publisher_not_wedged_when_merge_planner_absent() { + let universe = Universe::with_accelerated_time(); + + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_publish_metrics_splits() + .times(1..) + .returning(|_| Ok(EmptyResponse {})); + + // Source mailbox is the progress signal (receives SuggestTruncate); unbounded so the + // truncate send never blocks. + let (source_mailbox, source_inbox) = universe.create_test_mailbox::(); + + // Merge-planner mailbox with NO consuming actor, Bounded(1): the failed-spawn state. + // The inbox is held (not dropped) so the channel stays open and sends BLOCK when the + // queue is full rather than erroring. + let (planner_mailbox, _planner_inbox_held) = universe + .create_mailbox::( + "ParquetMergePlanner", + QueueCapacity::Bounded(1), + ); + + let publisher = Publisher::new( + METRICS_PUBLISHER_NAME, + QueueCapacity::Bounded(1), + MetastoreServiceClient::from_mock(mock_metastore), + None, + Some(source_mailbox), + ) + .set_parquet_merge_planner_mailbox(planner_mailbox); + let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); + + let observation = tokio::time::timeout(Duration::from_secs(10), async { + for i in 0..3u64 { + let update = ParquetSplitsUpdate { + index_uid: IndexUid::for_test("test-index", 0), + new_splits: vec![create_test_metrics_split_metadata( + "test-index:00000000000000000000000000", + &format!("split-{i}"), + )], + replaced_split_ids: Vec::new(), + checkpoint_delta_opt: Some(IndexCheckpointDelta { + source_id: "test-source".to_string(), + source_delta: SourceCheckpointDelta::from_range((i * 10)..((i + 1) * 10)), + }), + publish_lock: PublishLock::default(), + publish_token_opt: None, + parent_span: Span::none(), + _merge_task_opt: None, + }; + publisher_mailbox.send_message(update).await.unwrap(); + } + publisher_handle.process_pending_and_observe().await.state + }) + .await + .expect( + "publisher wedged: ingest publish/truncate blocked on a consumer-less Bounded(1) \ + merge-planner mailbox", + ); + + assert_eq!( + observation.num_published_splits, 3, + "all splits must publish even when the merge planner never consumes feedback" + ); + let truncates = source_inbox.drain_for_test_typed::(); + assert_eq!( + truncates.len(), + 3, + "WAL truncation must continue when the merge planner is absent" + ); + + universe.assert_quit().await; + } }