Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use crate::metrics::increment_bytes_scanned_in_query_by_date;
use crate::option::Mode;
use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat};
use crate::utils::time::TimeRange;
use crate::utils::time::{DATE_BIN_EPOCH_ANCHOR, TimeRange, count_api_bin_interval};

/// Boxed record-batch stream used as the streaming half of query results.
type BoxedBatchStream = SendableRecordBatchStream;
Expand Down Expand Up @@ -523,7 +523,7 @@ impl CountsRequest {
let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
let all_manifest_files = get_manifest_list(&self.stream, &time_range, tenant_id).await?;
// get bounds
let counts = self.get_bounds(&time_range);
let counts = self.get_bounds(&time_range)?;

// we have start and end times for each bin
// we also have all the manifest files for the given time range
Expand Down Expand Up @@ -566,7 +566,7 @@ impl CountsRequest {
}

/// Calculate the end time for each bin based on the number of bins
fn get_bounds(&self, time_range: &TimeRange) -> Vec<TimeBounds> {
fn get_bounds(&self, time_range: &TimeRange) -> Result<Vec<TimeBounds>, QueryError> {
let total_minutes = time_range
.end
.signed_duration_since(time_range.start)
Expand All @@ -591,6 +591,12 @@ impl CountsRequest {
}
};

if num_bins == 0 {
return Err(QueryError::CustomError(
"numBins must be greater than 0".to_string(),
));
}

// divide minutes by num bins to get minutes per bin
let quotient = total_minutes / num_bins;
let remainder = total_minutes % num_bins;
Expand Down Expand Up @@ -628,7 +634,7 @@ impl CountsRequest {
});
}

bounds
Ok(bounds)
}

/// This function will get executed only if self.conditions is some
Expand All @@ -638,32 +644,13 @@ impl CountsRequest {

let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;

let dur = time_range.end.signed_duration_since(time_range.start);

let table_name = &self.stream;
let start_time_col_name = "_bin_start_time_";
let end_time_col_name = "_bin_end_time_";
let date_bin = if dur.num_minutes() <= 60 * 5 {
// less than 5 hour = 1 min bin
format!(
"CAST(DATE_BIN('1m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1m' as {end_time_col_name}"
)
} else if dur.num_minutes() <= 60 * 24 {
// 1 day = 5 min bin
format!(
"CAST(DATE_BIN('5m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('5m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '5m' as {end_time_col_name}"
)
} else if dur.num_minutes() < 60 * 24 * 10 {
// 10 days = 1 hour bin
format!(
"CAST(DATE_BIN('1h', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1h', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1h' as {end_time_col_name}"
)
} else {
// 1 day
format!(
"CAST(DATE_BIN('1d', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1d', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1d' as {end_time_col_name}"
)
};
let bin_interval = count_api_bin_interval(&time_range.start, &time_range.end);
let date_bin = format!(
"CAST(DATE_BIN('{bin_interval}', \"{table_name}\".\"{time_column}\", TIMESTAMP '{DATE_BIN_EPOCH_ANCHOR}') AS TEXT) as {start_time_col_name}, DATE_BIN('{bin_interval}', \"{table_name}\".\"{time_column}\", TIMESTAMP '{DATE_BIN_EPOCH_ANCHOR}') + INTERVAL '{bin_interval}' as {end_time_col_name}"
);

let group_by_cols = count_conditions
.group_by
Expand Down
168 changes: 168 additions & 0 deletions src/utils/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeDelta, TimeZone, Timelike, Utc};

pub const DATE_BIN_EPOCH_ANCHOR: &str = "1970-01-01 00:00:00+00";

#[derive(Debug, thiserror::Error)]
pub enum TimeParseError {
#[error("Parsing humantime")]
Expand All @@ -30,6 +32,14 @@ pub enum TimeParseError {
StartTimeAfterEndTime,
}

#[derive(Debug, thiserror::Error)]
pub enum TimeBinError {
#[error("num_bins must be greater than 0")]
InvalidNumBins,
#[error("time bin interval is out of range")]
IntervalOutOfRange,
}

type Prefix = String;

#[derive(Clone, Copy)]
Expand All @@ -55,6 +65,107 @@ pub struct TimeRange {
pub end: DateTime<Utc>,
}

pub fn count_api_bin_interval(start: &DateTime<Utc>, end: &DateTime<Utc>) -> &'static str {
let dur = end.signed_duration_since(*start);

if dur.num_minutes() <= 60 * 5 {
"1m"
} else if dur.num_minutes() <= 60 * 24 {
"5m"
} else if dur.num_minutes() < 60 * 24 * 10 {
"1h"
} else {
"1d"
}
}

pub fn interval_for_num_bins(
start: &DateTime<Utc>,
end: &DateTime<Utc>,
num_bins: u64,
) -> Result<String, TimeBinError> {
if num_bins == 0 {
return Err(TimeBinError::InvalidNumBins);
}

let total_seconds = end.signed_duration_since(*start).num_seconds().max(1) as u64;
let bin_seconds = (total_seconds / num_bins).max(1);
Ok(format!("{bin_seconds} seconds"))
}

