Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,6 @@ mod tests {
index_id: "test-index".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
}],
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
};
let get_open_shards_response = control_plane_mailbox
Expand Down Expand Up @@ -2411,7 +2410,6 @@ mod tests {
index_id: "test-index-foo".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
}],
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
})
.await
Expand Down Expand Up @@ -2626,7 +2624,6 @@ mod tests {
index_id: "test-index".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
}],
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
};
control_plane_mailbox
Expand Down Expand Up @@ -2780,7 +2777,6 @@ mod tests {
index_id: "test-index".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
}],
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
};
control_plane_mailbox
Expand Down
90 changes: 2 additions & 88 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use futures::stream::FuturesUnordered;
use itertools::{Itertools as _, MinMaxResult};
use quickwit_actors::Mailbox;
use quickwit_common::Progress;
use quickwit_common::pretty::PrettySample;
use quickwit_ingest::{IngesterPool, LeaderId, LocalShardsUpdate};
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, AdviseResetShardsResponse, GetOrCreateOpenShardsFailureReason,
Expand All @@ -45,7 +44,7 @@ use quickwit_proto::metastore::{
MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardSubrequest,
OpenShardsRequest, OpenShardsResponse, serde_utils,
};
use quickwit_proto::types::{IndexUid, NodeId, NodeIdRef, Position, ShardId, SourceUid};
use quickwit_proto::types::{NodeId, NodeIdRef, Position, ShardId, SourceUid};
use rand::prelude::IndexedRandom;
use rand::rngs::ThreadRng;
use rand::seq::SliceRandom;
Expand Down Expand Up @@ -363,29 +362,6 @@ impl IngestController {
wait_handle
}

fn handle_closed_shards(&self, closed_shards: Vec<ShardIds>, model: &mut ControlPlaneModel) {
for closed_shard in closed_shards {
let index_uid: IndexUid = closed_shard.index_uid().clone();
let source_id = closed_shard.source_id;

let source_uid = SourceUid {
index_uid,
source_id,
};
let closed_shard_ids = model.close_shards(&source_uid, &closed_shard.shard_ids);

if !closed_shard_ids.is_empty() {
info!(
index_id=%source_uid.index_uid.index_id,
source_id=%source_uid.source_id,
shard_ids=?PrettySample::new(&closed_shard_ids, 5),
"closed {} shards reported by router",
closed_shard_ids.len()
);
}
}
}

pub(crate) async fn handle_local_shards_update(
&mut self,
local_shards_update: LocalShardsUpdate,
Expand Down Expand Up @@ -442,13 +418,6 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<GetOrCreateOpenShardsResponse> {
// Closing shards is an operation performed by ingesters,
// so the control plane is not necessarily aware that they are closed.
//
// Routers can report closed shards so that we can update our
// internal state.
self.handle_closed_shards(get_open_shards_request.closed_shards, model);

let num_subrequests = get_open_shards_request.subrequests.len();
let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests);
let mut get_or_create_open_shards_failures = Vec::new();
Expand Down Expand Up @@ -1324,7 +1293,7 @@ mod tests {
use quickwit_proto::metastore::{
self, MetastoreError, MockMetastoreService, OpenShardSubresponse,
};
use quickwit_proto::types::{DocMappingUid, Position, SourceId};
use quickwit_proto::types::{DocMappingUid, IndexUid, Position, SourceId};

use super::*;

Expand Down Expand Up @@ -1461,7 +1430,6 @@ mod tests {

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
};
let response = controller
Expand Down Expand Up @@ -1494,11 +1462,9 @@ mod tests {
source_id: "source-not-found".to_string(),
},
];
let closed_shards = Vec::new();
let unavailable_leaders = vec!["test-ingester-0".to_string()];
let request = GetOrCreateOpenShardsRequest {
subrequests,
closed_shards,
unavailable_leaders,
};
let response = controller
Expand Down Expand Up @@ -1627,7 +1593,6 @@ mod tests {
}];
let request = GetOrCreateOpenShardsRequest {
subrequests,
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
};

Expand All @@ -1639,57 +1604,6 @@ mod tests {
assert!(!metastore_error.is_transaction_certainly_aborted());
}

#[tokio::test]
async fn test_ingest_controller_get_open_shards_handles_closed_shards() {
let metastore = MetastoreServiceClient::mocked();
let ingester_pool = IngesterPool::default();
let replication_factor = 2;

let mut controller = IngestController::new(
metastore,
ingester_pool,
replication_factor,
TEST_SHARD_THROUGHPUT_LIMIT_MIB,
1.001,
);
let mut model = ControlPlaneModel::default();

let index_uid = IndexUid::for_test("test-index-0", 0);
let source_id: SourceId = "test-source".to_string();

let shards = vec![Shard {
shard_id: Some(ShardId::from(1)),
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
leader_id: "test-ingester-0".to_string(),
shard_state: ShardState::Open as i32,
..Default::default()
}];
model.insert_shards(&index_uid, &source_id, shards);

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: vec![ShardIds {
index_uid: index_uid.clone().into(),
source_id: source_id.clone(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
}],
unavailable_leaders: Vec::new(),
};
let progress = Progress::default();

controller
.get_or_create_open_shards(request, &mut model, &progress)
.await
.unwrap();

let shard_1 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(1))
.unwrap();
assert!(shard_1.is_closed());
}

#[test]
fn test_ingest_controller_allocate_shards() {
let metastore = MetastoreServiceClient::mocked();
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl DebouncedGetOrCreateOpenShardsRequest {
let request = GetOrCreateOpenShardsRequest {
subrequests: self.subrequests,
unavailable_leaders: self.unavailable_leaders,
..Default::default()
};
(Some(request), self.rendezvous)
}
Expand Down
Loading