Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 166 additions & 20 deletions quickwit/quickwit-search/src/search_permit_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::collections::BinaryHeap;
use std::collections::binary_heap::PeekMut;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -87,7 +88,7 @@ impl SearchPermitProvider {
msg_sender: message_sender.downgrade(),
num_warmup_slots_available: num_download_slots,
total_memory_budget: memory_budget.as_u64(),
permits_requests: VecDeque::new(),
permits_requests: BinaryHeap::new(),
total_memory_allocated: 0u64,
#[cfg(test)]
stopped: state_sender,
Expand Down Expand Up @@ -134,11 +135,85 @@ struct SearchPermitActor {
/// When it happens, new permits will not be assigned until the memory is freed.
total_memory_budget: u64,
total_memory_allocated: u64,
permits_requests: VecDeque<(oneshot::Sender<SearchPermit>, u64)>,
permits_requests: BinaryHeap<LeafPermitRequest>,
#[cfg(test)]
stopped: watch::Sender<bool>,
}

/// A pending permit request for the search of a single split, waiting to be
/// granted by the permit actor.
struct SingleSplitPermitRequest {
// Channel on which the granted `SearchPermit` is sent back to the waiting
// `SearchPermitFuture`.
permit_sender: oneshot::Sender<SearchPermit>,
// Estimated memory (in bytes) that this split search will consume while the
// permit is held; counted against the actor's total memory budget.
permit_size: u64,
}

/// All the permit requests issued by one leaf search (one request per split).
///
/// Stored as a `vec::IntoIter` so that already-granted requests are consumed
/// from the front while the remaining ones can still be inspected through
/// `as_slice()` without being consumed.
struct LeafPermitRequest {
/// Single split permit requests for this leaf search.
single_split_permit_requests: std::vec::IntoIter<SingleSplitPermitRequest>,
}

impl Ord for LeafPermitRequest {
    /// Orders leaf requests by the number of split permits still pending.
    ///
    /// `BinaryHeap` is a max-heap, but we want the leaf search with the
    /// fewest remaining splits to be served first (min-heap behavior), so
    /// the natural ordering on the remaining count is reversed.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let self_remaining = self.single_split_permit_requests.as_slice().len();
        let other_remaining = other.single_split_permit_requests.as_slice().len();
        self_remaining.cmp(&other_remaining).reverse()
    }
}

