Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ All notable changes to this project will be documented in this file.

- Improve tracing for running queries on Trino, adding spans for the request to Trino and parsing ([#71]).
- Improve performance by using [`serde_json::value::RawValue`](https://docs.rs/serde_json/latest/serde_json/value/struct.RawValue.html) for the `data` and `columns` attributes to avoid unneeded deserialization and serialization of them ([#73]).
- Bumped to Rust 2024 edition ([#76]).

[#70]: https://github.com/stackabletech/trino-lb/pull/70
[#71]: https://github.com/stackabletech/trino-lb/pull/71
[#73]: https://github.com/stackabletech/trino-lb/pull/73
[#74]: https://github.com/stackabletech/trino-lb/pull/74
[#76]: https://github.com/stackabletech/trino-lb/pull/76

## [0.4.1] - 2025-03-03

Expand Down
29 changes: 14 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ resolver = "2"
version = "0.4.1"
authors = ["Stackable GmbH <info@stackable.tech>"]
license = "Apache-2.0"
edition = "2021"
edition = "2024"
repository = "https://github.com/stackabletech/trino-lb"

[workspace.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion trino-lb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use args::Args;
use clap::Parser;
use indicatif::{MultiProgress, ProgressBar};
use prusto::{auth::Auth, ClientBuilder, Row};
use prusto::{ClientBuilder, Row, auth::Auth};
use tokio::time;

mod args;
Expand Down
2 changes: 1 addition & 1 deletion trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use url::Url;

use crate::{trino_query_plan::QueryPlanEstimation, TrinoClusterName};
use crate::{TrinoClusterName, trino_query_plan::QueryPlanEstimation};

#[derive(Snafu, Debug)]
pub enum Error {
Expand Down
2 changes: 1 addition & 1 deletion trino-lb-core/src/sanitization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ impl Sanitize for http::HeaderMap {
#[cfg(test)]
mod tests {
use http::{
header::{CONTENT_LENGTH, HOST},
HeaderMap, HeaderValue,
header::{CONTENT_LENGTH, HOST},
};

use super::*;
Expand Down
10 changes: 7 additions & 3 deletions trino-lb-core/src/trino_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use snafu::{ResultExt, Snafu};
use tracing::instrument;
use url::Url;

use crate::{trino_query::QueuedQuery, TrinoQueryId};
use crate::{TrinoQueryId, trino_query::QueuedQuery};

#[derive(Snafu, Debug)]
pub enum Error {
Expand All @@ -23,10 +23,14 @@ pub enum Error {
#[snafu(display("Failed to parse nextUri Trino send us"))]
ParseNextUriFromTrino { source: url::ParseError },

#[snafu(display("Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"))]
#[snafu(display(
"Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"
))]
DetermineElapsedTime { source: SystemTimeError },

#[snafu(display("The queued time {queued_time:?} is too big to be send to trino, as the trino API only accepts an 64bit number for queued_time_millis"))]
#[snafu(display(
"The queued time {queued_time:?} is too big to be send to trino, as the trino API only accepts an 64bit number for queued_time_millis"
))]
ElapsedTimeTooBig {
source: TryFromIntError,
queued_time: Duration,
Expand Down
2 changes: 1 addition & 1 deletion trino-lb-core/src/trino_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use tracing::instrument;
use url::Url;

use crate::{sanitization::Sanitize, TrinoClusterName, TrinoLbQueryId, TrinoQueryId};
use crate::{TrinoClusterName, TrinoLbQueryId, TrinoQueryId, sanitization::Sanitize};

pub const QUEUED_QUERY_ID_PREFIX: &str = "trino_lb_";

Expand Down
101 changes: 57 additions & 44 deletions trino-lb-persistence/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::sync::RwLock;
use tracing::{error, info, instrument};
use trino_lb_core::{
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
trino_cluster::ClusterState,
trino_query::{QueuedQuery, TrinoQuery},
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
};

use crate::Persistence;
Expand All @@ -35,7 +35,9 @@ pub enum Error {
#[snafu(display("Failed to determined elapsed time since last queryCountFetcher update"))]
DetermineElapsedTimeSinceLastUpdate { source: SystemTimeError },

#[snafu(display("Failed to store determined elapsed time since last queryCountFetcher update as millis in a u64"))]
#[snafu(display(
"Failed to store determined elapsed time since last queryCountFetcher update as millis in a u64"
))]
ConvertElapsedTimeSinceLastUpdateToMillis { source: TryFromIntError },
}

Expand Down Expand Up @@ -115,37 +117,40 @@ impl Persistence for InMemoryPersistence {
) -> Result<bool, super::Error> {
let current_counts = self.cluster_query_counts.read().await;

if let Some(count) = current_counts.get(cluster_name) {
let mut current = count.load(Ordering::SeqCst);
loop {
if current + 1 > max_allowed_count {
return Ok(false);
}
match current_counts.get(cluster_name) {
Some(count) => {
let mut current = count.load(Ordering::SeqCst);
loop {
if current + 1 > max_allowed_count {
return Ok(false);
}

match count.compare_exchange_weak(
current,
current + 1,
Ordering::SeqCst,
// [`Ordering::Relaxed`] should be sufficient here, but better safe than sorry
Ordering::SeqCst,
) {
Ok(_) => {
return Ok(true);
match count.compare_exchange_weak(
current,
current + 1,
Ordering::SeqCst,
// [`Ordering::Relaxed`] should be sufficient here, but better safe than sorry
Ordering::SeqCst,
) {
Ok(_) => {
return Ok(true);
}
Err(x) => current = x,
}
Err(x) => current = x,
}
}
} else {
// We need to drop `current_counts` here to release the read lock it holds :)
// Otherwise the [`RwLock::write`] call will block forever.
drop(current_counts);
_ => {
// We need to drop `current_counts` here to release the read lock it holds :)
// Otherwise the [`RwLock::write`] call will block forever.
drop(current_counts);

self.cluster_query_counts
.write()
.await
.insert(cluster_name.clone(), AtomicU64::new(1));
self.cluster_query_counts
.write()
.await
.insert(cluster_name.clone(), AtomicU64::new(1));

Ok(true)
Ok(true)
}
}
}

Expand All @@ -154,16 +159,21 @@ impl Persistence for InMemoryPersistence {
&self,
cluster_name: &TrinoClusterName,
) -> Result<(), super::Error> {
if let Some(count) = self.cluster_query_counts.read().await.get(cluster_name) {
if count.fetch_sub(1, Ordering::SeqCst) == 0 {
match self.cluster_query_counts.read().await.get(cluster_name) {
Some(count) => {
if count.fetch_sub(1, Ordering::SeqCst) == 0 {
error!(
cluster_name,
"Persistence was asked to decrement the number of queries for the given cluster, but it would result in a negative amount of queries. Setting it to 0 instead."
);
count.store(0, Ordering::SeqCst);
}
}
_ => {
error!(
cluster_name,
"Persistence was asked to decrement the number of queries for the given cluster, but it would result in a negative amount of queries. Setting it to 0 instead."
);
count.store(0, Ordering::SeqCst);
"Persistence was asked to decrement the number of queries, but no query count for this cluster was not known. This should not happen."
)
}
} else {
error!("Persistence was asked to decrement the number of queries, but no query count for this cluster was not known. This should not happen.")
}

Ok(())
Expand All @@ -176,15 +186,18 @@ impl Persistence for InMemoryPersistence {
count: u64,
) -> Result<(), super::Error> {
let current_counts = self.cluster_query_counts.read().await;
if let Some(current_count) = current_counts.get(cluster_name) {
current_count.store(count, Ordering::SeqCst);
} else {
drop(current_counts);

self.cluster_query_counts
.write()
.await
.insert(cluster_name.clone(), AtomicU64::from(count));
match current_counts.get(cluster_name) {
Some(current_count) => {
current_count.store(count, Ordering::SeqCst);
}
_ => {
drop(current_counts);

self.cluster_query_counts
.write()
.await
.insert(cluster_name.clone(), AtomicU64::from(count));
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion trino-lb-persistence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::{fmt::Debug, time::SystemTime};
use enum_dispatch::enum_dispatch;
use snafu::Snafu;
use trino_lb_core::{
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
trino_cluster::ClusterState,
trino_query::{QueuedQuery, TrinoQuery},
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
};

pub mod in_memory;
Expand Down
15 changes: 10 additions & 5 deletions trino-lb-persistence/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ use http::HeaderMap;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use sqlx::{
Pool, Postgres,
migrate::MigrateError,
postgres::PgPoolOptions,
query,
types::chrono::{DateTime, Utc},
Pool, Postgres,
};
use tracing::{debug, info, instrument, warn};
use trino_lb_core::{
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
config::PostgresConfig,
trino_cluster::ClusterState,
trino_query::{QueuedQuery, TrinoQuery},
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
};
use url::Url;

Expand Down Expand Up @@ -111,7 +111,9 @@ pub struct PostgresPersistence {

impl PostgresPersistence {
pub async fn new(config: &PostgresConfig) -> Result<Self, Error> {
warn!("Please note that the Postgres persistence is experimental! We have seen a few queries too much being send to the Trino clusters, probably related to some transactional problems");
warn!(
"Please note that the Postgres persistence is experimental! We have seen a few queries too much being send to the Trino clusters, probably related to some transactional problems"
);

let pool = PgPoolOptions::new()
.max_connections(config.max_connections)
Expand Down Expand Up @@ -281,8 +283,11 @@ impl Persistence for PostgresPersistence {
debug!(?current, "Current counter is");

if current + 1 > max_allowed_count {
debug!(current, max_allowed_count,
"Rejected increasing the cluster query count, as the current count + 1 is bigger than the max allowed count");
debug!(
current,
max_allowed_count,
"Rejected increasing the cluster query count, as the current count + 1 is bigger than the max allowed count"
);
transaction
.rollback()
.await
Expand Down
Loading
Loading