Skip to content

Commit 00a5f3e

Browse files
authored
Propagate cancellation within leaf search (#6002)
1 parent 977ed84 commit 00a5f3e

File tree

2 files changed

+35
-42
lines changed

2 files changed

+35
-42
lines changed

quickwit/quickwit-search/src/leaf.rs

Lines changed: 31 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use tantivy::directory::FileSlice;
4444
use tantivy::fastfield::FastFieldReaders;
4545
use tantivy::schema::Field;
4646
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
47-
use tokio::task::JoinError;
47+
use tokio::task::{JoinError, JoinSet};
4848
use tracing::*;
4949

5050
use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
@@ -1232,8 +1232,7 @@ pub async fn multi_index_leaf_search(
12321232
//
12331233
// It is a little bit tricky how to handle which is now the incremental_merge_collector, one
12341234
// per index, e.g. when to merge results and how to avoid lock contention.
1235-
let mut leaf_request_tasks = Vec::new();
1236-
1235+
let mut leaf_request_futures = JoinSet::new();
12371236
for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() {
12381237
let index_uri = quickwit_common::uri::Uri::from_str(
12391238
leaf_search_request
@@ -1256,7 +1255,7 @@ pub async fn multi_index_leaf_search(
12561255
})?
12571256
.clone();
12581257

1259-
let leaf_request_future = tokio::spawn({
1258+
leaf_request_futures.spawn({
12601259
let storage_resolver = storage_resolver.clone();
12611260
let searcher_context = searcher_context.clone();
12621261
let search_request = search_request.clone();
@@ -1271,24 +1270,20 @@ pub async fn multi_index_leaf_search(
12711270
doc_mapper,
12721271
aggregation_limits,
12731272
)
1273+
.in_current_span()
12741274
.await
12751275
}
1276-
.in_current_span()
12771276
});
1278-
leaf_request_tasks.push(leaf_request_future);
12791277
}
12801278

1281-
let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
1282-
searcher_context.searcher_config.request_timeout(),
1283-
try_join_all(leaf_request_tasks),
1284-
)
1285-
.await??;
12861279
let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
12871280
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
1288-
for result in leaf_responses {
1289-
match result {
1290-
Ok(result) => {
1291-
incremental_merge_collector.add_result(result)?;
1281+
while let Some(leaf_response_join_result) = leaf_request_futures.join_next().await {
1282+
// abort the search on join errors
1283+
let leaf_response_result = leaf_response_join_result?;
1284+
match leaf_response_result {
1285+
Ok(leaf_response) => {
1286+
incremental_merge_collector.add_result(leaf_response)?;
12921287
}
12931288
Err(err) => {
12941289
incremental_merge_collector.add_failed_split(SplitSearchError {
@@ -1379,9 +1374,6 @@ pub async fn single_doc_mapping_leaf_search(
13791374

13801375
let split_filter = Arc::new(RwLock::new(split_filter));
13811376

1382-
let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> =
1383-
Vec::with_capacity(split_with_req.len());
1384-
13851377
let merge_collector = make_merge_collector(&request, aggregations_limits.clone())?;
13861378
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
13871379
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));
@@ -1409,6 +1401,8 @@ pub async fn single_doc_mapping_leaf_search(
14091401
split_filter: split_filter.clone(),
14101402
});
14111403

1404+
let mut split_search_futures = JoinSet::new();
1405+
let mut task_id_to_split_id_map = HashMap::with_capacity(split_with_req.len());
14121406
for ((split, search_request), permit_fut) in
14131407
split_with_req.into_iter().zip(permit_futures.into_iter())
14141408
{
@@ -1424,41 +1418,39 @@ pub async fn single_doc_mapping_leaf_search(
14241418
leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup);
14251419
continue;
14261420
};
1427-
1428-
leaf_search_single_split_join_handles.push((
1429-
split.split_id.clone(),
1430-
tokio::spawn(
1431-
leaf_search_single_split_wrapper(
1432-
simplified_search_request,
1433-
leaf_search_context.clone(),
1434-
index_storage.clone(),
1435-
split,
1436-
leaf_split_search_permit,
1437-
aggregations_limits.clone(),
1438-
)
1439-
.in_current_span(),
1440-
),
1441-
));
1421+
let split_id = split.split_id.clone();
1422+
let handle = split_search_futures.spawn(
1423+
leaf_search_single_split_wrapper(
1424+
simplified_search_request,
1425+
leaf_search_context.clone(),
1426+
index_storage.clone(),
1427+
split,
1428+
leaf_split_search_permit,
1429+
aggregations_limits.clone(),
1430+
)
1431+
.in_current_span(),
1432+
);
1433+
task_id_to_split_id_map.insert(handle.id(), split_id);
14421434
}
14431435

14441436
// TODO we could cancel running splits when !run_all_splits and the running split can no
14451437
// longer give better results after some other split answered.
14461438
let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new();
14471439

1448-
// There is no need to use `join_all`, as these are spawned tasks.
1449-
for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles {
1440+
while let Some(leaf_search_join_result) = split_search_futures.join_next().await {
14501441
// splits that did not panic were already added to the collector
1451-
if let Err(join_error) = leaf_search_join_handle.await {
1442+
if let Err(join_error) = leaf_search_join_result {
14521443
if join_error.is_cancelled() {
14531444
// An explicit task cancellation is not an error.
14541445
continue;
14551446
}
1447+
let split_id = task_id_to_split_id_map.get(&join_error.id()).unwrap();
14561448
if join_error.is_panic() {
1457-
error!(split=%split, "leaf search task panicked");
1449+
error!(split=%split_id, "leaf search task panicked");
14581450
} else {
1459-
error!(split=%split, "please report: leaf search was not cancelled, and could not extract panic. this should never happen");
1451+
error!(split=%split_id, "please report: leaf search was not cancelled, and could not extract panic. this should never happen");
14601452
}
1461-
split_search_join_errors.push((split, join_error));
1453+
split_search_join_errors.push((split_id.clone(), join_error));
14621454
}
14631455
}
14641456

quickwit/quickwit-search/src/service.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl SearchService for SearchServiceImpl {
186186
.map(|req| req.split_offsets.len())
187187
.sum::<usize>();
188188

189-
LeafSearchMetricsFuture {
189+
let tracked_future = LeafSearchMetricsFuture {
190190
tracked: multi_index_leaf_search(
191191
self.searcher_context.clone(),
192192
leaf_search_request,
@@ -195,8 +195,9 @@ impl SearchService for SearchServiceImpl {
195195
start: Instant::now(),
196196
targeted_splits: num_splits,
197197
status: None,
198-
}
199-
.await
198+
};
199+
let timeout = self.searcher_context.searcher_config.request_timeout();
200+
tokio::time::timeout(timeout, tracked_future).await?
200201
}
201202

202203
async fn fetch_docs(

0 commit comments

Comments (0)