Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use quickwit_indexing::models::{
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::{IndexId, PipelineUid, SourceId, SplitId};
Expand Down Expand Up @@ -936,8 +937,9 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
enabled_services: HashSet::new(),
gossip_advertise_addr: config.gossip_advertise_addr,
grpc_advertise_addr: config.grpc_advertise_addr,
indexing_cpu_capacity: CpuCapacity::zero(),
indexing_tasks: Vec::new(),
indexing_cpu_capacity: CpuCapacity::zero(),
ingester_status: IngesterStatus::default(),
availability_zone: None,
};
let client_grpc_config = make_client_grpc_config(&config.grpc_config)?;
Expand Down
24 changes: 15 additions & 9 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use crate::member::NodeStateExt;
/// A change affecting a node of the cluster.
#[derive(Debug, Clone)]
pub enum ClusterChange {
Add(ClusterNode),
Update(ClusterNode),
/// A node was updated. This variant is built when both the previous and the updated versions
/// of the node are ready.
Update {
/// The node as it was before the update.
previous: ClusterNode,
/// The node as it is after the update.
updated: ClusterNode,
},
Remove(ClusterNode),
}

Expand Down Expand Up @@ -246,7 +249,10 @@ async fn compute_cluster_change_events_on_updated(
);
Some(ClusterChange::Remove(updated_node))
} else if previous_node.is_ready() && updated_node.is_ready() {
Some(ClusterChange::Update(updated_node))
Some(ClusterChange::Update {
previous: previous_node,
updated: updated_node,
})
} else {
None
}
Expand Down Expand Up @@ -681,16 +687,16 @@ pub(crate) mod tests {
.await
.unwrap();

let ClusterChange::Update(node) = event else {
// The assertions that follow inspect the node carried by an `Update` event, so the failure
// message must name that variant.
let ClusterChange::Update { updated, .. } = event else {
    panic!("expected `ClusterChange::Update` event, got `{event:?}`");
};
assert_eq!(node.chitchat_id(), &updated_chitchat_id);
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
assert_eq!(updated.chitchat_id(), &updated_chitchat_id);
assert_eq!(updated.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!updated.is_self_node());
assert!(updated.is_ready());
assert_eq!(
previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
&node
&updated
);
}
{
Expand Down Expand Up @@ -1009,7 +1015,7 @@ pub(crate) mod tests {
.await;
assert_eq!(events.len(), 1);

let ClusterChange::Update(_node) = events[0].clone() else {
let ClusterChange::Update { .. } = events[0].clone() else {
panic!(
"Expected `ClusterChange::Update` event, got `{:?}`",
events[0]
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ pub async fn create_cluster_for_test_with_id(
self_node_readiness: bool,
) -> anyhow::Result<Cluster> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
use quickwit_proto::ingest::ingester::IngesterStatus;
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], gossip_advertise_port).into();
let self_node = ClusterMember {
node_id,
Expand All @@ -707,6 +708,7 @@ pub async fn create_cluster_for_test_with_id(
grpc_advertise_addr: grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
indexing_tasks: Vec::new(),
indexing_cpu_capacity: PIPELINE_FULL_CAPACITY,
ingester_status: IngesterStatus::default(),
availability_zone: None,
};
let failure_detector_config = create_failure_detector_config_for_test();
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use quickwit_common::tower::ClientGrpcConfig;
use quickwit_config::service::QuickwitService;
use quickwit_config::{GrpcConfig, NodeConfig, TlsConfig};
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::tonic::transport::{Certificate, ClientTlsConfig, Identity};
use time::OffsetDateTime;

Expand Down Expand Up @@ -143,6 +144,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
grpc_advertise_addr: node_config.grpc_advertise_addr,
indexing_tasks,
indexing_cpu_capacity,
ingester_status: IngesterStatus::default(),
availability_zone: node_config.availability_zone.clone(),
};
let failure_detector_config = FailureDetectorConfig {
Expand Down
19 changes: 18 additions & 1 deletion quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::str::FromStr;

use anyhow::Context;
use chitchat::{ChitchatId, NodeState, Version};
use quickwit_common::shared_consts::INGESTER_STATUS_KEY;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeId;
use tracing::{error, warn};

Expand Down Expand Up @@ -47,6 +49,8 @@ pub(crate) trait NodeStateExt {

fn size_bytes(&self) -> usize;

fn ingester_status(&self) -> IngesterStatus;

fn availability_zone(&self) -> Option<String>;
}

Expand Down Expand Up @@ -79,6 +83,12 @@ impl NodeStateExt for NodeState {
.sum()
}

/// Reads the ingester status stored under `INGESTER_STATUS_KEY` in this node state.
///
/// Falls back to `IngesterStatus::default()` when the key is absent or when its value is not
/// recognized by `IngesterStatus::from_json_str_name`.
fn ingester_status(&self) -> IngesterStatus {
    let Some(status_str) = self.get(INGESTER_STATUS_KEY) else {
        return IngesterStatus::default();
    };
    IngesterStatus::from_json_str_name(status_str).unwrap_or_default()
}

/// Returns an owned copy of the value stored under `AVAILABILITY_ZONE_KEY`, or `None` when the
/// key is not set.
fn availability_zone(&self) -> Option<String> {
    let az = self.get(AVAILABILITY_ZONE_KEY)?;
    Some(az.to_string())
}
Expand Down Expand Up @@ -108,6 +118,10 @@ pub struct ClusterMember {
pub indexing_tasks: Vec<IndexingTask>,
/// Indexing cpu capacity of the node expressed in milli cpu.
pub indexing_cpu_capacity: CpuCapacity,
/// Status of the ingester service running on the node. `IngesterStatus::Unspecified` if the
/// node is not an ingester.
pub ingester_status: IngesterStatus,
/// Whether the node is ready to serve requests.
pub is_ready: bool,
/// Availability zone the node is running in, if enabled.
pub availability_zone: Option<String>,
Expand Down Expand Up @@ -159,10 +173,12 @@ pub(crate) fn build_cluster_member(
.map(|enabled_services_str| {
parse_enabled_services_str(enabled_services_str, &chitchat_id.node_id)
})?;
let availability_zone = node_state.availability_zone();
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state);
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
let ingester_status = node_state.ingester_status();
let availability_zone = node_state.availability_zone();

let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
generation_id: chitchat_id.generation_id.into(),
Expand All @@ -172,6 +188,7 @@ pub(crate) fn build_cluster_member(
grpc_advertise_addr,
indexing_tasks,
indexing_cpu_capacity,
ingester_status,
availability_zone,
};
Ok(member)
Expand Down
29 changes: 29 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use chitchat::{ChitchatId, NodeState};
use quickwit_config::service::QuickwitService;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeIdRef;
use tonic::transport::Channel;

Expand All @@ -46,6 +47,7 @@ impl ClusterNode {
grpc_advertise_addr: member.grpc_advertise_addr,
indexing_tasks: member.indexing_tasks,
indexing_capacity: member.indexing_cpu_capacity,
ingester_status: member.ingester_status,
is_ready: member.is_ready,
is_self_node,
};
Expand All @@ -63,6 +65,27 @@ impl ClusterNode {
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
) -> Self {
Self::for_test_with_ingester_status(
node_id,
port,
is_self_node,
enabled_services,
indexing_tasks,
IngesterStatus::default(),
)
.await
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn for_test_with_ingester_status(
node_id: &str,
port: u16,
is_self_node: bool,
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
ingester_status: IngesterStatus,
) -> Self {
use quickwit_common::shared_consts::INGESTER_STATUS_KEY;
use quickwit_common::tower::{ClientGrpcConfig, make_channel};

use crate::cluster::set_indexing_tasks_in_node_state;
Expand All @@ -75,6 +98,7 @@ impl ClusterNode {
let mut node_state = NodeState::for_test();
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());
node_state.set(INGESTER_STATUS_KEY, ingester_status.as_json_str_name());
set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state);
Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap()
}
Expand Down Expand Up @@ -125,6 +149,10 @@ impl ClusterNode {
self.inner.indexing_capacity
}

/// Returns the ingester status stored for this node.
pub fn ingester_status(&self) -> IngesterStatus {
self.inner.ingester_status
}

/// Returns the readiness flag stored for this node.
pub fn is_ready(&self) -> bool {
self.inner.is_ready
}
Expand Down Expand Up @@ -163,6 +191,7 @@ struct InnerNode {
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
indexing_capacity: CpuCapacity,
ingester_status: IngesterStatus,
is_ready: bool,
is_self_node: bool,
}
5 changes: 4 additions & 1 deletion quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ pub fn split_deletion_grace_period() -> Duration {
/// being requested.
pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
/// Key prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// Key used in chitchat to broadcast the status of an ingester.
pub const INGESTER_STATUS_KEY: &str = "ingester.status";

/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-common/src/tower/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ where
.collect()
}

/// Returns all the key-value pairs in the pool.
pub fn keys_values(&self) -> Vec<(K, V)> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
}

/// Returns all the values in the pool.
pub fn values(&self) -> Vec<V> {
self.pool
Expand Down
Loading