Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 160 additions & 5 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ use quickwit_common::{Progress, shared_consts};
use quickwit_config::service::QuickwitService;
use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig};
use quickwit_ingest::{IngesterPool, LocalShardsUpdate};
use quickwit_proto::ingest::ingester::{
IngesterService, IngesterServiceStream, IngesterStatus, ObservationMessage,
OpenObservationStreamRequest,
};
use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadataResponseExt};
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult,
Expand All @@ -54,7 +58,7 @@ use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, Source
use serde::Serialize;
use serde_json::{Value as JsonValue, json};
use tokio::sync::watch;
use tracing::{Level, debug, enabled, error, info};
use tracing::{Level, debug, enabled, error, info, warn};

use crate::IndexerPool;
use crate::cooldown_map::{CooldownMap, CooldownStatus};
Expand Down Expand Up @@ -1053,9 +1057,16 @@ impl Handler<IndexerJoined> for ControlPlane {
message: IndexerJoined,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let node_id: NodeId = message.0.node_id().into();
info!(
"indexer `{}` joined the cluster: rebalancing shards and rebuilding indexing plan",
message.0.node_id()
"indexer `{node_id}` joined the cluster: rebalancing shards and rebuilding indexing \
plan"
);
// Subscribe to the ingester observation stream so we can track decommissioning state.
spawn_observe_ingester_task(
node_id,
self.ingest_controller.ingester_pool().clone(),
ctx.mailbox().downgrade(),
);
// TODO: Update shard table.
if let Err(metastore_error) = self
Expand Down Expand Up @@ -1083,10 +1094,12 @@ impl Handler<IndexerLeft> for ControlPlane {
message: IndexerLeft,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let node_id: NodeId = message.0.node_id().into();
info!(
"indexer `{}` left the cluster: rebalancing shards and rebuilding indexing plan",
message.0.node_id()
"indexer `{node_id}` left the cluster: rebalancing shards and rebuilding indexing plan"
);
// Clear any decommissioning state tracked for this node's ingester.
self.model.remove_decommissioning_ingester(&node_id);
// TODO: Update shard table.
if let Err(metastore_error) = self
.ingest_controller
Expand Down Expand Up @@ -1164,6 +1177,98 @@ async fn watcher_indexers(
}
}

fn spawn_observe_ingester_task(
node_id: NodeId,
ingester_pool: IngesterPool,
weak_mailbox: WeakMailbox<ControlPlane>,
) {
tokio::spawn(observe_ingester(node_id, ingester_pool, weak_mailbox));
}

async fn observe_ingester(
node_id: NodeId,
ingester_pool: IngesterPool,
weak_mailbox: WeakMailbox<ControlPlane>,
) {
let Some(ingester) = ingester_pool.get(&node_id) else {
warn!(ingester_id=%node_id, "ingester not found in pool, skipping observation stream");
return;
};
let observation_stream = match ingester
.open_observation_stream(OpenObservationStreamRequest {})
.await
{
Ok(stream) => stream,
Err(error) => {
warn!(ingester_id=%node_id, %error, "failed to open observation stream from ingester");
return;
}
};
watch_ingester_observation_stream(node_id, observation_stream, weak_mailbox).await;
}

async fn watch_ingester_observation_stream(
node_id: NodeId,
mut observation_stream: IngesterServiceStream<ObservationMessage>,
weak_mailbox: WeakMailbox<ControlPlane>,
) {
while let Some(observation_message_result) = observation_stream.next().await {
let Some(mailbox) = weak_mailbox.upgrade() else {
return;
};
let status = match observation_message_result {
Ok(observation_message) => observation_message.status(),
Err(error) => {
warn!(ingester_id=%node_id, %error, "observation stream error from ingester");
return;
}
};
if let Err(error) = mailbox
.send_message(IngesterObservation {
node_id: node_id.clone(),
status,
})
.await
{
error!(ingester_id=%node_id, %error, "failed to forward `IngesterObservation` to control plane");
}
// `Decommissioned` and `Failed` are terminal states: no further status changes
// will be emitted. Close our end of the stream so the ingester can shut down cleanly.
match status {
IngesterStatus::Decommissioned | IngesterStatus::Failed => return,
_ => {}
}
}
}

/// Status update received from an ingester's observation stream.
#[derive(Debug)]
struct IngesterObservation {
node_id: NodeId,
status: IngesterStatus,
}

#[async_trait]
impl Handler<IngesterObservation> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
message: IngesterObservation,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
match message.status {
IngesterStatus::Decommissioning | IngesterStatus::Decommissioned => {
self.model.add_decommissioning_ingester(message.node_id);
}
_ => {
self.model.remove_decommissioning_ingester(&message.node_id);
}
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::num::NonZero;
Expand All @@ -1186,6 +1291,7 @@ mod tests {
ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, CpuCapacity, IndexingServiceClient,
MockIndexingService,
};
use quickwit_common::ServiceStream;
use quickwit_proto::ingest::ingester::{
IngesterServiceClient, InitShardSuccess, InitShardsResponse, MockIngesterService,
RetainShardsResponse,
Expand Down Expand Up @@ -2810,4 +2916,53 @@ mod tests {

universe.assert_quit().await;
}

#[tokio::test]
async fn test_watch_ingester_observation_stream() {
let universe = Universe::with_accelerated_time();
let (control_plane_mailbox, control_plane_inbox) = universe.create_test_mailbox();
let weak_mailbox = control_plane_mailbox.downgrade();

let node_id: NodeId = "test-ingester".into();
let ingester_pool = IngesterPool::default();

let mut mock_ingester = MockIngesterService::new();
mock_ingester
.expect_open_observation_stream()
.return_once(|_| {
let stream = ServiceStream::from(vec![
Ok(ObservationMessage {
node_id: "test-ingester".to_string(),
status: IngesterStatus::Ready as i32,
}),
Ok(ObservationMessage {
node_id: "test-ingester".to_string(),
status: IngesterStatus::Decommissioning as i32,
}),
Ok(ObservationMessage {
node_id: "test-ingester".to_string(),
status: IngesterStatus::Decommissioned as i32,
}),
]);
Ok(stream)
});
let ingester = IngesterServiceClient::from_mock(mock_ingester);
ingester_pool.insert(node_id.clone(), ingester);

spawn_observe_ingester_task(node_id.clone(), ingester_pool, weak_mailbox);

let msg: IngesterObservation = control_plane_inbox.recv_typed_message().await.unwrap();
assert_eq!(msg.node_id, node_id);
assert_eq!(msg.status, IngesterStatus::Ready);

let msg: IngesterObservation = control_plane_inbox.recv_typed_message().await.unwrap();
assert_eq!(msg.node_id, node_id);
assert_eq!(msg.status, IngesterStatus::Decommissioning);

let msg: IngesterObservation = control_plane_inbox.recv_typed_message().await.unwrap();
assert_eq!(msg.node_id, node_id);
assert_eq!(msg.status, IngesterStatus::Decommissioned);

universe.assert_quit().await;
}
}
Loading