Skip to content
9 changes: 9 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ impl Cluster {
tokio::time::sleep(self.gossip_interval * 2).await;
}

/// Pauses for twice the gossip interval to give the cluster time to propagate an
/// ingester status change.
///
/// The duration of the pause is logged at `info` level before sleeping.
pub async fn wait_for_ingester_status_propagation(&self) {
    let delay = self.gossip_interval * 2;
    info!(
        "waiting {:?} for cluster to propagate ingester status change",
        delay
    );
    tokio::time::sleep(delay).await;
}

/// Asks the chitchat handle to begin shutting down.
///
/// A read guard on the cluster's inner state is held for the duration of the call, and
/// the handle's result is returned unchanged.
pub async fn initiate_shutdown(&self) -> anyhow::Result<()> {
    let inner_guard = self.inner.read().await;
    inner_guard.chitchat_handle.initiate_shutdown()
}
Expand Down
58 changes: 58 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use std::net::SocketAddr;
use std::sync::Arc;

use chitchat::{ChitchatId, NodeState};
use quickwit_common::shared_consts::INGESTER_STATUS_KEY;
use quickwit_config::service::QuickwitService;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeIdRef;
use tonic::transport::Channel;

Expand All @@ -39,13 +41,17 @@ impl ClusterNode {
is_self_node: bool,
) -> anyhow::Result<Self> {
let member = build_cluster_member(chitchat_id.clone(), node_state)?;
let ingester_status = node_state
.get(INGESTER_STATUS_KEY)
.and_then(IngesterStatus::from_json_str_name);
let inner = InnerNode {
chitchat_id,
channel,
enabled_services: member.enabled_services,
grpc_advertise_addr: member.grpc_advertise_addr,
indexing_tasks: member.indexing_tasks,
indexing_capacity: member.indexing_cpu_capacity,
ingester_status,
is_ready: member.is_ready,
is_self_node,
};
Expand All @@ -62,6 +68,26 @@ impl ClusterNode {
is_self_node: bool,
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
) -> Self {
Self::for_test_with_ingester_status(
node_id,
port,
is_self_node,
enabled_services,
indexing_tasks,
None,
)
.await
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn for_test_with_ingester_status(
node_id: &str,
port: u16,
is_self_node: bool,
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
ingester_status: Option<IngesterStatus>,
) -> Self {
use quickwit_common::tower::{ClientGrpcConfig, make_channel};

Expand All @@ -76,6 +102,9 @@ impl ClusterNode {
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());
set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state);
if let Some(status) = ingester_status {
node_state.set(INGESTER_STATUS_KEY, status.as_json_str_name());
}
Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap()
}

Expand Down Expand Up @@ -125,6 +154,10 @@ impl ClusterNode {
self.inner.indexing_capacity
}

/// Returns the ingester status stored on this node, if any.
///
/// The value is the one captured when the node was built; `None` means no status was
/// available at that time.
pub fn ingester_status(&self) -> Option<IngesterStatus> {
self.inner.ingester_status
}

/// Returns the readiness flag stored on this node.
pub fn is_ready(&self) -> bool {
self.inner.is_ready
}
Expand All @@ -151,6 +184,7 @@ impl PartialEq for ClusterNode {
&& self.inner.enabled_services == other.inner.enabled_services
&& self.inner.grpc_advertise_addr == other.inner.grpc_advertise_addr
&& self.inner.indexing_tasks == other.inner.indexing_tasks
&& self.inner.ingester_status == other.inner.ingester_status
&& self.inner.is_ready == other.inner.is_ready
&& self.inner.is_self_node == other.inner.is_self_node
}
Expand All @@ -163,6 +197,30 @@ struct InnerNode {
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
indexing_capacity: CpuCapacity,
ingester_status: Option<IngesterStatus>,
is_ready: bool,
is_self_node: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A status written into the node state must be surfaced by `ingester_status()`.
    #[tokio::test]
    async fn test_cluster_node_ingester_status_from_chitchat() {
        let node = ClusterNode::for_test_with_ingester_status(
            "test-node",
            1337,
            false,
            &["indexer"],
            &[],
            Some(IngesterStatus::Decommissioning),
        )
        .await;
        assert_eq!(
            node.ingester_status(),
            Some(IngesterStatus::Decommissioning)
        );
    }

    /// When no status is written into the node state, `ingester_status()` must be `None`.
    #[tokio::test]
    async fn test_cluster_node_ingester_status_absent_from_chitchat() {
        let node = ClusterNode::for_test_with_ingester_status(
            "test-node",
            1337,
            false,
            &["indexer"],
            &[],
            None,
        )
        .await;
        assert!(node.ingester_status().is_none());
    }
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub const SCROLL_BATCH_LEN: usize = 1_000;
/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// Key used in chitchat to broadcast the ingester status (lifecycle state).
///
/// NOTE(review): the value stored under this key appears to be the JSON string name of an
/// `IngesterStatus` variant; confirm against the code that writes it.
pub const INGESTER_STATUS_KEY: &str = "ingester.status";

/// File name for the encoded list of fields in the split.
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";

Expand Down
Loading