From 44c705e7e284a93152ba976212a7fe71c4c85e98 Mon Sep 17 00:00:00 2001
From: Nadav Gov-Ari
Date: Tue, 3 Mar 2026 13:23:22 -0500
Subject: [PATCH] Add availability zone awareness to node based routing

---
 quickwit/quickwit-cluster/src/node.rs         |   7 +
 .../src/ingest_v2/broadcast/mod.rs            |  15 ++
 quickwit/quickwit-ingest/src/ingest_v2/mod.rs |   4 +-
 .../src/ingest_v2/node_routing_table.rs       | 157 ++++++++++++------
 .../quickwit-ingest/src/ingest_v2/router.rs   | 143 +++++++++++++---
 quickwit/quickwit-serve/src/lib.rs            |  24 ++-
 6 files changed, 271 insertions(+), 79 deletions(-)

diff --git a/quickwit/quickwit-cluster/src/node.rs b/quickwit/quickwit-cluster/src/node.rs
index 4a8b11dbafc..52029348eb1 100644
--- a/quickwit/quickwit-cluster/src/node.rs
+++ b/quickwit/quickwit-cluster/src/node.rs
@@ -48,6 +48,7 @@ impl ClusterNode {
             indexing_capacity: member.indexing_cpu_capacity,
             is_ready: member.is_ready,
             is_self_node,
+            availability_zone: member.availability_zone,
         };
         let node = ClusterNode {
             inner: Arc::new(inner),
@@ -132,6 +133,10 @@ impl ClusterNode {
     pub fn is_self_node(&self) -> bool {
         self.inner.is_self_node
     }
+
+    pub fn availability_zone(&self) -> Option<&str> {
+        self.inner.availability_zone.as_deref()
+    }
 }
 
 impl Debug for ClusterNode {
@@ -153,6 +158,7 @@ impl PartialEq for ClusterNode {
             && self.inner.indexing_tasks == other.inner.indexing_tasks
             && self.inner.is_ready == other.inner.is_ready
             && self.inner.is_self_node == other.inner.is_self_node
+            && self.inner.availability_zone == other.inner.availability_zone
     }
 }
 
@@ -165,4 +171,5 @@ struct InnerNode {
     indexing_capacity: CpuCapacity,
     is_ready: bool,
     is_self_node: bool,
+    availability_zone: Option<String>,
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
index 18a00209de1..2558bc65321 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
@@ -34,6 +34,21 @@ pub use local_shards::{
     BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos,
     setup_local_shards_update_listener,
 };
+use quickwit_common::pubsub::Event;
+use quickwit_proto::types::NodeId;
+
+#[derive(Debug, Clone)]
+pub enum IngesterNodeInfo {
+    Add {
+        node_id: NodeId,
+        availability_zone: String,
+    },
+    Remove {
+        node_id: NodeId,
+    },
+}
+
+impl Event for IngesterNodeInfo {}
 
 fn make_key(prefix: &str, source_uid: &SourceUid) -> String {
     format!("{prefix}{}:{}", source_uid.index_uid, source_uid.source_id)
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index 34051e62cd0..a02faa08946 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -40,8 +40,8 @@ use std::time::Duration;
 use std::{env, fmt};
 
 pub use broadcast::{
-    LocalShardsUpdate, ShardInfo, ShardInfos, setup_ingester_capacity_update_listener,
-    setup_local_shards_update_listener,
+    IngesterNodeInfo, LocalShardsUpdate, ShardInfo, ShardInfos,
+    setup_ingester_capacity_update_listener, setup_local_shards_update_listener,
 };
 use bytes::buf::Writer;
 use bytes::{BufMut, BytesMut};
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs
index 72ee2ded5ec..16ed28f7df8 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs
@@ -57,42 +57,83 @@ fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode
     }
 }
 
+fn pick_from(candidates: Vec<&IngesterNode>) -> Option<&IngesterNode> {
+    match candidates.len() {
+        0 => None,
+        1 => Some(candidates[0]),
+        _ => Some(power_of_two_choices(&candidates)),
+    }
+}
+
 impl RoutingEntry {
     /// Pick an ingester node to persist the request to. Uses power of two choices based on reported
-    /// ingester capacity, if more than one eligible node exists.
-    pub fn pick_node(
+    /// ingester capacity, if more than one eligible node exists. Prefers nodes in the same
+    /// availability zone, falling back to remote nodes.
+    fn pick_node(
         &self,
         ingester_pool: &IngesterPool,
         unavailable_leaders: &HashSet<NodeId>,
+        self_availability_zone: &Option<String>,
+        node_availability_zone_map: &HashMap<NodeId, String>,
     ) -> Option<&IngesterNode> {
-        let eligible: Vec<&IngesterNode> = self
+        let (local_ingesters, remote_ingesters): (Vec<&IngesterNode>, Vec<&IngesterNode>) = self
             .nodes
             .values()
-            .filter(|node| {
+            .filter(|&node| {
                 node.capacity_score > 0
                     && node.open_shard_count > 0
                     && ingester_pool.contains_key(&node.node_id)
                     && !unavailable_leaders.contains(&node.node_id)
             })
-            .collect();
+            .partition(|&node| {
+                self_availability_zone.as_ref() == node_availability_zone_map.get(&node.node_id)
+            });
 
-        match eligible.len() {
-            0 => None,
-            1 => Some(eligible[0]),
-            _ => Some(power_of_two_choices(&eligible)),
-        }
+        pick_from(local_ingesters).or_else(|| pick_from(remote_ingesters))
     }
 }
 
 #[derive(Debug, Default)]
 pub(super) struct NodeBasedRoutingTable {
     table: HashMap<(IndexId, SourceId), RoutingEntry>,
+    node_availability_zone_map: HashMap<NodeId, String>,
+    self_availability_zone: Option<String>,
 }
 
 impl NodeBasedRoutingTable {
-    pub fn find_entry(&self, index_id: &str, source_id: &str) -> Option<&RoutingEntry> {
+    pub fn new(self_availability_zone: Option<String>) -> Self {
+        Self {
+            self_availability_zone,
+            ..Default::default()
+        }
+    }
+
+    pub fn update_node_availability_zone(&mut self, node_id: NodeId, availability_zone: String) {
+        self.node_availability_zone_map
+            .insert(node_id, availability_zone);
+    }
+
+    pub fn remove_node_availability_zone(&mut self, node_id: &NodeId) {
+        self.node_availability_zone_map.remove(node_id);
+    }
+
+    pub fn pick_node(
+        &self,
+        index_id: &str,
+        source_id: &str,
+        ingester_pool: &IngesterPool,
+        unavailable_leaders: &HashSet<NodeId>,
+    ) -> Option<&IngesterNode> {
         let key = (index_id.to_string(), source_id.to_string());
-        self.table.get(&key)
+        let Some(entry) = self.table.get(&key) else {
+            return None;
+        };
+        entry.pick_node(
+            ingester_pool,
+            unavailable_leaders,
+            &self.self_availability_zone,
+            &self.node_availability_zone_map,
+        )
     }
 
     pub fn debug_info(&self) -> HashMap<String, Vec<serde_json::Value>> {
@@ -107,6 +148,7 @@ impl NodeBasedRoutingTable {
                     "node_id": node_id,
                     "capacity_score": node.capacity_score,
                     "open_shard_count": node.open_shard_count,
+                    "availability_zone": self.node_availability_zone_map.get(node_id),
                 }));
             }
         }
@@ -304,77 +346,86 @@ mod tests {
     }
 
     #[test]
-    fn test_pick_node() {
-        let mut table = NodeBasedRoutingTable::default();
+    fn test_pick_node_prefers_same_az() {
+        let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string()));
         let pool = IngesterPool::default();
-        let key = ("test-index".to_string(), "test-source".to_string());
 
-        // Node exists but not in pool → None.
         table.apply_capacity_update(
             "node-1".into(),
             IndexUid::for_test("test-index", 0),
             "test-source".into(),
-            8,
-            3,
+            5,
+            1,
         );
-        assert!(
-            table
-                .table
-                .get(&key)
-                .unwrap()
-                .pick_node(&pool, &HashSet::new())
-                .is_none()
+        table.apply_capacity_update(
+            "node-2".into(),
+            IndexUid::for_test("test-index", 0),
+            "test-source".into(),
+            5,
+            1,
         );
+        table.update_node_availability_zone("node-1".into(), "az-1".to_string());
+        table.update_node_availability_zone("node-2".into(), "az-2".to_string());
 
-        // Single node in pool → picks it.
         pool.insert("node-1".into(), IngesterServiceClient::mocked());
+        pool.insert("node-2".into(), IngesterServiceClient::mocked());
+
         let picked = table
-            .table
-            .get(&key)
-            .unwrap()
-            .pick_node(&pool, &HashSet::new())
+            .pick_node("test-index", "test-source", &pool, &HashSet::new())
             .unwrap();
         assert_eq!(picked.node_id, NodeId::from("node-1"));
+    }
+
+    #[test]
+    fn test_pick_node_falls_back_to_cross_az() {
+        let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string()));
+        let pool = IngesterPool::default();
 
-        // Multiple nodes → something is returned.
         table.apply_capacity_update(
             "node-2".into(),
             IndexUid::for_test("test-index", 0),
             "test-source".into(),
-            2,
+            5,
             1,
         );
+        table.update_node_availability_zone("node-2".into(), "az-2".to_string());
         pool.insert("node-2".into(), IngesterServiceClient::mocked());
-        assert!(
-            table
-                .table
-                .get(&key)
-                .unwrap()
-                .pick_node(&pool, &HashSet::new())
-                .is_some()
-        );
 
-        // Node with capacity_score=0 is skipped.
+        let picked = table
+            .pick_node("test-index", "test-source", &pool, &HashSet::new())
+            .unwrap();
+        assert_eq!(picked.node_id, NodeId::from("node-2"));
+    }
+
+    #[test]
+    fn test_pick_node_no_az_awareness() {
+        let mut table = NodeBasedRoutingTable::default();
+        let pool = IngesterPool::default();
+
         table.apply_capacity_update(
             "node-1".into(),
             IndexUid::for_test("test-index", 0),
             "test-source".into(),
-            0,
-            3,
-        );
-        table.apply_capacity_update(
-            "node-2".into(),
-            IndexUid::for_test("test-index", 0),
-            "test-source".into(),
-            0,
+            5,
             1,
         );
+        table.update_node_availability_zone("node-1".into(), "az-1".to_string());
+        pool.insert("node-1".into(), IngesterServiceClient::mocked());
+
+        let picked = table
+            .pick_node("test-index", "test-source", &pool, &HashSet::new())
+            .unwrap();
+        assert_eq!(picked.node_id, NodeId::from("node-1"));
+    }
+
+    #[test]
+    fn test_pick_node_missing_entry() {
+        let table = NodeBasedRoutingTable::new(Some("az-1".to_string()));
+        let pool = IngesterPool::default();
+
         assert!(
             table
-                .table
-                .get(&key)
-                .unwrap()
-                .pick_node(&pool, &HashSet::new())
+                .pick_node("nonexistent", "source", &pool, &HashSet::new())
                 .is_none()
         );
     }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 5a473a1adb1..58426d9466d 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -40,7 +40,7 @@ use tokio::sync::{Mutex, Semaphore};
 use tokio::time::error::Elapsed;
 use tracing::{error, info};
 
-use super::broadcast::IngesterCapacityScoreUpdate;
+use super::broadcast::{IngesterCapacityScoreUpdate, IngesterNodeInfo};
 use super::debouncing::{
     DebouncedGetOrCreateOpenShardsRequest, GetOrCreateOpenShardsRequestDebouncer,
 };
@@ -120,10 +120,11 @@ impl IngestRouter {
         ingester_pool: IngesterPool,
         replication_factor: usize,
         event_broker: EventBroker,
+        self_availability_zone: Option<String>,
     ) -> Self {
         let state = Arc::new(Mutex::new(RouterState {
             debouncer: GetOrCreateOpenShardsRequestDebouncer::default(),
-            node_routing_table: NodeBasedRoutingTable::default(),
+            node_routing_table: NodeBasedRoutingTable::new(self_availability_zone),
         }));
         let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize;
         let ingest_semaphore = Arc::new(Semaphore::new(ingest_semaphore_permits));
@@ -142,7 +143,10 @@
     pub fn subscribe(&self) {
         let weak_router_state = WeakRouterState(Arc::downgrade(&self.state));
         self.event_broker
-            .subscribe::<IngesterCapacityScoreUpdate>(weak_router_state)
+            .subscribe::<IngesterCapacityScoreUpdate>(weak_router_state.clone())
+            .forever();
+        self.event_broker
+            .subscribe::<IngesterNodeInfo>(weak_router_state)
             .forever();
     }
 
@@ -352,10 +356,12 @@
         let state_guard = self.state.lock().await;
 
         for subrequest in pending_subrequests(&workbench.subworkbenches) {
-            let ingester_node = state_guard
-                .node_routing_table
-                .find_entry(&subrequest.index_id, &subrequest.source_id)
-                .and_then(|entry| entry.pick_node(&self.ingester_pool, unavailable_leaders));
+            let ingester_node = state_guard.node_routing_table.pick_node(
+                &subrequest.index_id,
+                &subrequest.source_id,
+                &self.ingester_pool,
+                unavailable_leaders,
+            );
 
             let ingester_node = match ingester_node {
                 Some(node) => node,
@@ -612,6 +618,31 @@ impl EventSubscriber<IngesterCapacityScoreUpdate> for WeakRouterState {
     }
 }
 
+#[async_trait]
+impl EventSubscriber<IngesterNodeInfo> for WeakRouterState {
+    async fn handle_event(&mut self, info: IngesterNodeInfo) {
+        let Some(state) = self.0.upgrade() else {
+            return;
+        };
+        let mut state_guard = state.lock().await;
+        match info {
+            IngesterNodeInfo::Add {
+                node_id,
+                availability_zone,
+            } => {
+                state_guard
+                    .node_routing_table
+                    .update_node_availability_zone(node_id, availability_zone);
+            }
+            IngesterNodeInfo::Remove { node_id } => {
+                state_guard
+                    .node_routing_table
+                    .remove_node_availability_zone(&node_id);
+            }
+        }
+    }
+}
+
 pub(super) struct PersistRequestSummary {
     pub leader_id: NodeId,
     pub subrequest_ids: Vec<SubrequestId>,
@@ -649,6 +680,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let mut workbench = IngestWorkbench::default();
         let (get_or_create_open_shard_request_opt, rendezvous) = router
@@ -859,6 +891,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let ingest_subrequests = vec![
             IngestSubrequest {
@@ -957,6 +990,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let ingest_subrequests = vec![IngestSubrequest {
             subrequest_id: 0,
@@ -1016,6 +1050,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let ingest_subrequests = vec![IngestSubrequest {
             subrequest_id: 0,
@@ -1046,6 +1081,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let ingest_subrequests = vec![IngestSubrequest {
             subrequest_id: 0,
@@ -1103,6 +1139,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let ingest_subrequests = vec![IngestSubrequest {
             subrequest_id: 0,
@@ -1163,6 +1200,7 @@
             ingester_pool.clone(),
             replication_factor,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
         let ingest_subrequests = vec![
             IngestSubrequest {
@@ -1246,6 +1284,7 @@
             ingester_pool.clone(),
             1,
             EventBroker::default(),
+            Some("test-az".to_string()),
         );
 
         let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0);
@@ -1388,6
+1427,7 @@ mod tests { ingester_pool.clone(), 1, EventBroker::default(), + Some("test-az".to_string()), ); let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); { @@ -1491,6 +1531,7 @@ mod tests { ingester_pool.clone(), replication_factor, EventBroker::default(), + Some("test-az".to_string()), ); let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0); let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); @@ -1547,6 +1588,7 @@ mod tests { ingester_pool.clone(), replication_factor, EventBroker::default(), + Some("test-az".to_string()), ); let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); { @@ -1624,12 +1666,14 @@ mod tests { #[tokio::test] async fn test_router_updates_node_routing_table_on_capacity_update() { let event_broker = EventBroker::default(); + let ingester_pool = IngesterPool::default(); let router = IngestRouter::new( "test-router".into(), ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), - IngesterPool::default(), + ingester_pool.clone(), 1, event_broker.clone(), + Some("test-az".to_string()), ); router.subscribe(); @@ -1645,14 +1689,73 @@ mod tests { // Give the async subscriber a moment to process. tokio::time::sleep(Duration::from_millis(10)).await; + ingester_pool.insert("test-ingester-0".into(), IngesterServiceClient::mocked()); let state_guard = router.state.lock().await; - let entry = state_guard + let node = state_guard .node_routing_table - .find_entry("test-index", "test-source") + .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) .unwrap(); - let node = entry.nodes.get("test-ingester-0").unwrap(); - assert_eq!(node.capacity_score, 7); - assert_eq!(node.open_shard_count, 3); + assert_eq!(node.node_id, NodeId::from("test-ingester-0")); + } + + #[tokio::test] + async fn test_router_ingester_node_info_add_and_remove() { + let event_broker = EventBroker::default(); + let ingester_pool = IngesterPool::default(); + let router = IngestRouter::new( + "test-router".into(), + ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), + ingester_pool.clone(), + 1, + event_broker.clone(), + Some("test-az".to_string()), + ); + router.subscribe(); + + for node_id in ["test-ingester-0", "test-ingester-1"] { + event_broker.publish(IngesterCapacityScoreUpdate { + node_id: node_id.into(), + source_uid: SourceUid { + index_uid: IndexUid::for_test("test-index", 0), + source_id: "test-source".to_string(), + }, + capacity_score: 5, + open_shard_count: 1, + }); + ingester_pool.insert(node_id.into(), IngesterServiceClient::mocked()); + } + + // ingester-0 is in our AZ. + event_broker.publish(IngesterNodeInfo::Add { + node_id: "test-ingester-0".into(), + availability_zone: "test-az".to_string(), + }); + tokio::time::sleep(Duration::from_millis(10)).await; + + let state_guard = router.state.lock().await; + let node = state_guard + .node_routing_table + .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) + .unwrap(); + assert_eq!(node.node_id, NodeId::from("test-ingester-0")); + drop(state_guard); + + // Remove ingester-0's AZ, add ingester-1 as local instead. 
+ event_broker.publish(IngesterNodeInfo::Remove { + node_id: "test-ingester-0".into(), + }); + event_broker.publish(IngesterNodeInfo::Add { + node_id: "test-ingester-1".into(), + availability_zone: "test-az".to_string(), + }); + tokio::time::sleep(Duration::from_millis(10)).await; + + let state_guard = router.state.lock().await; + let node = state_guard + .node_routing_table + .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) + .unwrap(); + assert_eq!(node.node_id, NodeId::from("test-ingester-1")); } #[tokio::test] @@ -1663,6 +1766,7 @@ mod tests { IngesterPool::default(), 1, EventBroker::default(), + Some("test-az".to_string()), ); let ingest_subrequests = vec![ IngestSubrequest { @@ -1749,12 +1853,14 @@ mod tests { #[tokio::test] async fn test_router_process_persist_results_applies_piggybacked_routing_updates() { + let ingester_pool = IngesterPool::default(); let router = IngestRouter::new( "test-router".into(), ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), - IngesterPool::default(), + ingester_pool.clone(), 1, EventBroker::default(), + Some("test-az".to_string()), ); let ingest_subrequests = vec![IngestSubrequest { subrequest_id: 0, @@ -1790,13 +1896,12 @@ mod tests { .process_persist_results(&mut workbench, persist_futures) .await; + ingester_pool.insert("test-ingester-0".into(), IngesterServiceClient::mocked()); let state_guard = router.state.lock().await; - let entry = state_guard + let node = state_guard .node_routing_table - .find_entry("test-index", "test-source") + .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) .unwrap(); - let node = entry.nodes.get("test-ingester-0").unwrap(); - assert_eq!(node.capacity_score, 3); - assert_eq!(node.open_shard_count, 2); + assert_eq!(node.node_id, NodeId::from("test-ingester-0")); } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 60515bc819f..1b18bb343eb 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -81,10 +81,10 @@ use quickwit_indexing::actors::IndexingService; use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ - GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, - LocalShardsUpdate, get_idle_shard_timeout, setup_ingester_capacity_update_listener, - setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission, - wait_for_ingester_status, + GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, + IngesterNodeInfo, IngesterPool, LocalShardsUpdate, get_idle_shard_timeout, + setup_ingester_capacity_update_listener, setup_local_shards_update_listener, + start_ingest_api_service, wait_for_ingester_decommission, wait_for_ingester_status, }; use quickwit_jaeger::JaegerService; use quickwit_janitor::{JanitorService, start_janitor_service}; @@ -905,6 +905,7 @@ async fn setup_ingest_v2( ingester_pool.clone(), replication_factor, event_broker.clone(), + node_config.availability_zone.clone(), ); ingest_router.subscribe(); setup_ingester_capacity_update_listener(cluster.clone(), event_broker.clone()) @@ -954,9 +955,11 @@ async fn setup_ingest_v2( }; // Setup ingester pool change stream. 
let ingester_opt_clone = ingester_opt.clone(); + let event_broker_clone = event_broker.clone(); let max_message_size = node_config.grpc_config.max_message_size; let ingester_change_stream = cluster.change_stream().filter_map(move |cluster_change| { let ingester_opt_clone_clone = ingester_opt_clone.clone(); + let event_broker_clone_clone = event_broker_clone.clone(); Box::pin(async move { match cluster_change { ClusterChange::Add(node) if node.is_indexer() => { @@ -969,6 +972,13 @@ async fn setup_ingest_v2( ); let node_id: NodeId = node.node_id().into(); + if let Some(availability_zone) = node.availability_zone() { + event_broker_clone_clone.publish(IngesterNodeInfo::Add { + node_id: node_id.clone(), + availability_zone: availability_zone.to_string(), + }); + } + if node.is_self_node() { // Here, since the service is available locally, we bypass the network stack // and use the instance directly. However, we still want client-side @@ -1002,7 +1012,11 @@ async fn setup_ingest_v2( "removing node `{}` from ingester pool", chitchat_id.node_id, ); - Some(Change::Remove(node.node_id().into())) + let node_id: NodeId = node.node_id().into(); + event_broker_clone_clone.publish(IngesterNodeInfo::Remove { + node_id: node_id.clone(), + }); + Some(Change::Remove(node_id)) } _ => None, }
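
Note (not part of the patch): a minimal, self-contained sketch of the same-AZ-first selection strategy the routing change above applies. The `Node` struct, the `pick_az_aware` function, and the max-by-capacity tie-break are illustrative stand-ins only; the patch itself partitions eligible ingesters by availability zone and then applies power-of-two-choices within the preferred group.

    use std::collections::HashMap;

    #[derive(Debug)]
    struct Node {
        node_id: String,
        capacity_score: u32,
    }

    /// Prefer nodes in the caller's availability zone; fall back to any other
    /// eligible node. Ties are broken by raw capacity here for brevity; the
    /// patch uses power-of-two-choices instead.
    fn pick_az_aware<'a>(
        nodes: &'a [Node],
        self_az: Option<&str>,
        node_az: &HashMap<String, String>,
    ) -> Option<&'a Node> {
        // Split eligible nodes into same-AZ ("local") and other ("remote") groups.
        let (local, remote): (Vec<&Node>, Vec<&Node>) = nodes
            .iter()
            .filter(|node| node.capacity_score > 0)
            .partition(|node| self_az == node_az.get(&node.node_id).map(String::as_str));
        // Pick from the local group first, falling back to remote nodes.
        local
            .into_iter()
            .max_by_key(|node| node.capacity_score)
            .or_else(|| remote.into_iter().max_by_key(|node| node.capacity_score))
    }

    fn main() {
        let nodes = vec![
            Node { node_id: "node-1".into(), capacity_score: 3 },
            Node { node_id: "node-2".into(), capacity_score: 9 },
        ];
        let mut node_az = HashMap::new();
        node_az.insert("node-1".to_string(), "az-1".to_string());
        node_az.insert("node-2".to_string(), "az-2".to_string());
        // node-1 wins despite lower capacity because it shares az-1 with the caller.
        let picked = pick_az_aware(&nodes, Some("az-1"), &node_az).unwrap();
        assert_eq!(picked.node_id, "node-1");
        println!("picked {}", picked.node_id);
    }

Falling back to remote nodes when no same-zone ingester is eligible keeps ingest available at the cost of cross-AZ traffic, which is the trade-off the patch makes as well.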