Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl ClusterNode {
indexing_capacity: member.indexing_cpu_capacity,
is_ready: member.is_ready,
is_self_node,
availability_zone: member.availability_zone,
};
let node = ClusterNode {
inner: Arc::new(inner),
Expand Down Expand Up @@ -132,6 +133,10 @@ impl ClusterNode {
pub fn is_self_node(&self) -> bool {
self.inner.is_self_node
}

pub fn availability_zone(&self) -> Option<&str> {
self.inner.availability_zone.as_deref()
}
}

impl Debug for ClusterNode {
Expand All @@ -153,6 +158,7 @@ impl PartialEq for ClusterNode {
&& self.inner.indexing_tasks == other.inner.indexing_tasks
&& self.inner.is_ready == other.inner.is_ready
&& self.inner.is_self_node == other.inner.is_self_node
&& self.inner.availability_zone == other.inner.availability_zone
}
}

Expand All @@ -165,4 +171,5 @@ struct InnerNode {
indexing_capacity: CpuCapacity,
is_ready: bool,
is_self_node: bool,
availability_zone: Option<String>,
}
15 changes: 15 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ pub use local_shards::{
BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos,
setup_local_shards_update_listener,
};
use quickwit_common::pubsub::Event;
use quickwit_proto::types::NodeId;

#[derive(Debug, Clone)]
pub enum IngesterNodeInfo {
Add {
node_id: NodeId,
availability_zone: String,
},
Remove {
node_id: NodeId,
},
}

impl Event for IngesterNodeInfo {}

fn make_key(prefix: &str, source_uid: &SourceUid) -> String {
format!("{prefix}{}:{}", source_uid.index_uid, source_uid.source_id)
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use std::time::Duration;
use std::{env, fmt};

pub use broadcast::{
LocalShardsUpdate, ShardInfo, ShardInfos, setup_ingester_capacity_update_listener,
setup_local_shards_update_listener,
IngesterNodeInfo, LocalShardsUpdate, ShardInfo, ShardInfos,
setup_ingester_capacity_update_listener, setup_local_shards_update_listener,
};
use bytes::buf::Writer;
use bytes::{BufMut, BytesMut};
Expand Down
157 changes: 104 additions & 53 deletions quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,42 +57,83 @@ fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode
}
}

fn pick_from(candidates: Vec<&IngesterNode>) -> Option<&IngesterNode> {
match candidates.len() {
0 => None,
1 => Some(candidates[0]),
_ => Some(power_of_two_choices(&candidates)),
}
}

