diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 2ceb8ec5d59..3a4c0602326 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::VecDeque; +use std::collections::BinaryHeap; +use std::collections::binary_heap::PeekMut; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -87,7 +88,7 @@ impl SearchPermitProvider { msg_sender: message_sender.downgrade(), num_warmup_slots_available: num_download_slots, total_memory_budget: memory_budget.as_u64(), - permits_requests: VecDeque::new(), + permits_requests: BinaryHeap::new(), total_memory_allocated: 0u64, #[cfg(test)] stopped: state_sender, @@ -134,11 +135,85 @@ struct SearchPermitActor { /// When it happens, new permits will not be assigned until the memory is freed. total_memory_budget: u64, total_memory_allocated: u64, - permits_requests: VecDeque<(oneshot::Sender<SearchPermit>, u64)>, + permits_requests: BinaryHeap<LeafPermitRequest>, #[cfg(test)] stopped: watch::Sender<bool>, } +struct SingleSplitPermitRequest { + permit_sender: oneshot::Sender<SearchPermit>, + permit_size: u64, +} + +struct LeafPermitRequest { + /// Single split permit requests for this leaf search. 
+ single_split_permit_requests: std::vec::IntoIter<SingleSplitPermitRequest>, +} + +impl Ord for LeafPermitRequest { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // we compare other with self and not the other way around because we want a min-heap and + // Rust's is a max-heap + other + .single_split_permit_requests + .as_slice() + .len() + .cmp(&self.single_split_permit_requests.as_slice().len()) + } +} + +impl PartialOrd for LeafPermitRequest { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl PartialEq for LeafPermitRequest { + fn eq(&self, other: &Self) -> bool { + self.cmp(other).is_eq() + } +} + +impl Eq for LeafPermitRequest {} + +impl LeafPermitRequest { + fn from_estimated_costs(permit_sizes: Vec<u64>) -> (Self, Vec<SearchPermitFuture>) { + let mut permits = Vec::with_capacity(permit_sizes.len()); + let mut single_split_permit_requests = Vec::with_capacity(permit_sizes.len()); + for permit_size in permit_sizes { + let (tx, rx) = oneshot::channel(); + // we keep our internal list of permits and the returned wait handles in the + // same order to make sure we emit each permit in the right order. Doing otherwise + // may cause deadlocks + single_split_permit_requests.push(SingleSplitPermitRequest { + permit_sender: tx, + permit_size, + }); + permits.push(SearchPermitFuture(rx)); + } + ( + LeafPermitRequest { + single_split_permit_requests: single_split_permit_requests.into_iter(), + }, + permits, + ) + } + + fn pop_if_smaller_than(&mut self, max_size: u64) -> Option<SingleSplitPermitRequest> { + // IntoIter::as_slice() allows us to peek at the next element without consuming it + match self.single_split_permit_requests.as_slice().first() { + Some(request) if request.permit_size <= max_size => { + self.single_split_permit_requests.next() + } + _ => None, + } + } + + fn is_empty(&self) -> bool { + self.single_split_permit_requests.as_slice().is_empty() + } +} + impl SearchPermitActor { async fn run(mut self) { // Stops when the last clone of SearchPermitProvider is dropped. 
@@ -155,12 +230,9 @@ impl SearchPermitActor { permit_sizes, permit_sender, } => { - let mut permits = Vec::with_capacity(permit_sizes.len()); - for permit_size in permit_sizes { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back((tx, permit_size)); - permits.push(SearchPermitFuture(rx)); - } + let (leaf_permit_request, permits) = + LeafPermitRequest::from_estimated_costs(permit_sizes); + self.permits_requests.push(leaf_permit_request); self.assign_available_permits(); // The receiver could be dropped in the (unlikely) situation // where the future requesting these permits is cancelled before @@ -195,33 +267,38 @@ impl SearchPermitActor { } } - fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender<SearchPermit>, u64)> { + fn pop_next_request_if_serviceable(&mut self) -> Option<SingleSplitPermitRequest> { if self.num_warmup_slots_available == 0 { return None; } - if let Some((_, next_permit_size)) = self.permits_requests.front() - && self.total_memory_allocated + next_permit_size <= self.total_memory_budget - { - return self.permits_requests.pop_front(); + let available_memory = self + .total_memory_budget + .checked_sub(self.total_memory_allocated)?; + let mut peeked = self.permits_requests.peek_mut()?; + + if let Some(permit_request) = peeked.pop_if_smaller_than(available_memory) { + if peeked.is_empty() { + PeekMut::pop(peeked); + } + return Some(permit_request); } None } fn assign_available_permits(&mut self) { - while let Some((permit_requester_tx, next_permit_size)) = - self.pop_next_request_if_serviceable() - { + while let Some(permit_request) = self.pop_next_request_if_serviceable() { let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - self.total_memory_allocated += next_permit_size; + self.total_memory_allocated += permit_request.permit_size; self.num_warmup_slots_available -= 1; - permit_requester_tx + permit_request + .permit_sender + .send(SearchPermit { 
_ongoing_gauge_guard: ongoing_gauge_guard, msg_sender: self.msg_sender.clone(), - memory_allocation: next_permit_size, + memory_allocation: permit_request.permit_size, warmup_slot_freed: false, }) // if the requester dropped its receiver, we drop the newly @@ -362,6 +439,75 @@ mod tests { } } + #[tokio::test] + async fn test_search_permit_order_with_concurrent_search() { + let permit_provider = SearchPermitProvider::new(4, ByteSize::mb(100)); + let mut all_futures = Vec::new(); + let first_batch_of_permits = permit_provider + .get_permits(repeat_n(ByteSize::mb(10), 8)) + .await; + assert_eq!(first_batch_of_permits.len(), 8); + all_futures.extend( + first_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((1, i), fut)), + ); + + let second_batch_of_permits = permit_provider + .get_permits(repeat_n(ByteSize::mb(10), 2)) + .await; + all_futures.extend( + second_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((2, i), fut)), + ); + + let third_batch_of_permits = permit_provider + .get_permits(repeat_n(ByteSize::mb(10), 6)) + .await; + all_futures.extend( + third_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((3, i), fut)), + ); + + // not super useful, considering what join set does, but still a tiny bit more sound. 
+ all_futures.shuffle(&mut rand::rng()); + + let mut join_set = JoinSet::new(); + for (res, fut) in all_futures { + join_set.spawn(async move { + let permit = fut.await; + (res, permit) + }); + } + let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); + while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { + ordered_result.push((batch_id, order)); + } + + let mut counters = [0; 4]; + let expected_result: Vec<(usize, usize)> = [ + 1, 1, 1, 1, // initial 4 permits + 2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, + ] + .into_iter() + .map(|batch_id| { + let order = counters[batch_id]; + counters[batch_id] += 1; + (batch_id, order) + }) + .collect(); + + // for the first 4 permits, the order is not well defined as they are all granted at once, + // and we poll futures in a random order. We sort them to fix that artifact + ordered_result[..4].sort(); + assert_eq!(ordered_result, expected_result); + } + #[tokio::test] async fn test_search_permit_early_drops() { let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));