impl PartialOrd for LeafPermitRequest {
// Canonical delegation to the total order defined by `Ord`: always `Some`.
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl PartialEq for LeafPermitRequest {
// Equality is defined through `cmp` (i.e. by the number of remaining split
// requests only) to stay consistent with the `Ord` impl, as required by the
// `BinaryHeap` ordering invariants. Two distinct leaf requests with the same
// number of pending splits therefore compare equal.
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}

// Marker impl: the `PartialEq` above is a total equivalence relation
// (it delegates to `Ord::cmp` on integers), which `Ord` requires.
impl Eq for LeafPermitRequest {}

impl LeafPermitRequest {
fn from_estimated_costs(permit_sizes: Vec<u64>) -> (Self, Vec<SearchPermitFuture>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment highlighting that the search permit futures are ordered like the permits?

let mut permits = Vec::with_capacity(permit_sizes.len());
let mut single_split_permit_requests = Vec::with_capacity(permit_sizes.len());
for permit_size in permit_sizes {
let (tx, rx) = oneshot::channel();
// we keep our internal list of permits and the returned wait handles in the
// same order to make sure we emit each permit in the right order. Doing otherwise
// may cause deadlocks
single_split_permit_requests.push(SingleSplitPermitRequest {
permit_sender: tx,
permit_size,
});
permits.push(SearchPermitFuture(rx));
}
(
LeafPermitRequest {
single_split_permit_requests: single_split_permit_requests.into_iter(),
},
permits,
)
}

fn pop_if_smaller_than(&mut self, max_size: u64) -> Option<SingleSplitPermitRequest> {
// IntoIter::as_slice() allows us to peek at the next element without consuming it
match self.single_split_permit_requests.as_slice().first() {
Some(request) if request.permit_size <= max_size => {
self.single_split_permit_requests.next()
}
_ => None,
}
}

fn is_empty(&self) -> bool {
self.single_split_permit_requests.as_slice().is_empty()
}
}

impl SearchPermitActor {
async fn run(mut self) {
// Stops when the last clone of SearchPermitProvider is dropped.
Expand All @@ -155,12 +230,9 @@ impl SearchPermitActor {
permit_sizes,
permit_sender,
} => {
let mut permits = Vec::with_capacity(permit_sizes.len());
for permit_size in permit_sizes {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back((tx, permit_size));
permits.push(SearchPermitFuture(rx));
}
let (leaf_permit_request, permits) =
LeafPermitRequest::from_estimated_costs(permit_sizes);
self.permits_requests.push(leaf_permit_request);
self.assign_available_permits();
// The receiver could be dropped in the (unlikely) situation
// where the future requesting these permits is cancelled before
Expand Down Expand Up @@ -195,33 +267,38 @@ impl SearchPermitActor {
}
}

fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender<SearchPermit>, u64)> {
fn pop_next_request_if_serviceable(&mut self) -> Option<SingleSplitPermitRequest> {
if self.num_warmup_slots_available == 0 {
return None;
}
if let Some((_, next_permit_size)) = self.permits_requests.front()
&& self.total_memory_allocated + next_permit_size <= self.total_memory_budget
{
return self.permits_requests.pop_front();
let available_memory = self
.total_memory_budget
.checked_sub(self.total_memory_allocated)?;
let mut peeked = self.permits_requests.peek_mut()?;

if let Some(permit_request) = peeked.pop_if_smaller_than(available_memory) {
if peeked.is_empty() {
PeekMut::pop(peeked);
}
return Some(permit_request);
}
None
}

fn assign_available_permits(&mut self) {
while let Some((permit_requester_tx, next_permit_size)) =
self.pop_next_request_if_serviceable()
{
while let Some(permit_request) = self.pop_next_request_if_serviceable() {
let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
&crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
);
ongoing_gauge_guard.add(1);
self.total_memory_allocated += next_permit_size;
self.total_memory_allocated += permit_request.permit_size;
self.num_warmup_slots_available -= 1;
permit_requester_tx
permit_request
.permit_sender
.send(SearchPermit {
_ongoing_gauge_guard: ongoing_gauge_guard,
msg_sender: self.msg_sender.clone(),
memory_allocation: next_permit_size,
memory_allocation: permit_request.permit_size,
warmup_slot_freed: false,
})
// if the requester dropped its receiver, we drop the newly
Expand Down Expand Up @@ -362,6 +439,75 @@ mod tests {
}
}

// Verifies the scheduling policy across concurrent leaf searches: the actor
// serves the leaf request with the fewest remaining splits first (min-heap),
// and within one leaf request permits are granted in submission order.
#[tokio::test]
async fn test_search_permit_order_with_concurrent_search() {
// 4 warmup slots, 100MB budget: with 10MB permits the budget would allow 10
// concurrent permits, so the 4 warmup slots are the limiting factor here.
let permit_provider = SearchPermitProvider::new(4, ByteSize::mb(100));
let mut all_futures = Vec::new();
// batch 1: a leaf search over 8 splits, submitted first
let first_batch_of_permits = permit_provider
.get_permits(repeat_n(ByteSize::mb(10), 8))
.await;
assert_eq!(first_batch_of_permits.len(), 8);
all_futures.extend(
first_batch_of_permits
.into_iter()
.enumerate()
// tag each future with (batch id, position within the batch)
.map(move |(i, fut)| ((1, i), fut)),
);

// batch 2: a smaller leaf search (2 splits); once the first 4 permits of
// batch 1 are granted, batch 2 has fewer remaining splits (2 vs 4) and is
// expected to be served before the rest of batch 1
let second_batch_of_permits = permit_provider
.get_permits(repeat_n(ByteSize::mb(10), 2))
.await;
all_futures.extend(
second_batch_of_permits
.into_iter()
.enumerate()
.map(move |(i, fut)| ((2, i), fut)),
);

// batch 3: the largest leaf search (6 splits), expected to be served last
let third_batch_of_permits = permit_provider
.get_permits(repeat_n(ByteSize::mb(10), 6))
.await;
all_futures.extend(
third_batch_of_permits
.into_iter()
.enumerate()
.map(move |(i, fut)| ((3, i), fut)),
);

// not super useful, considering what join set does, but still a tiny bit more sound.
all_futures.shuffle(&mut rand::rng());

// await every permit future concurrently; completion order reflects the
// order in which the actor granted the permits
let mut join_set = JoinSet::new();
for (res, fut) in all_futures {
join_set.spawn(async move {
let permit = fut.await;
(res, permit)
});
}
// NOTE: permits are kept alive (not dropped) until the join set is drained,
// so all 16 grants happen while the first 4 permits are still held --
// TODO confirm this matches the actor's warmup-slot accounting.
let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20);
while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await {
ordered_result.push((batch_id, order));
}

// build the expected (batch_id, intra-batch order) sequence from the
// expected batch scheduling: 4 from batch 1 (initial slots), then batch 2
// (fewest remaining), then the rest of batch 1, then batch 3
let mut counters = [0; 4];
let expected_result: Vec<(usize, usize)> = [
1, 1, 1, 1, // initial 4 permits
2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3,
]
.into_iter()
.map(|batch_id| {
let order = counters[batch_id];
counters[batch_id] += 1;
(batch_id, order)
})
.collect();

// for the first 4 permits, the order is not well defined as they are all granted at once,
// and we poll futures in a random order. We sort them to fix that artifact
ordered_result[..4].sort();
assert_eq!(ordered_result, expected_result);
}

#[tokio::test]
async fn test_search_permit_early_drops() {
let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
Expand Down