diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index e3244746e0a..9b199feaecd 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -392,6 +392,15 @@ impl Cluster { tokio::time::sleep(self.gossip_interval * 2).await; } + pub async fn wait_for_ingester_status_propagation(&self) { + let propagation_delay = self.gossip_interval * 2; + info!( + "waiting {:?} for cluster to propagate ingester status change", + propagation_delay + ); + tokio::time::sleep(propagation_delay).await; + } + pub async fn initiate_shutdown(&self) -> anyhow::Result<()> { self.inner.read().await.chitchat_handle.initiate_shutdown() } diff --git a/quickwit/quickwit-cluster/src/node.rs b/quickwit/quickwit-cluster/src/node.rs index 4a8b11dbafc..4f768104338 100644 --- a/quickwit/quickwit-cluster/src/node.rs +++ b/quickwit/quickwit-cluster/src/node.rs @@ -18,8 +18,10 @@ use std::net::SocketAddr; use std::sync::Arc; use chitchat::{ChitchatId, NodeState}; +use quickwit_common::shared_consts::INGESTER_STATUS_KEY; use quickwit_config::service::QuickwitService; use quickwit_proto::indexing::{CpuCapacity, IndexingTask}; +use quickwit_proto::ingest::ingester::IngesterStatus; use quickwit_proto::types::NodeIdRef; use tonic::transport::Channel; @@ -39,6 +41,9 @@ impl ClusterNode { is_self_node: bool, ) -> anyhow::Result { let member = build_cluster_member(chitchat_id.clone(), node_state)?; + let ingester_status = node_state + .get(INGESTER_STATUS_KEY) + .and_then(IngesterStatus::from_json_str_name); let inner = InnerNode { chitchat_id, channel, @@ -46,6 +51,7 @@ impl ClusterNode { grpc_advertise_addr: member.grpc_advertise_addr, indexing_tasks: member.indexing_tasks, indexing_capacity: member.indexing_cpu_capacity, + ingester_status, is_ready: member.is_ready, is_self_node, }; @@ -62,6 +68,26 @@ impl ClusterNode { is_self_node: bool, enabled_services: &[&str], indexing_tasks: &[IndexingTask], + ) -> Self { + 
Self::for_test_with_ingester_status( + node_id, + port, + is_self_node, + enabled_services, + indexing_tasks, + None, + ) + .await + } + + #[cfg(any(test, feature = "testsuite"))] + pub async fn for_test_with_ingester_status( + node_id: &str, + port: u16, + is_self_node: bool, + enabled_services: &[&str], + indexing_tasks: &[IndexingTask], + ingester_status: Option, ) -> Self { use quickwit_common::tower::{ClientGrpcConfig, make_channel}; @@ -76,6 +102,9 @@ impl ClusterNode { node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(",")); node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string()); set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state); + if let Some(status) = ingester_status { + node_state.set(INGESTER_STATUS_KEY, status.as_json_str_name()); + } Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap() } @@ -125,6 +154,10 @@ impl ClusterNode { self.inner.indexing_capacity } + pub fn ingester_status(&self) -> Option { + self.inner.ingester_status + } + pub fn is_ready(&self) -> bool { self.inner.is_ready } @@ -151,6 +184,7 @@ impl PartialEq for ClusterNode { && self.inner.enabled_services == other.inner.enabled_services && self.inner.grpc_advertise_addr == other.inner.grpc_advertise_addr && self.inner.indexing_tasks == other.inner.indexing_tasks + && self.inner.ingester_status == other.inner.ingester_status && self.inner.is_ready == other.inner.is_ready && self.inner.is_self_node == other.inner.is_self_node } @@ -163,6 +197,30 @@ struct InnerNode { grpc_advertise_addr: SocketAddr, indexing_tasks: Vec, indexing_capacity: CpuCapacity, + ingester_status: Option, is_ready: bool, is_self_node: bool, } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_cluster_node_ingester_status_from_chitchat() { + let node = ClusterNode::for_test_with_ingester_status( + "test-node", + 1337, + false, + &["indexer"], + &[], + Some(IngesterStatus::Decommissioning), + ) + .await; + assert_eq!( + 
node.ingester_status(), + Some(IngesterStatus::Decommissioning) + ); + } + +} diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 9923705f0b2..f76b86d1436 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -64,6 +64,9 @@ pub const SCROLL_BATCH_LEN: usize = 1_000; /// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader. pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; +/// Key used in chitchat to broadcast the ingester status (lifecycle state). +pub const INGESTER_STATUS_KEY: &str = "ingester.status"; + /// File name for the encoded list of fields in the split pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e4c6995d639..8896c6b7a99 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -43,6 +43,7 @@ use quickwit_proto::control_plane::{ GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest, }; use quickwit_proto::indexing::ShardPositionsUpdate; +use quickwit_proto::ingest::ingester::IngesterStatus; use quickwit_proto::metastore::{ AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest, DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest, @@ -198,6 +199,7 @@ pub struct ControlPlaneObservableState { pub ingest_controller: IngestControllerStats, pub num_indexes: usize, pub num_sources: usize, + pub num_decommissioning_ingesters: usize, pub readiness: bool, } @@ -215,6 +217,7 @@ impl Actor for ControlPlane { ingest_controller: self.ingest_controller.stats, num_indexes: self.model.num_indexes(), num_sources: self.model.num_sources(), + num_decommissioning_ingesters: 
self.model.decommissioning_ingesters().len(), readiness: *self.readiness_tx.borrow(), } } @@ -1083,10 +1086,13 @@ impl Handler for ControlPlane { message: IndexerLeft, ctx: &ActorContext, ) -> Result { + let node_id: NodeId = message.0.node_id().into(); info!( "indexer `{}` left the cluster: rebalancing shards and rebuilding indexing plan", - message.0.node_id() + node_id ); + // Clean up decommissioning state for the departed node. + self.model.remove_decommissioning_ingester(&node_id); // TODO: Update shard table. if let Err(metastore_error) = self .ingest_controller @@ -1100,6 +1106,51 @@ impl Handler for ControlPlane { } } +/// An ingester node has changed its status (e.g. transitioning to decommissioning). +#[derive(Debug)] +struct IngesterStatusChanged { + node_id: NodeId, + status: IngesterStatus, +} + +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + message: IngesterStatusChanged, + _ctx: &ActorContext, + ) -> Result { + match message.status { + IngesterStatus::Decommissioning => { + if self + .model + .add_decommissioning_ingester(message.node_id.clone()) + { + info!( + ingester_id=%message.node_id, + "ingester is decommissioning: excluding from shard allocation" + ); + } + } + _ => { + if self + .model + .remove_decommissioning_ingester(&message.node_id) + { + info!( + ingester_id=%message.node_id, + status=%message.status.as_json_str_name(), + "ingester is no longer decommissioning" + ); + } + } + } + Ok(()) + } +} + #[async_trait] impl Handler for ControlPlane { type Reply = (); @@ -1144,6 +1195,17 @@ async fn watcher_indexers( }; match cluster_change { ClusterChange::Add(node) => { + // If a node joins (or transitions to ready) while already decommissioning, + // immediately notify the control plane. 
+ if node.ingester_status() == Some(IngesterStatus::Decommissioning) { + let node_id: NodeId = node.node_id().into(); + if let Err(error) = mailbox + .send_message(IngesterStatusChanged { node_id, status: IngesterStatus::Decommissioning }) + .await + { + error!(%error, "failed to forward `IngesterStatusChanged` event to control plane"); + } + } if node.enabled_services().contains(&QuickwitService::Indexer) && let Err(error) = mailbox.send_message(IndexerJoined(node)).await { @@ -1157,8 +1219,16 @@ async fn watcher_indexers( error!(%error, "failed to forward `IndexerLeft` event to control plane"); } } - ClusterChange::Update(_) => { - // We are not interested in updates (yet). + ClusterChange::Update(node) => { + if let Some(status) = node.ingester_status() { + let node_id: NodeId = node.node_id().into(); + if let Err(error) = mailbox + .send_message(IngesterStatusChanged { node_id, status }) + .await + { + error!(%error, "failed to forward `IngesterStatusChanged` event to control plane"); + } + } } } } @@ -2443,18 +2513,21 @@ mod tests { let cluster_change_stream_tx = cluster_change_stream_factory.change_stream_tx(); + // Non-indexer nodes are ignored. 
let metastore_node = ClusterNode::for_test("test-metastore", 1337, false, &["metastore"], &[]).await; - let cluster_change = ClusterChange::Add(metastore_node); - cluster_change_stream_tx.send(cluster_change).unwrap(); + cluster_change_stream_tx + .send(ClusterChange::Add(metastore_node)) + .unwrap(); let indexer_node = ClusterNode::for_test("test-indexer", 1515, false, &["indexer"], &[]).await; - let cluster_change = ClusterChange::Add(indexer_node.clone()); - cluster_change_stream_tx.send(cluster_change).unwrap(); - - let cluster_change = ClusterChange::Remove(indexer_node.clone()); - cluster_change_stream_tx.send(cluster_change).unwrap(); + cluster_change_stream_tx + .send(ClusterChange::Add(indexer_node.clone())) + .unwrap(); + cluster_change_stream_tx + .send(ClusterChange::Remove(indexer_node.clone())) + .unwrap(); let IndexerJoined(joined) = control_plane_inbox.recv_typed_message().await.unwrap(); assert_eq!(joined.grpc_advertise_addr().port(), 1516); @@ -2462,6 +2535,38 @@ mod tests { let IndexerLeft(left) = control_plane_inbox.recv_typed_message().await.unwrap(); assert_eq!(left.grpc_advertise_addr().port(), 1516); + // An Update with ingester status emits IngesterStatusChanged. + let decommissioning_node = ClusterNode::for_test_with_ingester_status( + "test-indexer", + 1515, + false, + &["indexer"], + &[], + Some(IngesterStatus::Decommissioning), + ) + .await; + cluster_change_stream_tx + .send(ClusterChange::Update(decommissioning_node.clone())) + .unwrap(); + + let msg: IngesterStatusChanged = + control_plane_inbox.recv_typed_message().await.unwrap(); + assert_eq!(msg.node_id, NodeId::from("test-indexer")); + assert_eq!(msg.status, IngesterStatus::Decommissioning); + + // An Add with ingester status emits both IngesterStatusChanged and IndexerJoined. 
+ cluster_change_stream_tx + .send(ClusterChange::Add(decommissioning_node)) + .unwrap(); + + let msg: IngesterStatusChanged = + control_plane_inbox.recv_typed_message().await.unwrap(); + assert_eq!(msg.node_id, NodeId::from("test-indexer")); + assert_eq!(msg.status, IngesterStatus::Decommissioning); + + let IndexerJoined(joined) = control_plane_inbox.recv_typed_message().await.unwrap(); + assert_eq!(joined.grpc_advertise_addr().port(), 1516); + universe.assert_quit().await; } @@ -2810,4 +2915,119 @@ mod tests { universe.assert_quit().await; } + + #[tokio::test] + async fn test_control_plane_decommissioning_lifecycle() { + let universe = Universe::with_accelerated_time(); + + let cluster_config = ClusterConfig::for_test(); + let node_id = NodeId::from("test-control-plane"); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); + + let indexer_pool = IndexerPool::default(); + let ingester_pool = IngesterPool::default(); + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_list_indexes_metadata() + .return_once(|_| Ok(ListIndexesMetadataResponse::for_test(Vec::new()))); + mock_metastore + .expect_delete_shards() + .returning(|request| { + Ok(DeleteShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + successes: request.shard_ids, + failures: Vec::new(), + }) + }); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + let disable_control_loop = true; + let (_control_plane_mailbox, control_plane_handle, _readiness_rx) = + ControlPlane::spawn_inner( + &universe, + cluster_config, + node_id, + cluster_change_stream_factory.clone(), + indexer_pool, + ingester_pool, + metastore, + disable_control_loop, + ); + let cluster_change_stream_tx = cluster_change_stream_factory.change_stream_tx(); + + macro_rules! 
num_decommissioning { + () => { + control_plane_handle + .process_pending_and_observe() + .await + .state_opt + .as_ref() + .unwrap() + .num_decommissioning_ingesters + }; + } + + // Mark ingester as decommissioning. + let node = ClusterNode::for_test_with_ingester_status( + "test-ingester-0", + 1515, + false, + &["indexer"], + &[], + Some(IngesterStatus::Decommissioning), + ) + .await; + cluster_change_stream_tx + .send(ClusterChange::Update(node.clone())) + .unwrap(); + universe.sleep(Duration::from_secs(1)).await; + assert_eq!(num_decommissioning!(), 1); + + // Sending it again is idempotent. + cluster_change_stream_tx + .send(ClusterChange::Update(node)) + .unwrap(); + universe.sleep(Duration::from_secs(1)).await; + assert_eq!(num_decommissioning!(), 1); + + // Transitioning back to Ready removes the decommissioning state. + let ready_node = ClusterNode::for_test_with_ingester_status( + "test-ingester-0", + 1515, + false, + &["indexer"], + &[], + Some(IngesterStatus::Ready), + ) + .await; + cluster_change_stream_tx + .send(ClusterChange::Update(ready_node)) + .unwrap(); + universe.sleep(Duration::from_secs(1)).await; + assert_eq!(num_decommissioning!(), 0); + + // Add a decommissioning node, then remove it — should clean up. 
+ let node = ClusterNode::for_test_with_ingester_status( + "test-ingester-1", + 1516, + false, + &["indexer"], + &[], + Some(IngesterStatus::Decommissioning), + ) + .await; + cluster_change_stream_tx + .send(ClusterChange::Add(node.clone())) + .unwrap(); + universe.sleep(Duration::from_secs(1)).await; + assert_eq!(num_decommissioning!(), 1); + + cluster_change_stream_tx + .send(ClusterChange::Remove(node)) + .unwrap(); + universe.sleep(Duration::from_secs(1)).await; + assert_eq!(num_decommissioning!(), 0); + + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 63295d61eca..b134f647818 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -459,6 +459,8 @@ impl IngestController { .unavailable_leaders .into_iter() .map(NodeId::from) + // Also exclude ingesters that have announced they are decommissioning via chitchat. 
+ .chain(model.decommissioning_ingesters().iter().cloned()) .collect(); // We do a first pass to identify the shards that are missing from the model and need to be @@ -699,8 +701,10 @@ impl IngestController { let new_num_open_shards = shard_stats.num_open_shards + num_shards_to_open; let new_shards_per_source: HashMap = HashMap::from_iter([(source_uid.clone(), num_shards_to_open)]); + let unavailable_leaders: FnvHashSet = + model.decommissioning_ingesters().iter().cloned().collect(); let successful_source_uids_res = self - .try_open_shards(new_shards_per_source, model, &Default::default(), progress) + .try_open_shards(new_shards_per_source, model, &unavailable_leaders, progress) .await; match successful_source_uids_res { @@ -1037,11 +1041,13 @@ impl IngestController { .or_default() += 1; } + let unavailable_leaders: FnvHashSet = + model.decommissioning_ingesters().iter().cloned().collect(); let mut per_source_num_opened_shards: HashMap = self .try_open_shards( per_source_num_shards_to_open, model, - &Default::default(), + &unavailable_leaders, progress, ) .await @@ -1114,10 +1120,12 @@ impl IngestController { /// The closing operation can only be done by the leader of that shard. /// For these reason, we exclude these shards from the rebalance process. 
fn compute_shards_to_rebalance(&self, model: &ControlPlaneModel) -> Vec { + let decommissioning_ingesters = model.decommissioning_ingesters(); let mut per_available_ingester_shards: HashMap> = self .ingester_pool .keys() .into_iter() + .filter(|ingester_id| !decommissioning_ingesters.contains(ingester_id)) .map(|ingester_id| (ingester_id, Vec::new())) .collect(); @@ -3704,4 +3712,188 @@ mod tests { test_compute_shards_to_rebalance_aux(&[0, 1], &[1]); test_compute_shards_to_rebalance_aux(&[0, 1, 2], &[3, 4]); } + + #[test] + fn test_compute_shards_to_rebalance_excludes_decommissioning_ingesters() { + let index_metadata = + IndexMetadata::for_test("test-index", "ram://indexes/test-index"); + let index_uid = index_metadata.index_uid.clone(); + let source_id: SourceId = "test-source".to_string(); + + let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); + let mut source_config = SourceConfig::ingest_v2(); + source_config.source_id = source_id.clone(); + model.add_source(&index_uid, source_config).unwrap(); + + let ingester_pool = IngesterPool::default(); + let ingester = IngesterServiceClient::from_mock(MockIngesterService::new()); + ingester_pool.insert(NodeId::from("ingester-0"), ingester.clone()); + ingester_pool.insert(NodeId::from("ingester-1"), ingester); + + // Ingester-0 has 5 open shards, ingester-1 has 0. + let shards: Vec = (0..5) + .map(|i| Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(i as u64)), + leader_id: "ingester-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }) + .collect(); + model.insert_shards(&index_uid, &source_id, shards); + + let controller = IngestController::new( + MetastoreServiceClient::mocked(), + ingester_pool, + 1, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + + // Both ingesters available: rebalancing should propose moves. 
+ assert!(!controller.compute_shards_to_rebalance(&model).is_empty()); + + // Mark the target ingester as decommissioning: no rebalancing. + model.add_decommissioning_ingester(NodeId::from("ingester-1")); + assert!(controller.compute_shards_to_rebalance(&model).is_empty()); + } + + #[tokio::test] + async fn test_get_or_create_open_shards_excludes_decommissioning_ingesters() { + let source_id: &'static str = "test-source"; + let mut index_metadata = + IndexMetadata::for_test("test-index-0", "ram://indexes/test-index-0"); + let index_uid = index_metadata.index_uid.clone(); + let doc_mapping_uid = DocMappingUid::random(); + index_metadata.index_config.doc_mapping.doc_mapping_uid = doc_mapping_uid; + + let progress = Progress::default(); + + let mut mock_metastore = MockMetastoreService::new(); + let index_uid_clone = index_uid.clone(); + mock_metastore.expect_open_shards().returning(move |request| { + // The shard must be assigned to ingester-1 (ingester-0 is decommissioning). + assert_eq!(request.subrequests[0].leader_id, "test-ingester-1"); + + let subresponses = vec![OpenShardSubresponse { + subrequest_id: 0, + open_shard: Some(Shard { + index_uid: index_uid_clone.clone().into(), + source_id: source_id.to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), + doc_mapping_uid: Some(doc_mapping_uid), + ..Default::default() + }), + }]; + Ok(metastore::OpenShardsResponse { subresponses }) + }); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + + let ingester_pool = IngesterPool::default(); + ingester_pool.insert( + NodeId::from("test-ingester-0"), + IngesterServiceClient::from_mock(MockIngesterService::new()), + ); + + let mut mock_ingester_1 = MockIngesterService::new(); + mock_ingester_1 + .expect_init_shards() + .returning(|request| { + let shard = request.subrequests[0].shard().clone(); + Ok(InitShardsResponse { + successes: vec![InitShardSuccess { + subrequest_id: 
request.subrequests[0].subrequest_id, + shard: Some(shard), + }], + failures: Vec::new(), + }) + }); + ingester_pool.insert( + NodeId::from("test-ingester-1"), + IngesterServiceClient::from_mock(mock_ingester_1), + ); + + let mut controller = IngestController::new( + metastore, + ingester_pool, + 1, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + + let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); + let mut source_config = SourceConfig::ingest_v2(); + source_config.source_id = source_id.to_string(); + model.add_source(&index_uid, source_config).unwrap(); + model.add_decommissioning_ingester(NodeId::from("test-ingester-0")); + + let request = GetOrCreateOpenShardsRequest { + subrequests: vec![GetOrCreateOpenShardsSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: source_id.to_string(), + }], + closed_shards: Vec::new(), + unavailable_leaders: Vec::new(), + }; + let response = controller + .get_or_create_open_shards(request, &mut model, &progress) + .await + .unwrap(); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.successes[0].open_shards[0].leader_id, "test-ingester-1"); + } + + #[tokio::test] + async fn test_try_scale_up_shards_excludes_decommissioning() { + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore.expect_open_shards().never(); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + + let ingester_pool = IngesterPool::default(); + ingester_pool.insert( + "test-ingester".into(), + IngesterServiceClient::from_mock(MockIngesterService::new()), + ); + + let mut controller = IngestController::new( + metastore, + ingester_pool, + 1, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + + let index_uid = IndexUid::for_test("test-index", 0); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: INGEST_V2_SOURCE_ID.to_string(), + }; + let shard_stats = ShardStats { + num_open_shards: 2, + ..Default::default() + }; + + let 
mut model = ControlPlaneModel::default(); + model.add_index(IndexMetadata::for_test( + &index_uid.index_id, + "ram://indexes/test-index:0", + )); + model + .add_source(&index_uid, SourceConfig::ingest_v2()) + .unwrap(); + model.add_decommissioning_ingester(NodeId::from("test-ingester")); + + let progress = Progress::default(); + controller + .try_scale_up_shards(source_uid, shard_stats, &mut model, &progress, 1) + .await + .unwrap(); + assert_eq!(model.all_shards().count(), 0); + } } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 0d0431a67ce..679d6ef1e0c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -53,6 +53,9 @@ pub(crate) struct ControlPlaneModel { index_uid_table: FnvHashMap, index_table: FnvHashMap, shard_table: ShardTable, + /// Set of ingester node IDs that have announced they are decommissioning via chitchat. + /// These ingesters are excluded from shard allocation. 
+ decommissioning_ingesters: FnvHashSet, } impl ControlPlaneModel { @@ -61,6 +64,26 @@ impl ControlPlaneModel { *self = Default::default(); } + pub fn add_decommissioning_ingester(&mut self, node_id: NodeId) -> bool { + debug!( + node_id=%node_id, + "adding node as decommissioning" + ); + self.decommissioning_ingesters.insert(node_id) + } + + pub fn remove_decommissioning_ingester(&mut self, node_id: &NodeId) -> bool { + debug!( + node_id=%node_id, + "removing node from decommissioning" + ); + self.decommissioning_ingesters.remove(node_id) + } + + pub fn decommissioning_ingesters(&self) -> &FnvHashSet { + &self.decommissioning_ingesters + } + pub fn num_indexes(&self) -> usize { self.index_table.len() } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 65c268881ac..288e42d8222 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -141,7 +141,7 @@ impl Ingester { idle_shard_timeout: Duration, ) -> IngestV2Result { let self_node_id: NodeId = cluster.self_node_id().into(); - let state = IngesterState::load(wal_dir_path, rate_limiter_settings); + let state = IngesterState::load(wal_dir_path, rate_limiter_settings, cluster.clone()); let weak_state = state.weak(); BroadcastLocalShardsTask::spawn(cluster, weak_state.clone()); @@ -1262,6 +1262,8 @@ pub async fn wait_for_ingester_decommission(ingester: Ingester) -> anyhow::Resul .await .context("failed to initiate ingester decommission")?; + ingester.state.cluster.wait_for_ingester_status_propagation().await; + wait_for_ingester_status(ingester, IngesterStatus::Decommissioned).await?; info!( diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 591ef4f704f..099eb063278 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -20,8 +20,10 @@ use 
std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use mrecordlog::error::{DeleteQueueError, TruncateError}; +use quickwit_cluster::Cluster; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; +use quickwit_common::shared_consts::INGESTER_STATUS_KEY; use quickwit_doc_mapper::DocMapper; use quickwit_proto::control_plane::AdviseResetShardsResponse; use quickwit_proto::ingest::ingester::IngesterStatus; @@ -48,6 +50,7 @@ pub(super) struct IngesterState { // `inner` is a mutex because it's almost always accessed mutably. inner: Arc>, mrecordlog: Arc>>, + pub(super) cluster: Cluster, pub status_rx: watch::Receiver, } @@ -60,6 +63,7 @@ pub(super) struct InnerIngesterState { pub replication_tasks: HashMap, status: IngesterStatus, status_tx: watch::Sender, + cluster: Cluster, } impl InnerIngesterState { @@ -70,6 +74,14 @@ impl InnerIngesterState { pub fn set_status(&mut self, status: IngesterStatus) { self.status = status; self.status_tx.send(status).expect("channel should be open"); + + // Broadcast status to chitchat asynchronously + let cluster = self.cluster.clone(); + let status_str = status.as_json_str_name().to_string(); + tokio::spawn(async move { + cluster.set_self_key_value(INGESTER_STATUS_KEY, &status_str).await; + info!(status=%status_str, "broadcasted ingester status to chitchat"); + }); } /// Returns the shard with the most available permits for this index and source. 
@@ -90,7 +102,7 @@ impl InnerIngesterState { } impl IngesterState { - fn new() -> Self { + fn new(cluster: Cluster) -> Self { let status = IngesterStatus::Initializing; let (status_tx, status_rx) = watch::channel(status); let inner = InnerIngesterState { @@ -100,6 +112,7 @@ impl IngesterState { replication_tasks: Default::default(), status, status_tx, + cluster: cluster.clone(), }; let inner = Arc::new(Mutex::new(inner)); let mrecordlog = Arc::new(RwLock::new(None)); @@ -107,12 +120,13 @@ impl IngesterState { Self { inner, mrecordlog, + cluster, status_rx, } } - pub fn load(wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings) -> Self { - let state = Self::new(); + pub fn load(wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings, cluster: Cluster) -> Self { + let state = Self::new(cluster); let state_clone = state.clone(); let wal_dir_path = wal_dir_path.to_path_buf(); @@ -126,8 +140,14 @@ impl IngesterState { #[cfg(test)] pub async fn for_test() -> (tempfile::TempDir, Self) { + use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; + let temp_dir = tempfile::tempdir().unwrap(); - let mut state = IngesterState::load(temp_dir.path(), RateLimiterSettings::default()); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let mut state = IngesterState::load(temp_dir.path(), RateLimiterSettings::default(), cluster); state .status_rx @@ -294,6 +314,7 @@ impl IngesterState { WeakIngesterState { inner: Arc::downgrade(&self.inner), mrecordlog: Arc::downgrade(&self.mrecordlog), + cluster: self.cluster.clone(), status_rx: self.status_rx.clone(), } } @@ -447,6 +468,7 @@ impl FullyLockedIngesterState<'_> { pub(super) struct WeakIngesterState { inner: Weak>, mrecordlog: Weak>>, + cluster: Cluster, status_rx: watch::Receiver, } @@ -454,10 +476,12 @@ impl WeakIngesterState { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let 
mrecordlog = self.mrecordlog.upgrade()?; + let cluster = self.cluster.clone(); let status_rx = self.status_rx.clone(); let state = IngesterState { inner, mrecordlog, + cluster, status_rx, }; Some(state) @@ -467,6 +491,7 @@ impl WeakIngesterState { #[cfg(test)] mod tests { use bytesize::ByteSize; + use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; use quickwit_proto::types::ShardId; use tokio::time::timeout; @@ -474,7 +499,11 @@ mod tests { #[tokio::test] async fn test_ingester_state_does_not_lock_while_initializing() { - let state = IngesterState::new(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let state = IngesterState::new(cluster); let inner_guard = state.inner.lock().await; assert_eq!(inner_guard.status(), IngesterStatus::Initializing); @@ -489,7 +518,11 @@ mod tests { #[tokio::test] async fn test_ingester_state_failed() { - let state = IngesterState::new(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let state = IngesterState::new(cluster); state.inner.lock().await.set_status(IngesterStatus::Failed); @@ -502,7 +535,11 @@ mod tests { #[tokio::test] async fn test_ingester_state_init() { - let mut state = IngesterState::new(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let mut state = IngesterState::new(cluster); let temp_dir = tempfile::tempdir().unwrap(); state @@ -642,4 +679,36 @@ mod tests { locked_state.find_most_capacity_shard_mut(&index_uid, &SourceId::from("other-source")); assert!(shard_opt.is_none()); } + + #[tokio::test] + async fn test_set_status_broadcasts_to_chitchat() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let state 
= IngesterState::new(cluster.clone()); + + // Set status to Decommissioning. + state + .inner + .lock() + .await + .set_status(IngesterStatus::Decommissioning); + + // Let the spawned task run. + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let value = cluster.get_self_key_value(INGESTER_STATUS_KEY).await; + assert_eq!(value.as_deref(), Some("decommissioning")); + + // Set status to Ready. + state.inner.lock().await.set_status(IngesterStatus::Ready); + + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let value = cluster.get_self_key_value(INGESTER_STATUS_KEY).await; + assert_eq!(value.as_deref(), Some("ready")); + } } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 5277e0ac740..e278fb8bece 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -351,7 +351,7 @@ impl ClusterSandbox { TraceServiceClient::new(self.channel(QuickwitService::Indexer)) } - async fn wait_for_cluster_num_ready_nodes( + pub async fn wait_for_cluster_num_ready_nodes( &self, expected_num_ready_nodes: usize, ) -> anyhow::Result<()> { @@ -596,4 +596,21 @@ impl ClusterSandbox { self.shutdown_services(QuickwitService::supported_services()) .await } + + /// Remove a node from the sandbox and return its shutdown handle. + /// After this call, `rest_client` and other lookup methods skip the removed + /// node, so callers can trigger shutdown concurrently with other sandbox + /// operations. 
+ pub fn remove_node_with_service( + &mut self, + service: QuickwitService, + ) -> NodeShutdownHandle { + let idx = self + .node_shutdown_handles + .iter() + .position(|h| h.node_services.contains(&service)) + .unwrap_or_else(|| panic!("no node with service {service:?}")); + self.node_configs.remove(idx); + self.node_shutdown_handles.remove(idx) + } } diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs index 4a5e29c9565..3169dfaf28a 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs @@ -856,3 +856,139 @@ async fn test_shutdown_indexer_first() { .unwrap() .unwrap(); } + +/// Tests that the graceful shutdown sequence works correctly in a multi-indexer +/// cluster: shutting down one indexer does NOT cause 500 errors or data loss, +/// and the cluster eventually rebalances. see #6158 +#[tokio::test] +async fn test_graceful_shutdown_no_data_loss() { + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::Indexer]) + .add_node([ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ]) + .build_and_start() + .await; + let index_id = "test_graceful_shutdown_no_data_loss"; + + // Create index with a long commit timeout so documents stay uncommitted + // in the ingesters' WAL. The decommission sequence should commit + // them before the indexer quits. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create( + format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 5 + "# + ), + ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + // Ingest docs with auto-commit. 
With a 5s commit timeout, these documents + // sit uncommitted in the ingesters' WAL - exactly the in-flight state we + // want to exercise during draining. + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "before-shutdown-1"}), + CommitType::Auto, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "before-shutdown-2"}), + CommitType::Auto, + ) + .await + .unwrap(); + + // Remove the first indexer node from the sandbox and get its shutdown handle. + // After this call, rest_client(Indexer) returns the second (surviving) indexer. + let shutdown_handle = sandbox.remove_node_with_service(QuickwitService::Indexer); + + // Concurrently: shut down the removed indexer AND ingest more data via the + // surviving indexer. This verifies the cluster stays operational and the + // router on the surviving node does not return 500 errors while one indexer + // is decommissioning. The control plane excludes the decommissioning + // ingester from shard allocation, so new shards go to the surviving one. + let ingest_client = sandbox.rest_client(QuickwitService::Indexer); + let (shutdown_result, ingest_result) = tokio::join!( + async { + tokio::time::timeout(Duration::from_secs(30), shutdown_handle.shutdown()) + .await + .expect("indexer shutdown timed out — decommission may be stuck") + }, + async { + // Small delay so the decommission sequence has started before we ingest. + tokio::time::sleep(Duration::from_millis(200)).await; + ingest( + &ingest_client, + index_id, + ingest_json!({"body": "during-shutdown"}), + CommitType::Auto, + ) + .await + }, + ); + shutdown_result.expect("indexer shutdown failed"); + ingest_result.expect("ingest during shutdown should succeed (no 500 errors)"); + + // All 3 documents should eventually be searchable. 
Documents 1 & 2 were + // in-flight on the decommissioning indexer and should have been committed during + // the decommission step. Document 3 was ingested to the surviving indexer. + wait_until_predicate( + || async { + match sandbox + .rest_client(QuickwitService::Searcher) + .search( + index_id, + quickwit_serve::SearchRequestQueryString { + query: "*".to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + { + Ok(resp) => resp.num_hits == 3, + Err(_) => false, + } + }, + Duration::from_secs(30), + Duration::from_millis(500), + ) + .await + .expect("expected 3 documents after decommission shutdown, some data may have been lost"); + + // Verify the cluster sees 2 ready nodes (the surviving indexer + the + // control-plane/searcher/metastore/janitor node). + sandbox + .wait_for_cluster_num_ready_nodes(2) + .await + .expect("cluster should see 2 ready nodes after indexer shutdown"); + + // Clean shutdown of the remaining nodes. + tokio::time::timeout(Duration::from_secs(15), sandbox.shutdown()) + .await + .unwrap() + .unwrap(); +} diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs index d2da3f8d9bd..8ade5f5ee49 100644 --- a/quickwit/quickwit-proto/src/ingest/ingester.rs +++ b/quickwit/quickwit-proto/src/ingest/ingester.rs @@ -77,6 +77,18 @@ impl IngesterStatus { Self::Failed => "failed", } } + + pub fn from_json_str_name(ingester_status_json_name: &str) -> Option { + match ingester_status_json_name { + "unspecified" => Some(Self::Unspecified), + "initializing" => Some(Self::Initializing), + "ready" => Some(Self::Ready), + "decommissioning" => Some(Self::Decommissioning), + "decommissioned" => Some(Self::Decommissioned), + "failed" => Some(Self::Failed), + _ => None, + } + } } impl OpenFetchStreamRequest { @@ -184,3 +196,25 @@ impl TruncateShardsSubrequest { queue_id(self.index_uid(), &self.source_id, self.shard_id()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn 
test_ingester_status_from_json_str_name_round_trip() { + let all_variants = [ + IngesterStatus::Unspecified, + IngesterStatus::Initializing, + IngesterStatus::Ready, + IngesterStatus::Decommissioning, + IngesterStatus::Decommissioned, + IngesterStatus::Failed, + ]; + for variant in all_variants { + let json_name = variant.as_json_str_name(); + let parsed = IngesterStatus::from_json_str_name(json_name); + assert_eq!(parsed, Some(variant), "round-trip failed for {json_name}"); + } + } +}