pub fn expected_time_bins(
time_range: &TimeRange,
num_bins: u64,
) -> Result<Vec<(String, String)>, TimeBinError> {
if num_bins == 0 {
return Err(TimeBinError::InvalidNumBins);
}

let total_seconds = time_range
.end
.signed_duration_since(time_range.start)
.num_seconds()
.max(1) as u64;
let bin_seconds = (total_seconds / num_bins).max(1);
let bin_seconds_i64 =
i64::try_from(bin_seconds).map_err(|_| TimeBinError::IntervalOutOfRange)?;
let first_bin_start =
time_range.start.timestamp().div_euclid(bin_seconds_i64) * bin_seconds_i64;
let first_bin_start =
DateTime::from_timestamp(first_bin_start, 0).ok_or(TimeBinError::IntervalOutOfRange)?;

let seconds_to_cover = time_range.end.timestamp() - first_bin_start.timestamp();
let bins_to_cover_range = (seconds_to_cover + bin_seconds_i64 - 1) / bin_seconds_i64;
let bins_to_cover_range =
u64::try_from(bins_to_cover_range).map_err(|_| TimeBinError::IntervalOutOfRange)?;

(0..bins_to_cover_range)
.map(|i| {
let offset = i64::try_from(i)
.ok()
.and_then(|i| i.checked_mul(bin_seconds_i64))
.ok_or(TimeBinError::IntervalOutOfRange)?;
let bin_start = first_bin_start
.checked_add_signed(chrono::Duration::seconds(offset))
.ok_or(TimeBinError::IntervalOutOfRange)?;
let next_bin_start = bin_start
.checked_add_signed(chrono::Duration::seconds(bin_seconds_i64))
.ok_or(TimeBinError::IntervalOutOfRange)?;
let bin_end = next_bin_start.min(time_range.end);
Ok((bin_start.to_rfc3339(), bin_end.to_rfc3339()))
})
.collect()
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

pub fn match_time_bin_key(sql_bin_start: &str, expected_bins: &[(String, String)]) -> String {
let has_timezone = sql_bin_start.ends_with('Z')
|| DateTime::parse_from_rfc3339(sql_bin_start).is_ok()
|| sql_bin_start
.rsplit_once(['+', '-'])
.is_some_and(|(_, suffix)| {
suffix.len() == 5 && suffix.as_bytes().get(2) == Some(&b':')
});
let normalized = if !has_timezone {
format!("{sql_bin_start}+00:00")
} else {
sql_bin_start.to_string()
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if let Ok(sql_ts) = DateTime::parse_from_rfc3339(&normalized) {
let sql_ts = sql_ts.with_timezone(&Utc);
for (bs, _) in expected_bins {
if let Ok(expected_ts) = DateTime::parse_from_rfc3339(bs) {
let expected_ts = expected_ts.with_timezone(&Utc);
if (sql_ts - expected_ts).num_seconds().abs() <= 1 {
return bs.clone();
}
}
}
}

sql_bin_start.to_string()
}

impl TimeRange {
pub fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
TimeRange { start, end }
Expand Down Expand Up @@ -442,6 +553,63 @@ mod tests {
assert!(matches!(result, Err(TimeParseError::HumanTime(_))));
}

#[test]
fn interval_for_num_bins_rejects_zero_bins() {
let start = Utc.with_ymd_and_hms(2023, 1, 1, 12, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2023, 1, 1, 12, 10, 0).unwrap();

let result = interval_for_num_bins(&start, &end, 0);

assert!(matches!(result, Err(TimeBinError::InvalidNumBins)));
}

#[test]
fn expected_time_bins_rejects_zero_bins() {
let time_range = time_period_from_str("2023-01-01T12:00:00Z", "2023-01-01T12:10:00Z");

let result = expected_time_bins(&time_range, 0);

assert!(matches!(result, Err(TimeBinError::InvalidNumBins)));
}

#[test]
fn expected_time_bins_aligns_to_epoch_anchor() {
let time_range = time_period_from_str("2023-01-01T12:02:00Z", "2023-01-01T12:12:00Z");

let bins = expected_time_bins(&time_range, 2).unwrap();

assert_eq!(bins.len(), 3);
assert_eq!(bins[0].0, "2023-01-01T12:00:00+00:00");
assert_eq!(bins[0].1, "2023-01-01T12:05:00+00:00");
assert_eq!(bins[1].0, "2023-01-01T12:05:00+00:00");
assert_eq!(bins[1].1, "2023-01-01T12:10:00+00:00");
assert_eq!(bins[2].0, "2023-01-01T12:10:00+00:00");
assert_eq!(bins[2].1, "2023-01-01T12:12:00+00:00");
}

#[test]
fn expected_time_bins_covers_full_range_when_start_is_mid_bin() {
let time_range = time_period_from_str("2026-06-01T05:56:00Z", "2026-06-04T05:56:00Z");

let bins = expected_time_bins(&time_range, 60).unwrap();

assert_eq!(bins.len(), 61);
assert_eq!(bins.last().unwrap().0, "2026-06-04T04:48:00+00:00");
assert_eq!(bins.last().unwrap().1, "2026-06-04T05:56:00+00:00");
}

#[test]
fn match_time_bin_key_handles_negative_timezone_offsets() {
let expected_bins = vec![(
"2023-01-01T05:00:00+00:00".to_string(),
"2023-01-01T06:00:00+00:00".to_string(),
)];

let matched = match_time_bin_key("2023-01-01T00:00:00-05:00", &expected_bins);

assert_eq!(matched, "2023-01-01T05:00:00+00:00");
}

fn time_period_from_str(start: &str, end: &str) -> TimeRange {
TimeRange {
start: DateTime::parse_from_rfc3339(start).unwrap().into(),
Expand Down
Loading