Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,24 @@ jobs:
- name: Cleanup Disk Space
run: |
df -h
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /usr/share/swift
sudo rm -rf /usr/local/.ghcup
sudo rm -rf /opt/hostedtoolcache/CodeQL
df -h

if [ "$(df -BG / | awk 'NR==2 {gsub("G","",$4); print $4}')" -lt 30 ]; then
echo "Less than 30GiB available. Running cleanup..."
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /usr/share/swift
sudo rm -rf /usr/local/.ghcup
sudo rm -rf /opt/hostedtoolcache/CodeQL
df -h
else
echo "30GiB or more available. Skipping cleanup."
fi

- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- name: Install Ubuntu packages
run: sudo apt-get -y install protobuf-compiler
run: |
sudo apt-get update
sudo apt-get -y install protobuf-compiler
- uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v.6.1.0
with:
python-version: '3.11'
Expand All @@ -90,7 +98,7 @@ jobs:
workspaces: "./quickwit -> target"
- name: Install nextest
if: always() && steps.modified.outputs.rust_src == 'true'
uses: taiki-e/cache-cargo-install-action@34ce5120836e5f9f1508d8713d7fdea0e8facd6f # v3.0.1
uses: taiki-e/install-action@aba36d755ec7ca22d38b12111787c26115943952
with:
tool: cargo-nextest
- name: cargo build
Expand Down Expand Up @@ -132,7 +140,9 @@ jobs:
- .github/workflows/ci.yml
- name: Install Ubuntu packages
if: always() && steps.modified.outputs.rust_src == 'true'
run: sudo apt-get -y install protobuf-compiler
run: |
sudo apt-get update
sudo apt-get -y install protobuf-compiler
- name: Setup nightly Rust Toolchain (for rustfmt)
if: steps.modified.outputs.rust_src == 'true'
uses: dtolnay/rust-toolchain@f7ccc83f9ed1e5b9c81d8a67d7ad1a747e22a561 # master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,100 +18,17 @@ use anyhow::{Context, Result};
use bytesize::ByteSize;
use quickwit_cluster::{Cluster, ListenerHandle};
use quickwit_common::pubsub::{Event, EventBroker};
use quickwit_common::ring_buffer::RingBuffer;
use quickwit_common::shared_consts::INGESTER_CAPACITY_SCORE_PREFIX;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::{IndexUid, NodeId, SourceId, SourceUid};
use quickwit_proto::types::{NodeId, SourceUid};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use tracing::{info, warn};

use super::{BROADCAST_INTERVAL_PERIOD, make_key, parse_key};
use crate::OpenShardCounts;
use crate::ingest_v2::state::WeakIngesterState;

pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>;

/// The lookback window length is meant to capture readings far enough back in time to give
/// a rough rate of change estimate. At size 6, with broadcast interval of 5 seconds, this would be
/// 30 seconds of readings.
const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6;

/// The ring buffer stores one extra element so that `delta()` can compare the newest reading
/// with the one that is exactly `WAL_CAPACITY_LOOKBACK_WINDOW_LEN` steps ago. Otherwise, that
/// reading would be discarded when the next reading is inserted.
const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1;

/// Sliding window of recent WAL remaining-capacity readings, used to estimate
/// both the current free fraction and its rate of change (drain rate).
struct WalDiskCapacityTimeSeries {
    /// Total WAL capacity; the denominator when converting a used-bytes reading
    /// into a remaining-capacity fraction.
    memory_capacity: ByteSize,
    /// Remaining-capacity fractions in [0.0, 1.0], newest last. Sized one larger
    /// than the lookback window so `delta()` can span the full window.
    readings: RingBuffer<f64, WAL_CAPACITY_READINGS_LEN>,
}

