diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e4c6995d639..6b938b42745 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -37,6 +37,10 @@ use quickwit_common::{Progress, shared_consts}; use quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig}; use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; +use quickwit_proto::ingest::ingester::{ + IngesterService, IngesterServiceStream, IngesterStatus, ObservationMessage, + OpenObservationStreamRequest, +}; use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadataResponseExt}; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, @@ -54,7 +58,7 @@ use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, Source use serde::Serialize; use serde_json::{Value as JsonValue, json}; use tokio::sync::watch; -use tracing::{Level, debug, enabled, error, info}; +use tracing::{Level, debug, enabled, error, info, warn}; use crate::IndexerPool; use crate::cooldown_map::{CooldownMap, CooldownStatus}; @@ -1053,9 +1057,16 @@ impl Handler for ControlPlane { message: IndexerJoined, ctx: &ActorContext, ) -> Result { + let node_id: NodeId = message.0.node_id().into(); info!( - "indexer `{}` joined the cluster: rebalancing shards and rebuilding indexing plan", - message.0.node_id() + "indexer `{node_id}` joined the cluster: rebalancing shards and rebuilding indexing \ + plan" + ); + // Subscribe to the ingester observation stream so we can track decommissioning state. + spawn_observe_ingester_task( + node_id, + self.ingest_controller.ingester_pool().clone(), + ctx.mailbox().downgrade(), ); // TODO: Update shard table. 
if let Err(metastore_error) = self @@ -1083,10 +1094,12 @@ impl Handler for ControlPlane { message: IndexerLeft, ctx: &ActorContext, ) -> Result { + let node_id: NodeId = message.0.node_id().into(); info!( - "indexer `{}` left the cluster: rebalancing shards and rebuilding indexing plan", - message.0.node_id() + "indexer `{node_id}` left the cluster: rebalancing shards and rebuilding indexing plan" ); + // Clear any decommissioning state tracked for this node's ingester. + self.model.remove_decommissioning_ingester(&node_id); // TODO: Update shard table. if let Err(metastore_error) = self .ingest_controller @@ -1164,6 +1177,98 @@ async fn watcher_indexers( } } +fn spawn_observe_ingester_task( + node_id: NodeId, + ingester_pool: IngesterPool, + weak_mailbox: WeakMailbox, +) { + tokio::spawn(observe_ingester(node_id, ingester_pool, weak_mailbox)); +} + +async fn observe_ingester( + node_id: NodeId, + ingester_pool: IngesterPool, + weak_mailbox: WeakMailbox, +) { + let Some(ingester) = ingester_pool.get(&node_id) else { + warn!(ingester_id=%node_id, "ingester not found in pool, skipping observation stream"); + return; + }; + let observation_stream = match ingester + .open_observation_stream(OpenObservationStreamRequest {}) + .await + { + Ok(stream) => stream, + Err(error) => { + warn!(ingester_id=%node_id, %error, "failed to open observation stream from ingester"); + return; + } + }; + watch_ingester_observation_stream(node_id, observation_stream, weak_mailbox).await; +} + +async fn watch_ingester_observation_stream( + node_id: NodeId, + mut observation_stream: IngesterServiceStream, + weak_mailbox: WeakMailbox, +) { + while let Some(observation_message_result) = observation_stream.next().await { + let Some(mailbox) = weak_mailbox.upgrade() else { + return; + }; + let status = match observation_message_result { + Ok(observation_message) => observation_message.status(), + Err(error) => { + warn!(ingester_id=%node_id, %error, "observation stream error from ingester"); + 
return; + } + }; + if let Err(error) = mailbox + .send_message(IngesterObservation { + node_id: node_id.clone(), + status, + }) + .await + { + error!(ingester_id=%node_id, %error, "failed to forward `IngesterObservation` to control plane"); + } + // `Decommissioned` and `Failed` are terminal states: no further status changes + // will be emitted. Close our end of the stream so the ingester can shut down cleanly. + match status { + IngesterStatus::Decommissioned | IngesterStatus::Failed => return, + _ => {} + } + } +} + +/// Status update received from an ingester's observation stream. +#[derive(Debug)] +struct IngesterObservation { + node_id: NodeId, + status: IngesterStatus, +} + +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + message: IngesterObservation, + _ctx: &ActorContext, + ) -> Result { + match message.status { + IngesterStatus::Decommissioning | IngesterStatus::Decommissioned => { + self.model.add_decommissioning_ingester(message.node_id); + } + _ => { + self.model.remove_decommissioning_ingester(&message.node_id); + } + } + Ok(()) + } +} + #[cfg(test)] mod tests { use std::num::NonZero; @@ -1186,6 +1291,7 @@ mod tests { ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, CpuCapacity, IndexingServiceClient, MockIndexingService, }; + use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ IngesterServiceClient, InitShardSuccess, InitShardsResponse, MockIngesterService, RetainShardsResponse, @@ -2810,4 +2916,53 @@ mod tests { universe.assert_quit().await; } + + #[tokio::test] + async fn test_watch_ingester_observation_stream() { + let universe = Universe::with_accelerated_time(); + let (control_plane_mailbox, control_plane_inbox) = universe.create_test_mailbox(); + let weak_mailbox = control_plane_mailbox.downgrade(); + + let node_id: NodeId = "test-ingester".into(); + let ingester_pool = IngesterPool::default(); + + let mut mock_ingester = MockIngesterService::new(); + 
mock_ingester + .expect_open_observation_stream() + .return_once(|_| { + let stream = ServiceStream::from(vec![ + Ok(ObservationMessage { + node_id: "test-ingester".to_string(), + status: IngesterStatus::Ready as i32, + }), + Ok(ObservationMessage { + node_id: "test-ingester".to_string(), + status: IngesterStatus::Decommissioning as i32, + }), + Ok(ObservationMessage { + node_id: "test-ingester".to_string(), + status: IngesterStatus::Decommissioned as i32, + }), + ]); + Ok(stream) + }); + let ingester = IngesterServiceClient::from_mock(mock_ingester); + ingester_pool.insert(node_id.clone(), ingester); + + spawn_observe_ingester_task(node_id.clone(), ingester_pool, weak_mailbox); + + let msg: IngesterObservation = control_plane_inbox.recv_typed_message().await.unwrap(); + assert_eq!(msg.node_id, node_id); + assert_eq!(msg.status, IngesterStatus::Ready); + + let msg: IngesterObservation = control_plane_inbox.recv_typed_message().await.unwrap(); + assert_eq!(msg.node_id, node_id); + assert_eq!(msg.status, IngesterStatus::Decommissioning); + + let msg: IngesterObservation = control_plane_inbox.recv_typed_message().await.unwrap(); + assert_eq!(msg.node_id, node_id); + assert_eq!(msg.status, IngesterStatus::Decommissioned); + + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 63295d61eca..857d0d23ad1 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -303,6 +303,10 @@ impl IngestController { } } + pub(crate) fn ingester_pool(&self) -> &IngesterPool { + &self.ingester_pool + } + /// Sends a retain shard request to the given list of ingesters. /// /// If the request fails, we just log an error. 
@@ -459,6 +463,8 @@ impl IngestController { .unavailable_leaders .into_iter() .map(NodeId::from) + // Also exclude ingesters that have announced they are decommissioning + .chain(model.decommissioning_ingesters().iter().cloned()) .collect(); // We do a first pass to identify the shards that are missing from the model and need to be @@ -699,8 +705,10 @@ impl IngestController { let new_num_open_shards = shard_stats.num_open_shards + num_shards_to_open; let new_shards_per_source: HashMap = HashMap::from_iter([(source_uid.clone(), num_shards_to_open)]); + let unavailable_leaders: FnvHashSet = + model.decommissioning_ingesters().iter().cloned().collect(); let successful_source_uids_res = self - .try_open_shards(new_shards_per_source, model, &Default::default(), progress) + .try_open_shards(new_shards_per_source, model, &unavailable_leaders, progress) .await; match successful_source_uids_res { @@ -1037,11 +1045,13 @@ impl IngestController { .or_default() += 1; } + let unavailable_leaders: FnvHashSet = + model.decommissioning_ingesters().iter().cloned().collect(); let mut per_source_num_opened_shards: HashMap = self .try_open_shards( per_source_num_shards_to_open, model, - &Default::default(), + &unavailable_leaders, progress, ) .await @@ -1114,10 +1124,12 @@ impl IngestController { /// The closing operation can only be done by the leader of that shard. /// For these reason, we exclude these shards from the rebalance process. 
fn compute_shards_to_rebalance(&self, model: &ControlPlaneModel) -> Vec { + let decommissioning_ingesters = model.decommissioning_ingesters(); let mut per_available_ingester_shards: HashMap> = self .ingester_pool .keys() .into_iter() + .filter(|ingester_id| !decommissioning_ingesters.contains(ingester_id)) .map(|ingester_id| (ingester_id, Vec::new())) .collect(); @@ -3704,4 +3716,188 @@ mod tests { test_compute_shards_to_rebalance_aux(&[0, 1], &[1]); test_compute_shards_to_rebalance_aux(&[0, 1, 2], &[3, 4]); } + + #[test] + fn test_compute_shards_to_rebalance_excludes_decommissioning_ingesters() { + let index_metadata = + IndexMetadata::for_test("test-index", "ram://indexes/test-index"); + let index_uid = index_metadata.index_uid.clone(); + let source_id: SourceId = "test-source".to_string(); + + let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); + let mut source_config = SourceConfig::ingest_v2(); + source_config.source_id = source_id.clone(); + model.add_source(&index_uid, source_config).unwrap(); + + let ingester_pool = IngesterPool::default(); + let ingester = IngesterServiceClient::from_mock(MockIngesterService::new()); + ingester_pool.insert(NodeId::from("ingester-0"), ingester.clone()); + ingester_pool.insert(NodeId::from("ingester-1"), ingester); + + // Ingester-0 has 5 open shards, ingester-1 has 0. + let shards: Vec = (0..5) + .map(|i| Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(i as u64)), + leader_id: "ingester-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }) + .collect(); + model.insert_shards(&index_uid, &source_id, shards); + + let controller = IngestController::new( + MetastoreServiceClient::mocked(), + ingester_pool, + 1, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + + // Both ingesters available: rebalancing should propose moves. 
+ assert!(!controller.compute_shards_to_rebalance(&model).is_empty()); + + // Mark the target ingester as decommissioning: no rebalancing. + model.add_decommissioning_ingester(NodeId::from("ingester-1")); + assert!(controller.compute_shards_to_rebalance(&model).is_empty()); + } + + #[tokio::test] + async fn test_get_or_create_open_shards_excludes_decommissioning_ingesters() { + let source_id: &'static str = "test-source"; + let mut index_metadata = + IndexMetadata::for_test("test-index-0", "ram://indexes/test-index-0"); + let index_uid = index_metadata.index_uid.clone(); + let doc_mapping_uid = DocMappingUid::random(); + index_metadata.index_config.doc_mapping.doc_mapping_uid = doc_mapping_uid; + + let progress = Progress::default(); + + let mut mock_metastore = MockMetastoreService::new(); + let index_uid_clone = index_uid.clone(); + mock_metastore.expect_open_shards().returning(move |request| { + // The shard must be assigned to ingester-1 (ingester-0 is decommissioning). + assert_eq!(request.subrequests[0].leader_id, "test-ingester-1"); + + let subresponses = vec![OpenShardSubresponse { + subrequest_id: 0, + open_shard: Some(Shard { + index_uid: index_uid_clone.clone().into(), + source_id: source_id.to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), + doc_mapping_uid: Some(doc_mapping_uid), + ..Default::default() + }), + }]; + Ok(metastore::OpenShardsResponse { subresponses }) + }); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + + let ingester_pool = IngesterPool::default(); + ingester_pool.insert( + NodeId::from("test-ingester-0"), + IngesterServiceClient::from_mock(MockIngesterService::new()), + ); + + let mut mock_ingester_1 = MockIngesterService::new(); + mock_ingester_1 + .expect_init_shards() + .returning(|request| { + let shard = request.subrequests[0].shard().clone(); + Ok(InitShardsResponse { + successes: vec![InitShardSuccess { + subrequest_id: 
request.subrequests[0].subrequest_id, + shard: Some(shard), + }], + failures: Vec::new(), + }) + }); + ingester_pool.insert( + NodeId::from("test-ingester-1"), + IngesterServiceClient::from_mock(mock_ingester_1), + ); + + let mut controller = IngestController::new( + metastore, + ingester_pool, + 1, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + + let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); + let mut source_config = SourceConfig::ingest_v2(); + source_config.source_id = source_id.to_string(); + model.add_source(&index_uid, source_config).unwrap(); + model.add_decommissioning_ingester(NodeId::from("test-ingester-0")); + + let request = GetOrCreateOpenShardsRequest { + subrequests: vec![GetOrCreateOpenShardsSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: source_id.to_string(), + }], + closed_shards: Vec::new(), + unavailable_leaders: Vec::new(), + }; + let response = controller + .get_or_create_open_shards(request, &mut model, &progress) + .await + .unwrap(); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.successes[0].open_shards[0].leader_id, "test-ingester-1"); + } + + #[tokio::test] + async fn test_try_scale_up_shards_excludes_decommissioning() { + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore.expect_open_shards().never(); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + + let ingester_pool = IngesterPool::default(); + ingester_pool.insert( + "test-ingester".into(), + IngesterServiceClient::from_mock(MockIngesterService::new()), + ); + + let mut controller = IngestController::new( + metastore, + ingester_pool, + 1, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + + let index_uid = IndexUid::for_test("test-index", 0); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: INGEST_V2_SOURCE_ID.to_string(), + }; + let shard_stats = ShardStats { + num_open_shards: 2, + ..Default::default() + }; + + let 
mut model = ControlPlaneModel::default(); + model.add_index(IndexMetadata::for_test( + &index_uid.index_id, + "ram://indexes/test-index:0", + )); + model + .add_source(&index_uid, SourceConfig::ingest_v2()) + .unwrap(); + model.add_decommissioning_ingester(NodeId::from("test-ingester")); + + let progress = Progress::default(); + controller + .try_scale_up_shards(source_uid, shard_stats, &mut model, &progress, 1) + .await + .unwrap(); + assert_eq!(model.all_shards().count(), 0); + } } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 0d0431a67ce..0caac25b726 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -53,6 +53,9 @@ pub(crate) struct ControlPlaneModel { index_uid_table: FnvHashMap, index_table: FnvHashMap, shard_table: ShardTable, + /// Set of ingester node IDs that have announced they are decommissioning via gRPC. + /// These ingesters are excluded from shard allocation. 
+ decommissioning_ingesters: FnvHashSet, } impl ControlPlaneModel { @@ -61,6 +64,26 @@ impl ControlPlaneModel { *self = Default::default(); } + pub fn add_decommissioning_ingester(&mut self, node_id: NodeId) -> bool { + debug!( + node_id=%node_id, + "adding node as decommissioning" + ); + self.decommissioning_ingesters.insert(node_id) + } + + pub fn remove_decommissioning_ingester(&mut self, node_id: &NodeId) -> bool { + debug!( + node_id=%node_id, + "removing node from decommissioning" + ); + self.decommissioning_ingesters.remove(node_id) + } + + pub fn decommissioning_ingesters(&self) -> &FnvHashSet { + &self.decommissioning_ingesters + } + pub fn num_indexes(&self) -> usize { self.index_table.len() } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 5277e0ac740..e278fb8bece 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -351,7 +351,7 @@ impl ClusterSandbox { TraceServiceClient::new(self.channel(QuickwitService::Indexer)) } - async fn wait_for_cluster_num_ready_nodes( + pub async fn wait_for_cluster_num_ready_nodes( &self, expected_num_ready_nodes: usize, ) -> anyhow::Result<()> { @@ -596,4 +596,21 @@ impl ClusterSandbox { self.shutdown_services(QuickwitService::supported_services()) .await } + + /// Remove a node from the sandbox and return its shutdown handle. + /// After this call, `rest_client` and other lookup methods skip the removed + /// node, so callers can trigger shutdown concurrently with other sandbox + /// operations. 
+ pub fn remove_node_with_service( + &mut self, + service: QuickwitService, + ) -> NodeShutdownHandle { + let idx = self + .node_shutdown_handles + .iter() + .position(|h| h.node_services.contains(&service)) + .unwrap_or_else(|| panic!("no node with service {service:?}")); + self.node_configs.remove(idx); + self.node_shutdown_handles.remove(idx) + } } diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs index 4a5e29c9565..3169dfaf28a 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs @@ -856,3 +856,139 @@ async fn test_shutdown_indexer_first() { .unwrap() .unwrap(); } + +/// Tests that the graceful shutdown sequence works correctly in a multi-indexer +/// cluster: shutting down one indexer does NOT cause 500 errors or data loss, +/// and the cluster eventually rebalances. see #6158 +#[tokio::test] +async fn test_graceful_shutdown_no_data_loss() { + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::Indexer]) + .add_node([ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ]) + .build_and_start() + .await; + let index_id = "test_graceful_shutdown_no_data_loss"; + + // Create index with a long commit timeout so documents stay uncommitted + // in the ingesters' WAL. The decommission sequence should commit + // them before the indexer quits. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create( + format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 5 + "# + ), + ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + // Ingest docs with auto-commit. 
With a 5s commit timeout, these documents + // sit uncommitted in the ingesters' WAL - exactly the in-flight state we + // want to exercise during draining. + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "before-shutdown-1"}), + CommitType::Auto, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "before-shutdown-2"}), + CommitType::Auto, + ) + .await + .unwrap(); + + // Remove the first indexer node from the sandbox and get its shutdown handle. + // After this call, rest_client(Indexer) returns the second (surviving) indexer. + let shutdown_handle = sandbox.remove_node_with_service(QuickwitService::Indexer); + + // Concurrently: shut down the removed indexer AND ingest more data via the + // surviving indexer. This verifies the cluster stays operational and the + // router on the surviving node does not return 500 errors while one indexer + // is decommissioning. The control plane excludes the decommissioning + // ingester from shard allocation, so new shards go to the surviving one. + let ingest_client = sandbox.rest_client(QuickwitService::Indexer); + let (shutdown_result, ingest_result) = tokio::join!( + async { + tokio::time::timeout(Duration::from_secs(30), shutdown_handle.shutdown()) + .await + .expect("indexer shutdown timed out — decommission may be stuck") + }, + async { + // Small delay so the decommission sequence has started before we ingest. + tokio::time::sleep(Duration::from_millis(200)).await; + ingest( + &ingest_client, + index_id, + ingest_json!({"body": "during-shutdown"}), + CommitType::Auto, + ) + .await + }, + ); + shutdown_result.expect("indexer shutdown failed"); + ingest_result.expect("ingest during shutdown should succeed (no 500 errors)"); + + // All 3 documents should eventually be searchable. 
Documents 1 & 2 were + // in-flight on the decommissioning indexer and should have been committed during + // the decommission step. Document 3 was ingested to the surviving indexer. + wait_until_predicate( + || async { + match sandbox + .rest_client(QuickwitService::Searcher) + .search( + index_id, + quickwit_serve::SearchRequestQueryString { + query: "*".to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + { + Ok(resp) => resp.num_hits == 3, + Err(_) => false, + } + }, + Duration::from_secs(30), + Duration::from_millis(500), + ) + .await + .expect("expected 3 documents after decommission shutdown, some data may have been lost"); + + // Verify the cluster sees 2 ready nodes (the surviving indexer + the + // control-plane/searcher/metastore/janitor node). + sandbox + .wait_for_cluster_num_ready_nodes(2) + .await + .expect("cluster should see 2 ready nodes after indexer shutdown"); + + // Clean shutdown of the remaining nodes. + tokio::time::timeout(Duration::from_secs(15), sandbox.shutdown()) + .await + .unwrap() + .unwrap(); +}