From b75e33272d1b88fb60ab30085b661bdba38c42ab Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 2 Jun 2026 13:20:54 +0700 Subject: [PATCH 1/3] chore: reuse time bin logic across apis remove binning logic in counts api make reusable component reuse in counts api, errors api, agent-observability related apis --- src/query/mod.rs | 29 ++++----------------- src/utils/time.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 24 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index effa5e44f..d871c57a4 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -73,7 +73,7 @@ use crate::metrics::increment_bytes_scanned_in_query_by_date; use crate::option::Mode; use crate::parseable::{DEFAULT_TENANT, PARSEABLE}; use crate::storage::{ObjectStorageProvider, ObjectStoreFormat}; -use crate::utils::time::TimeRange; +use crate::utils::time::{DATE_BIN_EPOCH_ANCHOR, TimeRange, count_api_bin_interval}; /// Boxed record-batch stream used as the streaming half of query results. type BoxedBatchStream = SendableRecordBatchStream; @@ -638,32 +638,13 @@ impl CountsRequest { let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; - let dur = time_range.end.signed_duration_since(time_range.start); - let table_name = &self.stream; let start_time_col_name = "_bin_start_time_"; let end_time_col_name = "_bin_end_time_"; - let date_bin = if dur.num_minutes() <= 60 * 5 { - // less than 5 hour = 1 min bin - format!( - "CAST(DATE_BIN('1m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1m' as {end_time_col_name}" - ) - } else if dur.num_minutes() <= 60 * 24 { - // 1 day = 5 min bin - format!( - "CAST(DATE_BIN('5m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('5m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '5m' as {end_time_col_name}" - ) - } else if dur.num_minutes() < 60 * 24 * 10 { - // 10 days = 1 hour bin - format!( - "CAST(DATE_BIN('1h', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1h', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1h' as {end_time_col_name}" - ) - } else { - // 1 day - format!( - "CAST(DATE_BIN('1d', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1d', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1d' as {end_time_col_name}" - ) - }; + let bin_interval = count_api_bin_interval(&time_range.start, &time_range.end); + let date_bin = format!( + "CAST(DATE_BIN('{bin_interval}', \"{table_name}\".\"{time_column}\", TIMESTAMP '{DATE_BIN_EPOCH_ANCHOR}') AS TEXT) as {start_time_col_name}, DATE_BIN('{bin_interval}', \"{table_name}\".\"{time_column}\", TIMESTAMP '{DATE_BIN_EPOCH_ANCHOR}') + INTERVAL '{bin_interval}' as {end_time_col_name}" + ); let group_by_cols = count_conditions .group_by diff --git a/src/utils/time.rs b/src/utils/time.rs index a57dac631..745896984 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -18,6 +18,8 @@ use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeDelta, TimeZone, Timelike, Utc}; +pub const DATE_BIN_EPOCH_ANCHOR: &str = "1970-01-01 00:00:00+00"; + #[derive(Debug, thiserror::Error)] pub enum TimeParseError { #[error("Parsing humantime")] @@ -55,6 +57,69 @@ pub struct TimeRange { pub end: DateTime, } +pub fn count_api_bin_interval(start: &DateTime, end: &DateTime) -> &'static str { + let dur = end.signed_duration_since(*start); + + if dur.num_minutes() <= 60 * 5 { + "1m" + } else if dur.num_minutes() <= 60 * 24 { + "5m" + } else if dur.num_minutes() < 60 * 24 * 10 { + "1h" + } else { + "1d" + } +} + +pub fn interval_for_num_bins(start: &DateTime, end: &DateTime, num_bins: u64) -> String { + let total_seconds = end.signed_duration_since(*start).num_seconds().max(1) as u64; + let bin_seconds = (total_seconds / num_bins).max(1); + format!("{bin_seconds} seconds") +} + +pub fn expected_time_bins(time_range: &TimeRange, num_bins: u64) -> Vec<(String, String)> { + let total_seconds = time_range + .end + .signed_duration_since(time_range.start) + .num_seconds() + .max(1) as u64; + let bin_seconds = (total_seconds / num_bins).max(1); + + (0..num_bins) + .map(|i| { + let bin_start = time_range.start + chrono::Duration::seconds((i * bin_seconds) as i64); + let bin_end = if i == num_bins - 1 { + time_range.end + } else { + time_range.start + chrono::Duration::seconds(((i + 1) * bin_seconds) as i64) + }; + (bin_start.to_rfc3339(), bin_end.to_rfc3339()) + }) + .collect() +} + +pub fn match_time_bin_key(sql_bin_start: &str, expected_bins: &[(String, String)]) -> String { + let normalized = if !sql_bin_start.contains('+') && !sql_bin_start.ends_with('Z') { + format!("{sql_bin_start}+00:00") + } else { + sql_bin_start.to_string() + }; + + if let Ok(sql_ts) = DateTime::parse_from_rfc3339(&normalized) { + let sql_ts = sql_ts.with_timezone(&Utc); + for (bs, _) in expected_bins { + if let Ok(expected_ts) = DateTime::parse_from_rfc3339(bs) { + let expected_ts = expected_ts.with_timezone(&Utc); + if (sql_ts - expected_ts).num_seconds().abs() <= 1 { + return bs.clone(); + } + } + } + } + + sql_bin_start.to_string() +} + impl TimeRange { pub fn new(start: DateTime, end: DateTime) -> Self { TimeRange { start, end } From cf8d7ef8db880aa0e7ce87f9d56251b2451098d3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 4 Jun 2026 09:52:55 +0700 Subject: [PATCH 2/3] add validations --- src/query/mod.rs | 12 ++++-- src/utils/time.rs | 101 ++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index d871c57a4..8484d9702 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -523,7 +523,7 @@ impl CountsRequest { let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; let all_manifest_files = get_manifest_list(&self.stream, &time_range, tenant_id).await?; // get bounds - let counts = self.get_bounds(&time_range); + let counts = self.get_bounds(&time_range)?; // we have start and end times for each bin // we also have all the manifest files for the given time range @@ -566,7 +566,7 @@ impl CountsRequest { } /// Calculate the end time for each bin based on the number of bins - fn get_bounds(&self, time_range: &TimeRange) -> Vec { + fn get_bounds(&self, time_range: &TimeRange) -> Result, QueryError> { let total_minutes = time_range .end .signed_duration_since(time_range.start) @@ -591,6 +591,12 @@ impl CountsRequest { } }; + if num_bins == 0 { + return Err(QueryError::CustomError( + "numBins must be greater than 0".to_string(), + )); + } + // divide minutes by num bins to get minutes per bin let quotient = total_minutes / num_bins; let remainder = total_minutes % num_bins; @@ -628,7 +634,7 @@ impl CountsRequest { }); } - bounds + Ok(bounds) } /// This function will get executed only if self.conditions is some diff --git a/src/utils/time.rs b/src/utils/time.rs index 745896984..39e8780fe 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -32,6 +32,14 @@ pub enum TimeParseError { StartTimeAfterEndTime, } +#[derive(Debug, thiserror::Error)] +pub enum TimeBinError { + #[error("num_bins must be greater than 0")] + InvalidNumBins, + #[error("time bin interval is out of range")] + IntervalOutOfRange, +} + type Prefix = String; #[derive(Clone, Copy)] @@ -71,35 +79,71 @@ pub fn count_api_bin_interval(start: &DateTime, end: &DateTime) -> &'s } } -pub fn interval_for_num_bins(start: &DateTime, end: &DateTime, num_bins: u64) -> String { +pub fn interval_for_num_bins( + start: &DateTime, + end: &DateTime, + num_bins: u64, +) -> Result { + if num_bins == 0 { + return Err(TimeBinError::InvalidNumBins); + } + let total_seconds = end.signed_duration_since(*start).num_seconds().max(1) as u64; let bin_seconds = (total_seconds / num_bins).max(1); - format!("{bin_seconds} seconds") + Ok(format!("{bin_seconds} seconds")) } -pub fn expected_time_bins(time_range: &TimeRange, num_bins: u64) -> Vec<(String, String)> { +pub fn expected_time_bins( + time_range: &TimeRange, + num_bins: u64, +) -> Result, TimeBinError> { + if num_bins == 0 { + return Err(TimeBinError::InvalidNumBins); + } + let total_seconds = time_range .end .signed_duration_since(time_range.start) .num_seconds() .max(1) as u64; let bin_seconds = (total_seconds / num_bins).max(1); + let bin_seconds_i64 = + i64::try_from(bin_seconds).map_err(|_| TimeBinError::IntervalOutOfRange)?; + let first_bin_start = + time_range.start.timestamp().div_euclid(bin_seconds_i64) * bin_seconds_i64; + let first_bin_start = + DateTime::from_timestamp(first_bin_start, 0).ok_or(TimeBinError::IntervalOutOfRange)?; (0..num_bins) .map(|i| { - let bin_start = time_range.start + chrono::Duration::seconds((i * bin_seconds) as i64); + let offset = i64::try_from(i) + .ok() + .and_then(|i| i.checked_mul(bin_seconds_i64)) + .ok_or(TimeBinError::IntervalOutOfRange)?; + let bin_start = first_bin_start + .checked_add_signed(chrono::Duration::seconds(offset)) + .ok_or(TimeBinError::IntervalOutOfRange)?; let bin_end = if i == num_bins - 1 { time_range.end } else { - time_range.start + chrono::Duration::seconds(((i + 1) * bin_seconds) as i64) + bin_start + .checked_add_signed(chrono::Duration::seconds(bin_seconds_i64)) + .ok_or(TimeBinError::IntervalOutOfRange)? }; - (bin_start.to_rfc3339(), bin_end.to_rfc3339()) + Ok((bin_start.to_rfc3339(), bin_end.to_rfc3339())) }) .collect() } pub fn match_time_bin_key(sql_bin_start: &str, expected_bins: &[(String, String)]) -> String { - let normalized = if !sql_bin_start.contains('+') && !sql_bin_start.ends_with('Z') { + let has_timezone = sql_bin_start.ends_with('Z') + || DateTime::parse_from_rfc3339(sql_bin_start).is_ok() + || sql_bin_start + .rsplit_once(['+', '-']) + .is_some_and(|(_, suffix)| { + suffix.len() == 5 && suffix.as_bytes().get(2) == Some(&b':') + }); + let normalized = if !has_timezone { format!("{sql_bin_start}+00:00") } else { sql_bin_start.to_string() @@ -507,6 +551,49 @@ mod tests { assert!(matches!(result, Err(TimeParseError::HumanTime(_)))); } + #[test] + fn interval_for_num_bins_rejects_zero_bins() { + let start = Utc.with_ymd_and_hms(2023, 1, 1, 12, 0, 0).unwrap(); + let end = Utc.with_ymd_and_hms(2023, 1, 1, 12, 10, 0).unwrap(); + + let result = interval_for_num_bins(&start, &end, 0); + + assert!(matches!(result, Err(TimeBinError::InvalidNumBins))); + } + + #[test] + fn expected_time_bins_rejects_zero_bins() { + let time_range = time_period_from_str("2023-01-01T12:00:00Z", "2023-01-01T12:10:00Z"); + + let result = expected_time_bins(&time_range, 0); + + assert!(matches!(result, Err(TimeBinError::InvalidNumBins))); + } + + #[test] + fn expected_time_bins_aligns_to_epoch_anchor() { + let time_range = time_period_from_str("2023-01-01T12:02:00Z", "2023-01-01T12:12:00Z"); + + let bins = expected_time_bins(&time_range, 2).unwrap(); + + assert_eq!(bins[0].0, "2023-01-01T12:00:00+00:00"); + assert_eq!(bins[0].1, "2023-01-01T12:05:00+00:00"); + assert_eq!(bins[1].0, "2023-01-01T12:05:00+00:00"); + assert_eq!(bins[1].1, "2023-01-01T12:12:00+00:00"); + } + + #[test] + fn match_time_bin_key_handles_negative_timezone_offsets() { + let expected_bins = vec![( + "2023-01-01T05:00:00+00:00".to_string(), + "2023-01-01T06:00:00+00:00".to_string(), + )]; + + let matched = match_time_bin_key("2023-01-01T00:00:00-05:00", &expected_bins); + + assert_eq!(matched, "2023-01-01T05:00:00+00:00"); + } + fn time_period_from_str(start: &str, end: &str) -> TimeRange { TimeRange { start: DateTime::parse_from_rfc3339(start).unwrap().into(), From d0631f834461aad1e176e62ee0c39e91f5e3e6f4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 4 Jun 2026 13:13:55 +0700 Subject: [PATCH 3/3] fix bin logic for the last bin --- src/utils/time.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/utils/time.rs b/src/utils/time.rs index 39e8780fe..35e8c9b53 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -114,7 +114,12 @@ pub fn expected_time_bins( let first_bin_start = DateTime::from_timestamp(first_bin_start, 0).ok_or(TimeBinError::IntervalOutOfRange)?; - (0..num_bins) + let seconds_to_cover = time_range.end.timestamp() - first_bin_start.timestamp(); + let bins_to_cover_range = (seconds_to_cover + bin_seconds_i64 - 1) / bin_seconds_i64; + let bins_to_cover_range = + u64::try_from(bins_to_cover_range).map_err(|_| TimeBinError::IntervalOutOfRange)?; + + (0..bins_to_cover_range) .map(|i| { let offset = i64::try_from(i) .ok() @@ -123,13 +128,10 @@ pub fn expected_time_bins( let bin_start = first_bin_start .checked_add_signed(chrono::Duration::seconds(offset)) .ok_or(TimeBinError::IntervalOutOfRange)?; - let bin_end = if i == num_bins - 1 { - time_range.end - } else { - bin_start - .checked_add_signed(chrono::Duration::seconds(bin_seconds_i64)) - .ok_or(TimeBinError::IntervalOutOfRange)? - }; + let next_bin_start = bin_start + .checked_add_signed(chrono::Duration::seconds(bin_seconds_i64)) + .ok_or(TimeBinError::IntervalOutOfRange)?; + let bin_end = next_bin_start.min(time_range.end); Ok((bin_start.to_rfc3339(), bin_end.to_rfc3339())) }) .collect() @@ -576,10 +578,24 @@ mod tests { let bins = expected_time_bins(&time_range, 2).unwrap(); + assert_eq!(bins.len(), 3); assert_eq!(bins[0].0, "2023-01-01T12:00:00+00:00"); assert_eq!(bins[0].1, "2023-01-01T12:05:00+00:00"); assert_eq!(bins[1].0, "2023-01-01T12:05:00+00:00"); - assert_eq!(bins[1].1, "2023-01-01T12:12:00+00:00"); + assert_eq!(bins[1].1, "2023-01-01T12:10:00+00:00"); + assert_eq!(bins[2].0, "2023-01-01T12:10:00+00:00"); + assert_eq!(bins[2].1, "2023-01-01T12:12:00+00:00"); + } + + #[test] + fn expected_time_bins_covers_full_range_when_start_is_mid_bin() { + let time_range = time_period_from_str("2026-06-01T05:56:00Z", "2026-06-04T05:56:00Z"); + + let bins = expected_time_bins(&time_range, 60).unwrap(); + + assert_eq!(bins.len(), 61); + assert_eq!(bins.last().unwrap().0, "2026-06-04T04:48:00+00:00"); + assert_eq!(bins.last().unwrap().1, "2026-06-04T05:56:00+00:00"); } #[test]