impl WalDiskCapacityTimeSeries {
fn new(memory_capacity: ByteSize) -> Self {
#[cfg(not(test))]
assert!(memory_capacity.as_u64() > 0);
Self {
memory_capacity,
readings: RingBuffer::default(),
}
}

fn record(&mut self, memory_used: ByteSize) {
let remaining = 1.0 - (memory_used.as_u64() as f64 / self.memory_capacity.as_u64() as f64);
self.readings.push_back(remaining.clamp(0.0, 1.0));
}

fn current(&self) -> Option<f64> {
self.readings.last()
}

/// How much remaining capacity changed between the oldest and newest readings.
/// Positive = improving, negative = draining.
fn delta(&self) -> Option<f64> {
let current = self.readings.last()?;
let oldest = self.readings.front()?;
Some(current - oldest)
}
}

/// Remaining-capacity fraction at or below which the score is forced to 0.
const MIN_PERMISSIBLE_CAPACITY: f64 = 0.05;
/// Maximum number of points contributed by the proportional (P) term.
const PROPORTIONAL_WEIGHT: f64 = 8.0;
/// Maximum number of points contributed by the derivative (D) term.
const DERIVATIVE_WEIGHT: f64 = 2.0;
/// Drain rate (as a fraction of total capacity lost over the lookback window)
/// at which the derivative bonus drops to zero. Faster drains are clamped here.
const MAX_DRAIN_RATE: f64 = 0.10;