impl RoutingEntry {
/// Pick an ingester node to persist the request to. Uses power of two choices based on reported
/// ingester capacity, if more than one eligible node exists.
pub fn pick_node(
/// ingester capacity, if more than one eligible node exists. Prefers nodes in the same
/// availability zone, falling back to remote nodes.
fn pick_node(
&self,
ingester_pool: &IngesterPool,
unavailable_leaders: &HashSet<NodeId>,
self_availability_zone: &Option<String>,
node_availability_zone_map: &HashMap<NodeId, String>,
) -> Option<&IngesterNode> {
let eligible: Vec<&IngesterNode> = self
let (local_ingesters, remote_ingesters): (Vec<&IngesterNode>, Vec<&IngesterNode>) = self
.nodes
.values()
.filter(|node| {
.filter(|&node| {
node.capacity_score > 0
&& node.open_shard_count > 0
&& ingester_pool.contains_key(&node.node_id)
&& !unavailable_leaders.contains(&node.node_id)
})
.collect();
.partition(|&node| {
self_availability_zone.as_ref() == node_availability_zone_map.get(&node.node_id)
});

match eligible.len() {
0 => None,
1 => Some(eligible[0]),
_ => Some(power_of_two_choices(&eligible)),
}
pick_from(local_ingesters).or_else(|| pick_from(remote_ingesters))
}
}

#[derive(Debug, Default)]
pub(super) struct NodeBasedRoutingTable {
table: HashMap<(IndexId, SourceId), RoutingEntry>,
node_availability_zone_map: HashMap<NodeId, String>,
self_availability_zone: Option<String>,
}

impl NodeBasedRoutingTable {
pub fn find_entry(&self, index_id: &str, source_id: &str) -> Option<&RoutingEntry> {
pub fn new(self_availability_zone: Option<String>) -> Self {
Self {
self_availability_zone,
..Default::default()
}
}

pub fn update_node_availability_zone(&mut self, node_id: NodeId, availability_zone: String) {
self.node_availability_zone_map
.insert(node_id, availability_zone);
}

pub fn remove_node_availability_zone(&mut self, node_id: &NodeId) {
self.node_availability_zone_map.remove(node_id);
}

pub fn pick_node(
&self,
index_id: &str,
source_id: &str,
ingester_pool: &IngesterPool,
unavailable_leaders: &HashSet<NodeId>,
) -> Option<&IngesterNode> {
let key = (index_id.to_string(), source_id.to_string());
self.table.get(&key)
let Some(entry) = self.table.get(&key) else {
return None;
};
entry.pick_node(
ingester_pool,
unavailable_leaders,
&self.self_availability_zone,
&self.node_availability_zone_map,
)
}

pub fn debug_info(&self) -> HashMap<IndexId, Vec<serde_json::Value>> {
Expand All @@ -107,6 +148,7 @@ impl NodeBasedRoutingTable {
"node_id": node_id,
"capacity_score": node.capacity_score,
"open_shard_count": node.open_shard_count,
"availability_zone": self.node_availability_zone_map.get(node_id),
}));
}
}
Expand Down Expand Up @@ -304,77 +346,86 @@ mod tests {
}

#[test]
fn test_pick_node() {
let mut table = NodeBasedRoutingTable::default();
fn test_pick_node_prefers_same_az() {
let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string()));
let pool = IngesterPool::default();
let key = ("test-index".to_string(), "test-source".to_string());

// Node exists but not in pool → None.
table.apply_capacity_update(
"node-1".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
8,
3,
5,
1,
);
assert!(
table
.table
.get(&key)
.unwrap()
.pick_node(&pool, &HashSet::new())
.is_none()
table.apply_capacity_update(
"node-2".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
5,
1,
);
table.update_node_availability_zone("node-1".into(), "az-1".to_string());
table.update_node_availability_zone("node-2".into(), "az-2".to_string());

// Single node in pool → picks it.
pool.insert("node-1".into(), IngesterServiceClient::mocked());
pool.insert("node-2".into(), IngesterServiceClient::mocked());

let picked = table
.table
.get(&key)
.unwrap()
.pick_node(&pool, &HashSet::new())
.pick_node("test-index", "test-source", &pool, &HashSet::new())
.unwrap();
assert_eq!(picked.node_id, NodeId::from("node-1"));
}

#[test]
fn test_pick_node_falls_back_to_cross_az() {
let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string()));
let pool = IngesterPool::default();

// Multiple nodes → something is returned.
table.apply_capacity_update(
"node-2".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
2,
5,
1,
);
table.update_node_availability_zone("node-2".into(), "az-2".to_string());
pool.insert("node-2".into(), IngesterServiceClient::mocked());
assert!(
table
.table
.get(&key)
.unwrap()
.pick_node(&pool, &HashSet::new())
.is_some()
);

// Node with capacity_score=0 is skipped.
let picked = table
.pick_node("test-index", "test-source", &pool, &HashSet::new())
.unwrap();
assert_eq!(picked.node_id, NodeId::from("node-2"));
}

#[test]
fn test_pick_node_no_az_awareness() {
let mut table = NodeBasedRoutingTable::default();
let pool = IngesterPool::default();

table.apply_capacity_update(
"node-1".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
0,
3,
);
table.apply_capacity_update(
"node-2".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
0,
5,
1,
);
table.update_node_availability_zone("node-1".into(), "az-1".to_string());
pool.insert("node-1".into(), IngesterServiceClient::mocked());

let picked = table
.pick_node("test-index", "test-source", &pool, &HashSet::new())
.unwrap();
assert_eq!(picked.node_id, NodeId::from("node-1"));
}

#[test]
fn test_pick_node_missing_entry() {
let table = NodeBasedRoutingTable::new(Some("az-1".to_string()));
let pool = IngesterPool::default();

assert!(
table
.table
.get(&key)
.unwrap()
.pick_node(&pool, &HashSet::new())
.pick_node("nonexistent", "source", &pool, &HashSet::new())
.is_none()
);
}
Expand Down
Loading
Loading