From 17535cc8f29752ab8305ff08ed921d9e01df1dcf Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Mon, 26 Jan 2026 19:03:22 +0100 Subject: [PATCH 1/4] grant search permits with a Shortest Remaining Time First policy --- .../src/search_permit_provider.rs | 163 ++++++++++++++++-- 1 file changed, 150 insertions(+), 13 deletions(-) diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 2ceb8ec5d59..fc7ded9f44d 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::VecDeque; +use std::collections::BinaryHeap; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -87,7 +87,7 @@ impl SearchPermitProvider { msg_sender: message_sender.downgrade(), num_warmup_slots_available: num_download_slots, total_memory_budget: memory_budget.as_u64(), - permits_requests: VecDeque::new(), + permits_requests: BinaryHeap::new(), total_memory_allocated: 0u64, #[cfg(test)] stopped: state_sender, @@ -134,11 +134,72 @@ struct SearchPermitActor { /// When it happens, new permits will not be assigned until the memory is freed. total_memory_budget: u64, total_memory_allocated: u64, - permits_requests: VecDeque<(oneshot::Sender, u64)>, + permits_requests: BinaryHeap, #[cfg(test)] stopped: watch::Sender, } +struct LeafPermitRequest { + single_split_permit_requests: Vec<(oneshot::Sender, u64)>, +} + +impl Ord for LeafPermitRequest { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // we compare other with self and not the other way arround because we want a min-heap and + // Rust's is a max-heap + other + .single_split_permit_requests + .len() + .cmp(&self.single_split_permit_requests.len()) + } +} + +impl PartialOrd for LeafPermitRequest { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for LeafPermitRequest { + fn eq(&self, other: &Self) -> bool { + self.cmp(other).is_eq() + } +} + +impl Eq for LeafPermitRequest {} + +impl LeafPermitRequest { + fn from_estimated_costs(permit_sizes: Vec) -> (Self, Vec) { + let mut permits = Vec::with_capacity(permit_sizes.len()); + let mut single_split_permit_requests = Vec::with_capacity(permit_sizes.len()); + for permit_size in permit_sizes { + let (tx, rx) = oneshot::channel(); + single_split_permit_requests.push((tx, permit_size)); + permits.push(SearchPermitFuture(rx)); + } + // we do this so we can pop instead of using a vecdeque or continually call remove(0) + single_split_permit_requests.reverse(); + ( + LeafPermitRequest { + single_split_permit_requests, + }, + permits, + ) + } + + fn pop_if_smaller_than( + &mut self, + max_size: u64, + ) -> Option<(oneshot::Sender, u64)> { + self.single_split_permit_requests + .pop_if(|(_, permit_size)| *permit_size <= max_size) + } + + fn is_empty(&self) -> bool { + self.single_split_permit_requests.is_empty() + } +} + impl SearchPermitActor { async fn run(mut self) { // Stops when the last clone of SearchPermitProvider is dropped. @@ -155,12 +216,9 @@ impl SearchPermitActor { permit_sizes, permit_sender, } => { - let mut permits = Vec::with_capacity(permit_sizes.len()); - for permit_size in permit_sizes { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back((tx, permit_size)); - permits.push(SearchPermitFuture(rx)); - } + let (leaf_permit_request, permits) = + LeafPermitRequest::from_estimated_costs(permit_sizes); + self.permits_requests.push(leaf_permit_request); self.assign_available_permits(); // The receiver could be dropped in the (unlikely) situation // where the future requesting these permits is cancelled before @@ -196,13 +254,23 @@ impl SearchPermitActor { } fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender, u64)> { - if self.num_warmup_slots_available == 0 { + if self.num_warmup_slots_available == 0 + || self.total_memory_budget <= self.total_memory_allocated + { return None; } - if let Some((_, next_permit_size)) = self.permits_requests.front() - && self.total_memory_allocated + next_permit_size <= self.total_memory_budget + let mut peeked = self.permits_requests.peek_mut()?; + + if let Some(permit_request) = + peeked.pop_if_smaller_than(self.total_memory_budget - self.total_memory_allocated) { - return self.permits_requests.pop_front(); + if peeked.is_empty() { + drop(peeked); + // our modification can only have made our peeked element "higher priority", so the + // element we'll pop must be the one we just put back + self.permits_requests.pop(); + } + return Some(permit_request); } None } @@ -362,6 +430,75 @@ mod tests { } } + #[tokio::test] + async fn test_search_permit_order_with_concurrent_search() { + let permit_provider = SearchPermitProvider::new(4, ByteSize::mb(100)); + let mut all_futures = Vec::new(); + let first_batch_of_permits = permit_provider + .get_permits(repeat_n(ByteSize::mb(10), 8)) + .await; + assert_eq!(first_batch_of_permits.len(), 8); + all_futures.extend( + first_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((1, i), fut)), + ); + + let second_batch_of_permits = permit_provider + .get_permits(repeat_n(ByteSize::mb(10), 2)) + .await; + all_futures.extend( + second_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((2, i), fut)), + ); + + let third_batch_of_permits = permit_provider + .get_permits(repeat_n(ByteSize::mb(10), 6)) + .await; + all_futures.extend( + third_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((3, i), fut)), + ); + + // not super useful, considering what join set does, but still a tiny bit more sound. + all_futures.shuffle(&mut rand::rng()); + + let mut join_set = JoinSet::new(); + for (res, fut) in all_futures { + join_set.spawn(async move { + let permit = fut.await; + (res, permit) + }); + } + let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); + while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { + ordered_result.push((batch_id, order)); + } + + let mut counters = [0; 4]; + let expected_result: Vec<(usize, usize)> = [ + 1, 1, 1, 1, // initial 4 permits + 2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, + ] + .into_iter() + .map(|batch_id| { + let order = counters[batch_id]; + counters[batch_id] += 1; + (batch_id, order) + }) + .collect(); + + // for the first 4 permits, the order is not well defined as they are all granted at once, + // and we poll futures in a random order. We sort them to fix that artifact + ordered_result[..4].sort(); + assert_eq!(ordered_result, expected_result); + } + #[tokio::test] async fn test_search_permit_early_drops() { let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100)); From ac0c2ab1534313fc3e00f769168ed2e9502e5729 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Fri, 30 Jan 2026 11:48:46 +0100 Subject: [PATCH 2/4] use PeekMut::pop --- quickwit/quickwit-search/src/search_permit_provider.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index fc7ded9f44d..1775252b2b3 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BinaryHeap; +use std::collections::binary_heap::PeekMut; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -265,10 +266,7 @@ impl SearchPermitActor { peeked.pop_if_smaller_than(self.total_memory_budget - self.total_memory_allocated) { if peeked.is_empty() { - drop(peeked); - // our modification can only have made our peeked element "higher priority", so the - // element we'll pop must be the one we just put back - self.permits_requests.pop(); + PeekMut::pop(peeked); } return Some(permit_request); } From 6d0e4c43755ba5671d86d1bb92c0e8234a775225 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Tue, 10 Feb 2026 12:40:48 +0100 Subject: [PATCH 3/4] address rc --- .../src/search_permit_provider.rs | 70 ++++++++++++------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 1775252b2b3..e785f5d92a5 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -140,8 +140,14 @@ struct SearchPermitActor { stopped: watch::Sender, } +struct SingleSplitPermitRequest { + permit_sender: oneshot::Sender, + permit_size: u64, +} + struct LeafPermitRequest { - single_split_permit_requests: Vec<(oneshot::Sender, u64)>, + /// Single split permit requests for this leaf search. + single_split_permit_requests: std::vec::IntoIter, } impl Ord for LeafPermitRequest { @@ -150,8 +156,9 @@ impl Ord for LeafPermitRequest { // Rust's is a max-heap other .single_split_permit_requests + .as_slice() .len() - .cmp(&self.single_split_permit_requests.len()) + .cmp(&self.single_split_permit_requests.as_slice().len()) } } @@ -175,29 +182,39 @@ impl LeafPermitRequest { let mut single_split_permit_requests = Vec::with_capacity(permit_sizes.len()); for permit_size in permit_sizes { let (tx, rx) = oneshot::channel(); - single_split_permit_requests.push((tx, permit_size)); + // we keep our internal list of permits and the returned wait handles in the + // same order to make sure we emit each permit in the right order. Doing otherwise + // may cause deadlocks + single_split_permit_requests.push(SingleSplitPermitRequest { + permit_sender: tx, + permit_size, + }); permits.push(SearchPermitFuture(rx)); } - // we do this so we can pop instead of using a vecdeque or continually call remove(0) - single_split_permit_requests.reverse(); ( LeafPermitRequest { - single_split_permit_requests, + single_split_permit_requests: single_split_permit_requests.into_iter(), }, permits, ) } - fn pop_if_smaller_than( - &mut self, - max_size: u64, - ) -> Option<(oneshot::Sender, u64)> { - self.single_split_permit_requests - .pop_if(|(_, permit_size)| *permit_size <= max_size) + fn pop_if_smaller_than(&mut self, max_size: u64) -> Option { + // IntoIter::as_slice() allows us to peek at the next element without consuming it + if self + .single_split_permit_requests + .as_slice() + .first() + .is_some_and(|request| request.permit_size <= max_size) + { + self.single_split_permit_requests.next() + } else { + None + } } fn is_empty(&self) -> bool { - self.single_split_permit_requests.is_empty() + self.single_split_permit_requests.as_slice().is_empty() } } @@ -254,17 +271,19 @@ impl SearchPermitActor { } } - fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender, u64)> { - if self.num_warmup_slots_available == 0 - || self.total_memory_budget <= self.total_memory_allocated - { + fn pop_next_request_if_serviceable(&mut self) -> Option { + if self.num_warmup_slots_available == 0 { return None; } + let Some(available_memory) = self + .total_memory_budget + .checked_sub(self.total_memory_allocated) + else { + return None; + }; let mut peeked = self.permits_requests.peek_mut()?; - if let Some(permit_request) = - peeked.pop_if_smaller_than(self.total_memory_budget - self.total_memory_allocated) - { + if let Some(permit_request) = peeked.pop_if_smaller_than(available_memory) { if peeked.is_empty() { PeekMut::pop(peeked); } @@ -274,20 +293,19 @@ impl SearchPermitActor { } fn assign_available_permits(&mut self) { - while let Some((permit_requester_tx, next_permit_size)) = - self.pop_next_request_if_serviceable() - { + while let Some(permit_request) = self.pop_next_request_if_serviceable() { let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - self.total_memory_allocated += next_permit_size; + self.total_memory_allocated += permit_request.permit_size; self.num_warmup_slots_available -= 1; - permit_requester_tx + permit_request + .permit_sender .send(SearchPermit { _ongoing_gauge_guard: ongoing_gauge_guard, msg_sender: self.msg_sender.clone(), - memory_allocation: next_permit_size, + memory_allocation: permit_request.permit_size, warmup_slot_freed: false, }) // if the requester dropped its receiver, we drop the newly From 5279ef24676d35cb271ee50da76ba3936af904ac Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Tue, 10 Feb 2026 13:51:37 +0100 Subject: [PATCH 4/4] clippy --- .../src/search_permit_provider.rs | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index e785f5d92a5..3a4c0602326 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -201,15 +201,11 @@ impl LeafPermitRequest { fn pop_if_smaller_than(&mut self, max_size: u64) -> Option { // IntoIter::as_slice() allows us to peek at the next element without consuming it - if self - .single_split_permit_requests - .as_slice() - .first() - .is_some_and(|request| request.permit_size <= max_size) - { - self.single_split_permit_requests.next() - } else { - None + match self.single_split_permit_requests.as_slice().first() { + Some(request) if request.permit_size <= max_size => { + self.single_split_permit_requests.next() + } + _ => None, } } @@ -275,12 +271,9 @@ impl SearchPermitActor { if self.num_warmup_slots_available == 0 { return None; } - let Some(available_memory) = self + let available_memory = self .total_memory_budget - .checked_sub(self.total_memory_allocated) - else { - return None; - }; + .checked_sub(self.total_memory_allocated)?; let mut peeked = self.permits_requests.peek_mut()?; if let Some(permit_request) = peeked.pop_if_smaller_than(available_memory) {