From 1f90e6417b5e1cd85d3049ccafe56f5377745b22 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 30 May 2026 10:30:32 +0200 Subject: [PATCH 1/2] add last_poll_seconds_ago scanner metric --- crates/fluss/Cargo.toml | 4 + crates/fluss/src/client/table/scanner.rs | 402 ++++++++++++++++++++++- crates/fluss/src/metrics.rs | 56 +++- 3 files changed, 451 insertions(+), 11 deletions(-) diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index feac8309..821ee52e 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -82,6 +82,10 @@ jiff = { workspace = true, features = ["js"] } [dev-dependencies] metrics-util = "0.20" fluss-test-cluster = { path = "../fluss-test-cluster" } +# `test-util` (paused clock + `tokio::time::advance`) is not part of the +# workspace `full` feature set; enable it for tests so the +# `last_poll_seconds_ago` ticker loop can be driven deterministically. +tokio = { workspace = true, features = ["test-util"] } [build-dependencies] prost-build = "0.14" diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 35cc52e3..9e4cc58d 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -48,10 +48,15 @@ use prost::Message; use std::{ collections::{HashMap, HashSet}, slice::from_ref, - sync::Arc, - time::{Duration, Instant}, + sync::{ + Arc, + atomic::{AtomicI64, Ordering}, + }, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use tempfile::TempDir; +use tokio::task::JoinHandle; +use tokio::time::MissedTickBehavior; pub struct TableScan<'a> { conn: &'a FlussConnection, @@ -388,6 +393,23 @@ struct LogScannerInner { /// Per-table scanner metric handles, pre-bound with `database`/`table` /// labels. metrics: Arc, + /// Wall-clock millis (since `UNIX_EPOCH`) of the most recent + /// `record_poll_start`. Sentinel `0` means "no poll yet" — the + /// `last_poll_seconds_ago` ticker skips emission while this is `0`, + /// deviating from Java's unguarded `(now - 0)/1000` startup value. + /// + /// Written by `record_poll_start` with `Release` ordering, read by + /// the ticker task with `Acquire` ordering. Cloned (`Arc`) into the + /// ticker so the task does not hold a back-reference to + /// `LogScannerInner` (avoids a reference cycle that would block + /// `Drop`, and hence the ticker abort, until tokio runtime + /// shutdown). + last_poll_unix_ms: Arc, + /// Handle to the 1-second background tokio task that pushes + /// `last_poll_seconds_ago` into the gauge. Aborted from + /// `impl Drop for LogScannerInner` so the gauge stops emitting once + /// the scanner is closed. + last_poll_seconds_ago_task: JoinHandle<()>, } /// Snapshot state used to derive the scanner poll-timing metrics. @@ -437,6 +459,64 @@ impl Drop for PollGuard<'_> { } } +/// Single-tick emission for the `last_poll_seconds_ago` gauge. Reads the +/// last-poll timestamp from the shared atomic and pushes the elapsed +/// integer-seconds into the gauge. +/// +/// Emission is skipped while the atomic still holds the sentinel `0` (no +/// `record_poll_start` yet) — Java's `(System.currentTimeMillis() - 0) / +/// 1000` startup nonsense (see `ScannerMetricGroup.java:121`) would trip +/// every consumer-liveness alert on startup. Java parity note: Java's +/// expression is integer-truncating (`long / long`); we preserve that with +/// `i64` division before the `f64` cast so dashboards built against Java +/// behave the same. +/// +/// Extracted from the ticker loop so unit tests can exercise the emission +/// logic without depending on real-time scheduling. +fn emit_last_poll_seconds_ago_once(last_poll_unix_ms: &AtomicI64, metrics: &ScannerMetrics) { + let stored = last_poll_unix_ms.load(Ordering::Acquire); + if stored == 0 { + return; + } + let Ok(now_ms) = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + else { + return; + }; + let seconds = ((now_ms - stored).max(0) / 1000) as f64; + metrics.record_last_poll_seconds_ago(seconds); +} + +/// Spawn the 1-second background tokio task that pushes +/// `last_poll_seconds_ago` into the gauge. The task holds only the shared +/// atomic timestamp and the metric handle — never an `Arc` +/// — so it does not create a reference cycle that would block the scanner's +/// `Drop` (and hence the abort that stops this task). +/// +/// `MissedTickBehavior::Delay` is used so a stalled runtime (e.g. test +/// pausing/advancing time) does not produce a burst of catch-up ticks when +/// it resumes. +fn spawn_last_poll_seconds_ago_ticker( + last_poll_unix_ms: Arc, + metrics: Arc, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + interval.tick().await; + emit_last_poll_seconds_ago_once(&last_poll_unix_ms, &metrics); + } + }) +} + +impl Drop for LogScannerInner { + fn drop(&mut self) { + self.last_poll_seconds_ago_task.abort(); + } +} + impl LogScannerInner { fn new( table_info: &TableInfo, @@ -461,6 +541,11 @@ impl LogScannerInner { }; let metrics = Arc::new(ScannerMetrics::new(&table_info.table_path)); + let last_poll_unix_ms = Arc::new(AtomicI64::new(0)); + let last_poll_seconds_ago_task = spawn_last_poll_seconds_ago_ticker( + Arc::clone(&last_poll_unix_ms), + Arc::clone(&metrics), + ); Ok(Self { table_path: table_info.table_path.clone(), table_id: table_info.table_id, @@ -480,6 +565,8 @@ impl LogScannerInner { reader_active: std::sync::atomic::AtomicBool::new(false), poll_state: Mutex::new(PollState::default()), metrics, + last_poll_unix_ms, + last_poll_seconds_ago_task, }) } @@ -584,6 +671,21 @@ impl LogScannerInner { ); } self.metrics.record_time_between_poll_ms(between_ms); + + // Publish the wall-clock timestamp the ticker uses to compute + // `last_poll_seconds_ago`. Use `SystemTime` rather than `Instant` + // because the ticker needs an absolute clock to diff against + // `SystemTime::now()` at arbitrary moments. `Release` pairs with the + // ticker's `Acquire` load. If the system clock is somehow before + // `UNIX_EPOCH` (vanishingly rare; pre-1970 wall clock), we keep the + // existing value so we never publish a negative timestamp that would + // produce a bogus gauge reading on the next tick. + if let Ok(unix_ms) = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + { + self.last_poll_unix_ms.store(unix_ms, Ordering::Release); + } } /// Computes `poll_idle_ratio = poll_time / (poll_time + between_time)`. @@ -2755,4 +2857,300 @@ mod tests { // `ScannerMetrics`), this assertion catches it. assert_scanner_entries_labeled(&entries, "db", "tbl"); } + + /// `emit_last_poll_seconds_ago_once` must skip emission while the + /// shared atomic still holds the sentinel `0` — that's the + /// pre-first-poll guard that prevents Java's + /// `(System.currentTimeMillis() - 0) / 1000` startup nonsense from + /// tripping consumer-liveness alerts before any poll happens. + /// + /// `ScannerMetrics::new` already registers the gauge with the + /// recorder, so it appears in the snapshot with the default `0.0` + /// even without any emission. The discriminating assertion is that + /// the value stays near zero rather than blowing up to ~1.7 billion + /// (current Unix-epoch seconds), which is what a broken skip would + /// produce. + #[test] + fn emit_last_poll_seconds_ago_skips_sentinel_value() { + use crate::metrics::SCANNER_LAST_POLL_SECONDS_AGO; + use metrics_util::debugging::DebuggingRecorder; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let metrics = ScannerMetrics::new(&table_path); + let last_poll = AtomicI64::new(0); + + for _ in 0..3 { + emit_last_poll_seconds_ago_once(&last_poll, &metrics); + } + }); + + let value = snapshot_gauge(&snapshotter, SCANNER_LAST_POLL_SECONDS_AGO) + .expect("ScannerMetrics::new registers the gauge so it appears in the snapshot"); + assert!( + value < 1.0, + "pre-first-poll emission must be skipped; broken skip would push ~unix-epoch \ + seconds (~1.7e9) into the gauge, got {value}" + ); + } + + /// Once a real timestamp has been published, the helper must emit + /// `floor((now - stored) / 1000)` matching Java's integer-truncating + /// `(System.currentTimeMillis() - lastPollMs) / 1000`. Tolerance + /// allows for real wall-clock progression between the test setting + /// up `stored` and the helper reading `SystemTime::now()`. + /// + /// Also covers the reset-after-fresh-poll case: updating the + /// stored timestamp to "now" must drop the next emission back near + /// zero, matching the property "gauge resets when a new poll + /// happens". + #[test] + fn emit_last_poll_seconds_ago_publishes_integer_truncated_elapsed() { + use crate::metrics::SCANNER_LAST_POLL_SECONDS_AGO; + use metrics_util::debugging::DebuggingRecorder; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let metrics = ScannerMetrics::new(&table_path); + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .expect("wall clock after UNIX_EPOCH"); + let last_poll = AtomicI64::new(now_ms - 5_500); + + emit_last_poll_seconds_ago_once(&last_poll, &metrics); + + let value = snapshot_gauge(&snapshotter, SCANNER_LAST_POLL_SECONDS_AGO) + .expect("gauge must emit once a real timestamp is published"); + assert!( + (5.0..=6.0).contains(&value), + "gauge must be ~5 (5500ms truncated to 5s, plus test scheduling slack), got {value}" + ); + + // Simulate a fresh poll: update the shared atomic to "right + // now". The next emission must collapse the gauge back near + // zero. + let fresh_now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .expect("wall clock after UNIX_EPOCH"); + last_poll.store(fresh_now_ms, Ordering::Release); + + emit_last_poll_seconds_ago_once(&last_poll, &metrics); + + let reset = snapshot_gauge(&snapshotter, SCANNER_LAST_POLL_SECONDS_AGO) + .expect("gauge must still be present after the second emission"); + assert!( + (0.0..=1.0).contains(&reset), + "fresh poll must reset gauge near zero, got {reset}" + ); + }); + + assert_scanner_entries_labeled(&snapshotter.snapshot().into_vec(), "db", "tbl"); + } + + /// Negative `now - stored` (e.g. wall-clock jumps backwards via NTP) + /// must clamp to 0, not produce a negative gauge reading. + #[test] + fn emit_last_poll_seconds_ago_clamps_negative_delta_to_zero() { + use crate::metrics::SCANNER_LAST_POLL_SECONDS_AGO; + use metrics_util::debugging::DebuggingRecorder; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let metrics = ScannerMetrics::new(&table_path); + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .expect("wall clock after UNIX_EPOCH"); + // Stored timestamp in the future → negative delta. + let last_poll = AtomicI64::new(now_ms + 60_000); + + emit_last_poll_seconds_ago_once(&last_poll, &metrics); + }); + + let value = snapshot_gauge(&snapshotter, SCANNER_LAST_POLL_SECONDS_AGO) + .expect("gauge must emit even when delta is clamped to 0"); + assert_eq!(value, 0.0, "negative delta must clamp to 0, got {value}"); + } + + /// `spawn_last_poll_seconds_ago_ticker` returns a `JoinHandle` whose + /// `abort()` cleanly terminates the loop. Pins the lifecycle pattern + /// that `impl Drop for LogScannerInner` relies on. + #[tokio::test(flavor = "current_thread")] + async fn spawn_last_poll_seconds_ago_ticker_aborts_cleanly() { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let metrics = Arc::new(ScannerMetrics::new(&table_path)); + let last_poll_unix_ms = Arc::new(AtomicI64::new(0)); + + let handle = spawn_last_poll_seconds_ago_ticker( + Arc::clone(&last_poll_unix_ms), + Arc::clone(&metrics), + ); + assert!( + !handle.is_finished(), + "freshly spawned ticker must be alive" + ); + + handle.abort(); + let join_result = handle.await; + assert!( + join_result.is_err() && join_result.unwrap_err().is_cancelled(), + "abort must cancel the ticker, not let it complete normally" + ); + } + + /// End-to-end test of the *spawned* ticker (not just the extracted + /// `emit_last_poll_seconds_ago_once` helper): the interval loop must + /// emit on its first tick and keep emitting on subsequent ticks, + /// reflecting the latest published timestamp each time. + /// + /// Uses a paused-clock `current_thread` runtime so the second tick can + /// be driven deterministically with `tokio::time::advance` instead of + /// sleeping a real second. Note the elapsed value is derived from + /// wall-clock `SystemTime`, which `advance` does *not* move — so the + /// gauge value is controlled by what we store in the atomic (a known + /// past / present wall-clock timestamp), and `advance` is used only to + /// fire the parked 1-second interval timer. + /// + /// Built inside `with_local_recorder` (rather than `#[tokio::test]`) + /// because the metrics facade installs a thread-local recorder; the + /// spawned task is polled on the same thread during `block_on`, so its + /// `gauge!` calls resolve to this local recorder. + #[test] + fn spawned_ticker_emits_on_first_and_subsequent_ticks() { + use crate::metrics::SCANNER_LAST_POLL_SECONDS_AGO; + use metrics_util::debugging::DebuggingRecorder; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .start_paused(true) + .build() + .expect("build paused current_thread runtime"); + + rt.block_on(async { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let metrics = Arc::new(ScannerMetrics::new(&table_path)); + let last_poll_unix_ms = Arc::new(AtomicI64::new(0)); + + // Simulate a poll that started ~5s ago (wall clock) before + // the ticker runs, so the first (immediate) tick emits ~5. + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .expect("wall clock after UNIX_EPOCH"); + last_poll_unix_ms.store(now_ms - 5_000, Ordering::Release); + + let handle = spawn_last_poll_seconds_ago_ticker( + Arc::clone(&last_poll_unix_ms), + Arc::clone(&metrics), + ); + + // `tokio::time::interval` fires its first tick immediately, + // so a few yields let the spawned task run that first + // emission without advancing the clock. + for _ in 0..8 { + tokio::task::yield_now().await; + } + + let first = snapshot_gauge(&snapshotter, SCANNER_LAST_POLL_SECONDS_AGO) + .expect("spawned ticker must emit on its first (immediate) tick"); + assert!( + (5.0..=6.0).contains(&first), + "first tick must reflect ~5s elapsed, got {first}" + ); + + // Simulate a fresh poll "now", then advance the paused clock + // by 1s to fire the parked second interval tick. The loop + // must emit again, this time near zero. + let fresh_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .expect("wall clock after UNIX_EPOCH"); + last_poll_unix_ms.store(fresh_ms, Ordering::Release); + + tokio::time::advance(Duration::from_secs(1)).await; + for _ in 0..8 { + tokio::task::yield_now().await; + } + + let second = snapshot_gauge(&snapshotter, SCANNER_LAST_POLL_SECONDS_AGO) + .expect("spawned ticker must keep emitting on subsequent ticks"); + assert!( + (0.0..=1.0).contains(&second), + "second tick after a fresh poll must reset gauge near zero, got {second}" + ); + + handle.abort(); + }); + }); + + assert_scanner_entries_labeled(&snapshotter.snapshot().into_vec(), "db", "tbl"); + } + + /// `LogScannerInner::drop` must abort the ticker task so the gauge + /// stops emitting once the scanner is closed — mirrors Java's + /// `ScannerMetricGroup.close()`. The atomic is shared with the task, + /// so we use its `Arc::strong_count` as an indirect liveness probe: + /// once the runtime processes the abort and drops the task's future, + /// the task's clone of the `Arc` is released, leaving only the one + /// we hold here. + #[test] + fn log_scanner_inner_drop_aborts_ticker_task() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build current_thread runtime"); + rt.block_on(async { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let inner = LogScannerInner::new( + &table_info, + metadata, + Arc::new(RpcClient::new()), + &Config::default(), + None, + ) + .expect("build LogScannerInner"); + + let abort_handle = inner.last_poll_seconds_ago_task.abort_handle(); + assert!( + !abort_handle.is_finished(), + "ticker must be alive before scanner drop" + ); + + drop(inner); + + // Yield repeatedly so the runtime can process the abort. + // Cap at a generous iteration count to avoid hanging the test + // if Drop ever stops calling `abort()`. + for _ in 0..32 { + tokio::task::yield_now().await; + if abort_handle.is_finished() { + break; + } + } + assert!( + abort_handle.is_finished(), + "Drop for LogScannerInner must abort the last_poll_seconds_ago ticker" + ); + }); + } } diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index 7c62738c..f20fddd8 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -60,16 +60,15 @@ pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight"; // Java reference: ScannerMetricGroup.java, LogScannerImpl.java // // These track consumer liveness and processing efficiency at the `poll()` -// boundary. Java records via `volatile long` fields read by gauge suppliers; -// Rust snapshots the values at poll start/end. +// boundary. Java records via `volatile long` fields read by gauge suppliers +// at scrape time; Rust pushes values via the `metrics` facade. // -// Java's `lastPollSecondsAgo` gauge is intentionally NOT ported. Java -// implements it as a gauge supplier evaluated at scrape time, which the -// `metrics` crate facade has no equivalent for. A snapshot-at-poll-start -// port would just duplicate `time_between_poll_ms / 1000` and would not -// advance while a consumer is hung — defeating the metric's purpose -// (detecting a stuck consumer). Revisit if the `metrics` crate gains a -// supplier abstraction or we add a background liveness task. +// `time_between_poll_ms` and `poll_idle_ratio` are snapshot at poll +// start / poll end. `last_poll_seconds_ago` must keep advancing between +// polls (it measures elapsed time, not activity), so it is emitted by a +// per-scanner 1-second background tokio task spawned in +// `LogScannerInner::new`. The task is aborted when the last scanner +// `Arc` is dropped, matching Java's `ScannerMetricGroup.close()`. // --------------------------------------------------------------------------- /// Gauge: milliseconds between the start of consecutive `poll()` calls. A @@ -82,6 +81,18 @@ pub const SCANNER_TIME_BETWEEN_POLL_MS: &str = "fluss.client.scanner.time_betwee /// the bottleneck. pub const SCANNER_POLL_IDLE_RATIO: &str = "fluss.client.scanner.poll_idle_ratio"; +/// Gauge: integer seconds since the most recent `poll()` started. Advances +/// monotonically between polls — the primary stuck-consumer signal. +/// +/// Pushed every second by a per-scanner background tokio task. Emission is +/// skipped until the first `poll()` happens; Java's equivalent +/// `lastPollSecondsAgo` returns roughly the current Unix-epoch seconds +/// before the first poll (an unguarded `(now - 0)/1000`), which would trip +/// every consumer-liveness alert on startup. Classified as `can-differ` per +/// `java-rust-parity-criteria.mdc` since this is purely client-side display. +/// +pub const SCANNER_LAST_POLL_SECONDS_AGO: &str = "fluss.client.scanner.last_poll_seconds_ago"; + // --------------------------------------------------------------------------- // Scanner fetch + remote download metrics // @@ -143,6 +154,7 @@ pub const SCANNER_REMOTE_FETCH_ERRORS_TOTAL: &str = pub(crate) struct ScannerMetrics { time_between_poll_ms: metrics::Gauge, poll_idle_ratio: metrics::Gauge, + last_poll_seconds_ago: metrics::Gauge, fetch_requests_total: metrics::Counter, fetch_latency_ms: metrics::Histogram, bytes_per_request: metrics::Histogram, @@ -160,6 +172,7 @@ impl ScannerMetrics { Self { time_between_poll_ms: scanner_gauge(SCANNER_TIME_BETWEEN_POLL_MS, database, table), poll_idle_ratio: scanner_gauge(SCANNER_POLL_IDLE_RATIO, database, table), + last_poll_seconds_ago: scanner_gauge(SCANNER_LAST_POLL_SECONDS_AGO, database, table), fetch_requests_total: scanner_counter(SCANNER_FETCH_REQUESTS_TOTAL, database, table), fetch_latency_ms: scanner_histogram(SCANNER_FETCH_LATENCY_MS, database, table), bytes_per_request: scanner_histogram(SCANNER_BYTES_PER_REQUEST, database, table), @@ -189,6 +202,10 @@ impl ScannerMetrics { self.poll_idle_ratio.set(value); } + pub(crate) fn record_last_poll_seconds_ago(&self, value: f64) { + self.last_poll_seconds_ago.set(value); + } + pub(crate) fn record_fetch_request(&self) { self.fetch_requests_total.increment(1); } @@ -486,6 +503,27 @@ mod tests { assert_scanner_entries_labeled(&entries, "db", "tbl"); } + #[test] + fn scanner_last_poll_seconds_ago_emits_correctly() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let table_path = TablePath::new("db", "tbl"); + let m = ScannerMetrics::new(&table_path); + m.record_last_poll_seconds_ago(42.0); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + assert_eq!( + find_gauge!(entries, SCANNER_LAST_POLL_SECONDS_AGO), + Some(42.0) + ); + assert_scanner_entries_labeled(&entries, "db", "tbl"); + } + #[test] fn scanner_fetch_metrics_emit_correctly() { let recorder = DebuggingRecorder::new(); From a0a1a6116fb079d06cf3cc79f069e69ebfe20257 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 30 May 2026 10:35:54 +0200 Subject: [PATCH 2/2] commit --- crates/fluss/src/metrics.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index f20fddd8..9f820ffe 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -88,8 +88,7 @@ pub const SCANNER_POLL_IDLE_RATIO: &str = "fluss.client.scanner.poll_idle_ratio" /// skipped until the first `poll()` happens; Java's equivalent /// `lastPollSecondsAgo` returns roughly the current Unix-epoch seconds /// before the first poll (an unguarded `(now - 0)/1000`), which would trip -/// every consumer-liveness alert on startup. Classified as `can-differ` per -/// `java-rust-parity-criteria.mdc` since this is purely client-side display. +/// every consumer-liveness alert on startup. /// pub const SCANNER_LAST_POLL_SECONDS_AGO: &str = "fluss.client.scanner.last_poll_seconds_ago";