/// Computes a capacity score in `0..=10` using a PD (proportional-derivative)
/// controller.
///
/// - **P (proportional):** `PROPORTIONAL_WEIGHT * remaining_capacity` — rewards
///   free WAL space right now. 100% free earns the full 8 points, 50% earns 4,
///   and so on.
/// - **D (derivative):** up to `DERIVATIVE_WEIGHT` bonus points for stability.
///   The drain over the lookback window is normalized against `MAX_DRAIN_RATE`
///   into a 0–1 penalty; a stable node keeps the full 2-point bonus while a node
///   draining at `MAX_DRAIN_RATE` or faster forfeits it entirely.
///
/// A completely idle ingester scores 10 (8 + 2); a full-but-stable one scores
/// about 2; anything at or below `MIN_PERMISSIBLE_CAPACITY` remaining scores 0
/// immediately.
fn compute_capacity_score(remaining_capacity: f64, capacity_delta: f64) -> usize {
    if remaining_capacity <= MIN_PERMISSIBLE_CAPACITY {
        return 0;
    }
    let proportional = remaining_capacity * PROPORTIONAL_WEIGHT;
    // A negative delta means capacity is draining; normalize to a 0..=1 penalty.
    let drain_penalty = (-capacity_delta).clamp(0.0, MAX_DRAIN_RATE) / MAX_DRAIN_RATE;
    let derivative = DERIVATIVE_WEIGHT * (1.0 - drain_penalty);
    // Truncation toward zero is intentional: scores are coarse buckets.
    (proportional + derivative).clamp(0.0, 10.0) as usize
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IngesterCapacityScore {
pub capacity_score: usize,
Expand All @@ -123,24 +40,18 @@ pub struct IngesterCapacityScore {
pub struct BroadcastIngesterCapacityScoreTask {
cluster: Cluster,
weak_state: WeakIngesterState,
wal_capacity_time_series: WalDiskCapacityTimeSeries,
}

impl BroadcastIngesterCapacityScoreTask {
pub fn spawn(
cluster: Cluster,
weak_state: WeakIngesterState,
disk_capacity: ByteSize,
) -> JoinHandle<()> {
pub fn spawn(cluster: Cluster, weak_state: WeakIngesterState) -> JoinHandle<()> {
let mut broadcaster = Self {
cluster,
weak_state,
wal_capacity_time_series: WalDiskCapacityTimeSeries::new(disk_capacity),
};
tokio::spawn(async move { broadcaster.run().await })
}

async fn snapshot(&self) -> Result<Option<(ByteSize, OpenShardCounts)>> {
async fn snapshot(&self) -> Result<Option<(usize, OpenShardCounts)>> {
let state = self
.weak_state
.upgrade()
Expand All @@ -152,15 +63,16 @@ impl BroadcastIngesterCapacityScoreTask {
return Ok(None);
}

let guard = state
let mut guard = state
.lock_fully()
.await
.map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?;
let usage = guard.mrecordlog.resource_usage();
let disk_used = ByteSize::b(usage.disk_used_bytes as u64);
let open_shard_counts = guard.get_open_shard_counts();
let capacity_score = guard.wal_capacity_time_series.record_and_score(disk_used);
let (open_shard_counts, _) = guard.get_shard_snapshot();

Ok(Some((disk_used, open_shard_counts)))
Ok(Some((capacity_score, open_shard_counts)))
}

async fn run(&mut self) {
Expand All @@ -170,7 +82,7 @@ impl BroadcastIngesterCapacityScoreTask {
loop {
interval.tick().await;

let (disk_used, open_shard_counts) = match self.snapshot().await {
let (capacity_score, open_shard_counts) = match self.snapshot().await {
Ok(Some(snapshot)) => snapshot,
Ok(None) => continue,
Err(error) => {
Expand All @@ -179,12 +91,6 @@ impl BroadcastIngesterCapacityScoreTask {
}
};

self.wal_capacity_time_series.record(disk_used);

let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0);
let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0);
let capacity_score = compute_capacity_score(remaining_capacity, capacity_delta);

previous_sources = self
.broadcast_capacity(capacity_score, &open_shard_counts, &previous_sources)
.await;
Expand Down Expand Up @@ -266,90 +172,12 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};

use quickwit_cluster::{ChannelTransport, create_cluster_for_test};
use quickwit_proto::types::ShardId;
use quickwit_proto::types::{IndexUid, ShardId, SourceId};

use super::*;
use crate::ingest_v2::models::IngesterShard;
use crate::ingest_v2::state::IngesterState;

    /// Helper: builds a time series with a fixed total capacity of 100 bytes,
    /// so `used` byte counts map directly to remaining-capacity percentages.
    fn ts() -> WalDiskCapacityTimeSeries {
        WalDiskCapacityTimeSeries::new(ByteSize::b(100))
    }

    /// Helper: record a reading with `used` bytes against the series' fixed
    /// capacity (100 bytes when constructed via `ts()`).
    fn record(series: &mut WalDiskCapacityTimeSeries, used: u64) {
        series.record(ByteSize::b(used));
    }

#[test]
fn test_wal_disk_capacity_current_after_record() {
let mut series = WalDiskCapacityTimeSeries::new(ByteSize::b(256));
// 192 of 256 used => 25% remaining
series.record(ByteSize::b(192));
assert_eq!(series.current(), Some(0.25));

// 16 of 256 used => 93.75% remaining
series.record(ByteSize::b(16));
assert_eq!(series.current(), Some(0.9375));
}

#[test]
fn test_wal_disk_capacity_record_saturates_at_zero() {
let mut series = ts();
// 200 used out of 100 capacity => clamped to 0.0
record(&mut series, 200);
assert_eq!(series.current(), Some(0.0));
}

#[test]
fn test_wal_disk_capacity_delta_growing() {
let mut series = ts();
// oldest: 60 of 100 used => 40% remaining
record(&mut series, 60);
// current: 20 of 100 used => 80% remaining
record(&mut series, 20);
// delta = 0.80 - 0.40 = 0.40
assert_eq!(series.delta(), Some(0.40));
}

#[test]
fn test_wal_disk_capacity_delta_shrinking() {
let mut series = ts();
// oldest: 20 of 100 used => 80% remaining
record(&mut series, 20);
// current: 60 of 100 used => 40% remaining
record(&mut series, 60);
// delta = 0.40 - 0.80 = -0.40
assert_eq!(series.delta(), Some(-0.40));
}

#[test]
fn test_capacity_score_draining_vs_stable() {
// Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks.
let mut node_a = ts();
for used in (10..=70).step_by(10) {
record(&mut node_a, used);
}
let a_remaining = node_a.current().unwrap();
let a_delta = node_a.delta().unwrap();
let a_score = compute_capacity_score(a_remaining, a_delta);

// Node B: steady at 50% usage over 7 ticks.
let mut node_b = ts();
for _ in 0..7 {
record(&mut node_b, 50);
}
let b_remaining = node_b.current().unwrap();
let b_delta = node_b.delta().unwrap();
let b_score = compute_capacity_score(b_remaining, b_delta);

// p=2.4, d=0 (max drain) => 2
assert_eq!(a_score, 2);
// p=4, d=2 (stable) => 6
assert_eq!(b_score, 6);
assert!(b_score > a_score);
}

#[tokio::test]
async fn test_snapshot_state_dropped() {
let transport = ChannelTransport::default();
Expand All @@ -363,7 +191,6 @@ mod tests {
let task = BroadcastIngesterCapacityScoreTask {
cluster,
weak_state,
wal_capacity_time_series: WalDiskCapacityTimeSeries::new(ByteSize::mb(1)),
};
assert!(task.snapshot().await.is_err());
}
Expand All @@ -376,7 +203,9 @@ mod tests {
.unwrap();
let event_broker = EventBroker::default();

let (_temp_dir, state) = IngesterState::for_test().await;
// Use 1000 bytes disk capacity so 500 used => 50% remaining, 0 delta => score = 6
let (_temp_dir, state) =
IngesterState::for_test_with_disk_capacity(ByteSize::b(1000)).await;
let index_uid = IndexUid::for_test("test-index", 0);
let mut state_guard = state.lock_partially().await.unwrap();
let shard = IngesterShard::new_solo(
Expand All @@ -387,21 +216,18 @@ mod tests {
.advertisable()
.build();
state_guard.shards.insert(shard.queue_id(), shard);
let open_shard_counts = state_guard.get_open_shard_counts();
let (open_shard_counts, _) = state_guard.get_shard_snapshot();
let capacity_score = state_guard
.wal_capacity_time_series
.record_and_score(ByteSize::b(500));
drop(state_guard);

// Simulate 500 of 1000 bytes capacity used => 50% remaining, 0 delta => score = 6
let mut task = BroadcastIngesterCapacityScoreTask {
assert_eq!(capacity_score, 6);

let task = BroadcastIngesterCapacityScoreTask {
cluster: cluster.clone(),
weak_state: state.weak(),
wal_capacity_time_series: WalDiskCapacityTimeSeries::new(ByteSize::b(1000)),
};
task.wal_capacity_time_series.record(ByteSize::b(500));

let remaining = task.wal_capacity_time_series.current().unwrap();
let delta = task.wal_capacity_time_series.delta().unwrap();
let capacity_score = compute_capacity_score(remaining, delta);
assert_eq!(capacity_score, 6);

let update_counter = Arc::new(AtomicUsize::new(0));
let update_counter_clone = update_counter.clone();
Expand Down Expand Up @@ -434,23 +260,4 @@ mod tests {
assert_eq!(deserialized.capacity_score, 6);
assert_eq!(deserialized.open_shard_count, 1);
}

#[test]
fn test_wal_disk_capacity_delta_spans_lookback_window() {
let mut series = ts();

// Fill to exactly the lookback window length (6 readings), all same value.
for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN {
record(&mut series, 50);
}
assert_eq!(series.delta(), Some(0.0));

// 7th reading fills the ring buffer. Delta spans 6 intervals.
record(&mut series, 0);
assert_eq!(series.delta(), Some(0.50));

// 8th reading evicts the oldest 50-remaining. Delta still spans 6 intervals.
record(&mut series, 0);
assert_eq!(series.delta(), Some(0.50));
}
}
Loading
Loading