diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c5083d..bedc7d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,13 @@ All notable changes to this project will be documented in this file. - Improve tracing for running queries on Trino, adding spans for the request to Trino and parsing ([#71]). - Improve performance by using [`serde_json::value::RawValue`](https://docs.rs/serde_json/latest/serde_json/value/struct.RawValue.html) for the `data` and `columns` attributes to avoid unneeded deserialization and serialization of them ([#73]). +- Bumped to Rust 2024 edition ([#76]). [#70]: https://github.com/stackabletech/trino-lb/pull/70 [#71]: https://github.com/stackabletech/trino-lb/pull/71 [#73]: https://github.com/stackabletech/trino-lb/pull/73 [#74]: https://github.com/stackabletech/trino-lb/pull/74 +[#76]: https://github.com/stackabletech/trino-lb/pull/76 ## [0.4.1] - 2025-03-03 diff --git a/Cargo.lock b/Cargo.lock index 01c65a0..bdaa858 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,9 +428,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.7.1" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb97d56060ee67d285efb8001fec9d2a4c710c32efd2e14b5cbb5ba71930fc2d" +checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" [[package]] name = "basic-toml" @@ -2026,9 +2026,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9c683daf087dc577b7506e9695b3d556a9f3849903fa28186283afd6809e9" +checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" [[package]] name = "litemap" @@ -2262,9 +2262,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.0" +version = "1.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad" +checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" [[package]] name = "openssl-probe" @@ -3914,11 +3914,10 @@ checksum = "e502f78cdbb8ba4718f566c418c52bc729126ffd16baee5baa718cf25dd5a69a" [[package]] name = "tempfile" -version = "3.18.0" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c317e0a526ee6120d8dabad239c8dadca62b24b6f168914bbbc8e2fb1f0e567" +checksum = "488960f40a3fd53d72c2a29a58722561dee8afdd175bd88e3db4677d7b2ba600" dependencies = [ - "cfg-if", "fastrand", "getrandom 0.3.1", "once_cell", @@ -4034,9 +4033,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.0" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" dependencies = [ "backtrace", "bytes", @@ -4094,9 +4093,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" dependencies = [ "bytes", "futures-core", @@ -4539,9 +4538,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.15.1" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" dependencies = [ "getrandom 0.3.1", "serde", diff --git a/Cargo.toml b/Cargo.toml index e3f9d21..a6b7bbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" version = "0.4.1" authors = ["Stackable GmbH "] license = "Apache-2.0" -edition = "2021" +edition = "2024" repository = "https://github.com/stackabletech/trino-lb" [workspace.dependencies] diff --git a/trino-lb-bench/src/main.rs b/trino-lb-bench/src/main.rs index 6bafa7b..fa06f54 100644 --- a/trino-lb-bench/src/main.rs +++ b/trino-lb-bench/src/main.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use args::Args; use clap::Parser; use indicatif::{MultiProgress, ProgressBar}; -use prusto::{auth::Auth, ClientBuilder, Row}; +use prusto::{ClientBuilder, Row, auth::Auth}; use tokio::time; mod args; diff --git a/trino-lb-core/src/config.rs b/trino-lb-core/src/config.rs index 3a5355c..40ba782 100644 --- a/trino-lb-core/src/config.rs +++ b/trino-lb-core/src/config.rs @@ -10,7 +10,7 @@ use serde::Deserialize; use snafu::{ResultExt, Snafu}; use url::Url; -use crate::{trino_query_plan::QueryPlanEstimation, TrinoClusterName}; +use crate::{TrinoClusterName, trino_query_plan::QueryPlanEstimation}; #[derive(Snafu, Debug)] pub enum Error { diff --git a/trino-lb-core/src/sanitization.rs b/trino-lb-core/src/sanitization.rs index dd8f893..6bafd0c 100644 --- a/trino-lb-core/src/sanitization.rs +++ b/trino-lb-core/src/sanitization.rs @@ -15,8 +15,8 @@ impl Sanitize for http::HeaderMap { #[cfg(test)] mod tests { use http::{ - header::{CONTENT_LENGTH, HOST}, HeaderMap, HeaderValue, + header::{CONTENT_LENGTH, HOST}, }; use super::*; diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index 5867151..d03dcc5 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -10,7 +10,7 @@ use snafu::{ResultExt, Snafu}; use tracing::instrument; use url::Url; -use crate::{trino_query::QueuedQuery, TrinoQueryId}; +use crate::{TrinoQueryId, trino_query::QueuedQuery}; #[derive(Snafu, Debug)] pub enum Error { @@ -23,10 +23,14 @@ pub enum Error { #[snafu(display("Failed to parse nextUri Trino send us"))] ParseNextUriFromTrino { source: url::ParseError }, - #[snafu(display("Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"))] + #[snafu(display( + "Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?" + ))] DetermineElapsedTime { source: SystemTimeError }, - #[snafu(display("The queued time {queued_time:?} is too big to be send to trino, as the trino API only accepts an 64bit number for queued_time_millis"))] + #[snafu(display( + "The queued time {queued_time:?} is too big to be send to trino, as the trino API only accepts an 64bit number for queued_time_millis" + ))] ElapsedTimeTooBig { source: TryFromIntError, queued_time: Duration, diff --git a/trino-lb-core/src/trino_query.rs b/trino-lb-core/src/trino_query.rs index 55513c8..9701ac8 100644 --- a/trino-lb-core/src/trino_query.rs +++ b/trino-lb-core/src/trino_query.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use tracing::instrument; use url::Url; -use crate::{sanitization::Sanitize, TrinoClusterName, TrinoLbQueryId, TrinoQueryId}; +use crate::{TrinoClusterName, TrinoLbQueryId, TrinoQueryId, sanitization::Sanitize}; pub const QUEUED_QUERY_ID_PREFIX: &str = "trino_lb_"; diff --git a/trino-lb-persistence/src/in_memory.rs b/trino-lb-persistence/src/in_memory.rs index 5d120d8..6f01139 100644 --- a/trino-lb-persistence/src/in_memory.rs +++ b/trino-lb-persistence/src/in_memory.rs @@ -9,9 +9,9 @@ use snafu::{OptionExt, ResultExt, Snafu}; use tokio::sync::RwLock; use tracing::{error, info, instrument}; use trino_lb_core::{ + TrinoClusterName, TrinoLbQueryId, TrinoQueryId, trino_cluster::ClusterState, trino_query::{QueuedQuery, TrinoQuery}, - TrinoClusterName, TrinoLbQueryId, TrinoQueryId, }; use crate::Persistence; @@ -35,7 +35,9 @@ pub enum Error { #[snafu(display("Failed to determined elapsed time since last queryCountFetcher update"))] DetermineElapsedTimeSinceLastUpdate { source: SystemTimeError }, - #[snafu(display("Failed to store determined elapsed time since last queryCountFetcher update as millis in a u64"))] + #[snafu(display( + "Failed to store determined elapsed time since last queryCountFetcher update as millis in a u64" + ))] ConvertElapsedTimeSinceLastUpdateToMillis { source: TryFromIntError }, } @@ -115,37 +117,40 @@ impl Persistence for InMemoryPersistence { ) -> Result { let current_counts = self.cluster_query_counts.read().await; - if let Some(count) = current_counts.get(cluster_name) { - let mut current = count.load(Ordering::SeqCst); - loop { - if current + 1 > max_allowed_count { - return Ok(false); - } + match current_counts.get(cluster_name) { + Some(count) => { + let mut current = count.load(Ordering::SeqCst); + loop { + if current + 1 > max_allowed_count { + return Ok(false); + } - match count.compare_exchange_weak( - current, - current + 1, - Ordering::SeqCst, - // [`Ordering::Relaxed`] should be sufficient here, but better safe than sorry - Ordering::SeqCst, - ) { - Ok(_) => { - return Ok(true); + match count.compare_exchange_weak( + current, + current + 1, + Ordering::SeqCst, + // [`Ordering::Relaxed`] should be sufficient here, but better safe than sorry + Ordering::SeqCst, + ) { + Ok(_) => { + return Ok(true); + } + Err(x) => current = x, } - Err(x) => current = x, } } - } else { - // We need to drop `current_counts` here to release the read lock it holds :) - // Otherwise the [`RwLock::write`] call will block forever. - drop(current_counts); + _ => { + // We need to drop `current_counts` here to release the read lock it holds :) + // Otherwise the [`RwLock::write`] call will block forever. + drop(current_counts); - self.cluster_query_counts - .write() - .await - .insert(cluster_name.clone(), AtomicU64::new(1)); + self.cluster_query_counts + .write() + .await + .insert(cluster_name.clone(), AtomicU64::new(1)); - Ok(true) + Ok(true) + } } } @@ -154,16 +159,21 @@ impl Persistence for InMemoryPersistence { &self, cluster_name: &TrinoClusterName, ) -> Result<(), super::Error> { - if let Some(count) = self.cluster_query_counts.read().await.get(cluster_name) { - if count.fetch_sub(1, Ordering::SeqCst) == 0 { + match self.cluster_query_counts.read().await.get(cluster_name) { + Some(count) => { + if count.fetch_sub(1, Ordering::SeqCst) == 0 { + error!( + cluster_name, + "Persistence was asked to decrement the number of queries for the given cluster, but it would result in a negative amount of queries. Setting it to 0 instead." + ); + count.store(0, Ordering::SeqCst); + } + } + _ => { error!( - cluster_name, - "Persistence was asked to decrement the number of queries for the given cluster, but it would result in a negative amount of queries. Setting it to 0 instead." - ); - count.store(0, Ordering::SeqCst); + "Persistence was asked to decrement the number of queries, but no query count for this cluster was not known. This should not happen." + ) } - } else { - error!("Persistence was asked to decrement the number of queries, but no query count for this cluster was not known. This should not happen.") } Ok(()) @@ -176,15 +186,18 @@ impl Persistence for InMemoryPersistence { count: u64, ) -> Result<(), super::Error> { let current_counts = self.cluster_query_counts.read().await; - if let Some(current_count) = current_counts.get(cluster_name) { - current_count.store(count, Ordering::SeqCst); - } else { - drop(current_counts); - - self.cluster_query_counts - .write() - .await - .insert(cluster_name.clone(), AtomicU64::from(count)); + match current_counts.get(cluster_name) { + Some(current_count) => { + current_count.store(count, Ordering::SeqCst); + } + _ => { + drop(current_counts); + + self.cluster_query_counts + .write() + .await + .insert(cluster_name.clone(), AtomicU64::from(count)); + } } Ok(()) diff --git a/trino-lb-persistence/src/lib.rs b/trino-lb-persistence/src/lib.rs index e277feb..0187d96 100644 --- a/trino-lb-persistence/src/lib.rs +++ b/trino-lb-persistence/src/lib.rs @@ -3,9 +3,9 @@ use std::{fmt::Debug, time::SystemTime}; use enum_dispatch::enum_dispatch; use snafu::Snafu; use trino_lb_core::{ + TrinoClusterName, TrinoLbQueryId, TrinoQueryId, trino_cluster::ClusterState, trino_query::{QueuedQuery, TrinoQuery}, - TrinoClusterName, TrinoLbQueryId, TrinoQueryId, }; pub mod in_memory; diff --git a/trino-lb-persistence/src/postgres/mod.rs b/trino-lb-persistence/src/postgres/mod.rs index 7862c79..710eb31 100644 --- a/trino-lb-persistence/src/postgres/mod.rs +++ b/trino-lb-persistence/src/postgres/mod.rs @@ -7,18 +7,18 @@ use http::HeaderMap; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use sqlx::{ + Pool, Postgres, migrate::MigrateError, postgres::PgPoolOptions, query, types::chrono::{DateTime, Utc}, - Pool, Postgres, }; use tracing::{debug, info, instrument, warn}; use trino_lb_core::{ + TrinoClusterName, TrinoLbQueryId, TrinoQueryId, config::PostgresConfig, trino_cluster::ClusterState, trino_query::{QueuedQuery, TrinoQuery}, - TrinoClusterName, TrinoLbQueryId, TrinoQueryId, }; use url::Url; @@ -111,7 +111,9 @@ pub struct PostgresPersistence { impl PostgresPersistence { pub async fn new(config: &PostgresConfig) -> Result { - warn!("Please note that the Postgres persistence is experimental! We have seen a few queries too much being send to the Trino clusters, probably related to some transactional problems"); + warn!( + "Please note that the Postgres persistence is experimental! We have seen a few queries too much being send to the Trino clusters, probably related to some transactional problems" + ); let pool = PgPoolOptions::new() .max_connections(config.max_connections) @@ -281,8 +283,11 @@ impl Persistence for PostgresPersistence { debug!(?current, "Current counter is"); if current + 1 > max_allowed_count { - debug!(current, max_allowed_count, - "Rejected increasing the cluster query count, as the current count + 1 is bigger than the max allowed count"); + debug!( + current, + max_allowed_count, + "Rejected increasing the cluster query count, as the current count + 1 is bigger than the max allowed count" + ); transaction .rollback() .await diff --git a/trino-lb-persistence/src/redis/mod.rs b/trino-lb-persistence/src/redis/mod.rs index 1486fbf..d0c6de1 100644 --- a/trino-lb-persistence/src/redis/mod.rs +++ b/trino-lb-persistence/src/redis/mod.rs @@ -4,20 +4,20 @@ use std::{ time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}, }; -use futures::{future::try_join_all, TryFutureExt}; +use futures::{TryFutureExt, future::try_join_all}; use redis::{ + AsyncCommands, Client, RedisError, Script, aio::{ConnectionManager, MultiplexedConnection}, cluster::ClusterClientBuilder, cluster_async::ClusterConnection, - AsyncCommands, Client, RedisError, Script, }; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{debug, debug_span, info, instrument, Instrument}; +use tracing::{Instrument, debug, debug_span, info, instrument}; use trino_lb_core::{ + TrinoClusterName, TrinoLbQueryId, TrinoQueryId, config::RedisConfig, trino_cluster::ClusterState, trino_query::{QueuedQuery, TrinoQuery}, - TrinoClusterName, TrinoLbQueryId, TrinoQueryId, }; use url::Url; @@ -78,7 +78,9 @@ pub enum Error { cluster_name: TrinoClusterName, }, - #[snafu(display("Failed to convert retrieved cluster query count {retrieved:?} to an u64 for cluster {cluster_name:?}"))] + #[snafu(display( + "Failed to convert retrieved cluster query count {retrieved:?} to an u64 for cluster {cluster_name:?}" + ))] ConvertClusterQueryCountToU64 { source: TryFromIntError, cluster_name: TrinoClusterName, @@ -94,7 +96,9 @@ pub enum Error { #[snafu(display("Failed to determined elapsed time since last queryCountFetcher update"))] DetermineElapsedTimeSinceLastUpdate { source: SystemTimeError }, - #[snafu(display("Failed to store determined elapsed time since last queryCountFetcher update as millis in a u64"))] + #[snafu(display( + "Failed to store determined elapsed time since last queryCountFetcher update as millis in a u64" + ))] ConvertElapsedTimeSinceLastUpdateToMillis { source: TryFromIntError }, #[snafu(display("Failed to set cluster state"))] @@ -292,8 +296,11 @@ where debug!(current, "Current counter is"); if current + 1 > max_allowed_count { - debug!(current, max_allowed_count, - "Rejected increasing the cluster query count, as the current count + 1 is bigger than the max allowed count"); + debug!( + current, + max_allowed_count, + "Rejected increasing the cluster query count, as the current count + 1 is bigger than the max allowed count" + ); return Ok(false); } diff --git a/trino-lb/src/cluster_group_manager.rs b/trino-lb/src/cluster_group_manager.rs index 67d191f..4bd0272 100644 --- a/trino-lb/src/cluster_group_manager.rs +++ b/trino-lb/src/cluster_group_manager.rs @@ -4,12 +4,12 @@ use std::{ sync::Arc, }; -use axum::{body::Body, response::IntoResponse, Json}; +use axum::{Json, body::Body, response::IntoResponse}; use futures::future::try_join_all; use http::{HeaderMap, StatusCode}; use reqwest::Client; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{debug, info_span, instrument, Instrument}; +use tracing::{Instrument, debug, info_span, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use trino_lb_core::{ config::Config, sanitization::Sanitize, trino_api::TrinoQueryApiResponse, @@ -43,7 +43,9 @@ pub enum Error { #[snafu(display("Failed to parse Trino API response as JSON"))] ParseTrinoResponse { source: serde_json::Error }, - #[snafu(display("Configuration error: A specific Trino cluster can only be part of a single clusterGroup. Please make sure the Trino cluster {cluster_name:?} only is part of a single clusterGroup."))] + #[snafu(display( + "Configuration error: A specific Trino cluster can only be part of a single clusterGroup. Please make sure the Trino cluster {cluster_name:?} only is part of a single clusterGroup." + ))] ConfigErrorTrinoClusterInMultipleClusterGroups { cluster_name: String }, #[snafu(display( diff --git a/trino-lb/src/http_server/mod.rs b/trino-lb/src/http_server/mod.rs index 2037722..d157b04 100644 --- a/trino-lb/src/http_server/mod.rs +++ b/trino-lb/src/http_server/mod.rs @@ -7,11 +7,11 @@ use std::{ }; use axum::{ + Router, response::Redirect, routing::{delete, get, post}, - Router, }; -use axum_server::{tls_rustls::RustlsConfig, Handle}; +use axum_server::{Handle, tls_rustls::RustlsConfig}; use futures::FutureExt; use snafu::{OptionExt, ResultExt, Snafu}; use tokio::time::sleep; diff --git a/trino-lb/src/http_server/ui/query.rs b/trino-lb/src/http_server/ui/query.rs index 03f532a..9f9d0bc 100644 --- a/trino-lb/src/http_server/ui/query.rs +++ b/trino-lb/src/http_server/ui/query.rs @@ -15,10 +15,14 @@ use crate::http_server::AppState; #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("Query ID missing. It needs to be specified as query parameter such as https://127.0.0.1:8443/ui/query.html?trino_lb_20231227_122313_2JzDa3bT"))] + #[snafu(display( + "Query ID missing. It needs to be specified as query parameter such as https://127.0.0.1:8443/ui/query.html?trino_lb_20231227_122313_2JzDa3bT" + ))] QueryIdMissing, - #[snafu(display("Query with ID {query_id:?} not found. Maybe the query is not queued any more but was handed over to a Trino cluster."))] + #[snafu(display( + "Query with ID {query_id:?} not found. Maybe the query is not queued any more but was handed over to a Trino cluster." + ))] QueryIdNotFound { source: trino_lb_persistence::Error, query_id: TrinoLbQueryId, diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index 6e4b786..ef62ff2 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -7,21 +7,21 @@ use std::{ }; use axum::{ + Json, extract::{Path, State}, response::{IntoResponse, Response}, - Json, }; use futures::TryFutureExt; use http::{HeaderMap, StatusCode, Uri}; use opentelemetry::KeyValue; use snafu::{ResultExt, Snafu}; use tokio::time::Instant; -use tracing::{debug, info, info_span, instrument, warn, Instrument}; +use tracing::{Instrument, debug, info, info_span, instrument, warn}; use trino_lb_core::{ + TrinoLbQueryId, TrinoQueryId, sanitization::Sanitize, trino_api::TrinoQueryApiResponse, trino_query::{QueuedQuery, TrinoQuery}, - TrinoLbQueryId, TrinoQueryId, }; use trino_lb_persistence::Persistence; use url::Url; @@ -366,10 +366,10 @@ async fn queue_or_hand_over_query( return Ok(send_to_trino_response); } else { debug!( - cluster = cluster.name, - "The cluster had enough space when asked for the best cluster, but inc_cluster_query_count returned None, \ + cluster = cluster.name, + "The cluster had enough space when asked for the best cluster, but inc_cluster_query_count returned None, \ probably because the cluster has reached its maximum query count in the meantime" - ); + ); } } diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index 0b3b8f4..7be249d 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -13,10 +13,10 @@ use scaling::Scaler; use snafu::{ResultExt, Snafu}; use trino_lb_core::config::{self, Config, PersistenceConfig}; use trino_lb_persistence::{ + PersistenceImplementation, in_memory::InMemoryPersistence, postgres::{self, PostgresPersistence}, redis::{self, RedisPersistence}, - PersistenceImplementation, }; use crate::{args::Args, http_server::start_http_server}; diff --git a/trino-lb/src/maintenance/leftover_queries.rs b/trino-lb/src/maintenance/leftover_queries.rs index 59abb72..b7112c0 100644 --- a/trino-lb/src/maintenance/leftover_queries.rs +++ b/trino-lb/src/maintenance/leftover_queries.rs @@ -2,8 +2,9 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; + use tokio::time; -use tracing::{debug, error, info, info_span, Instrument}; +use tracing::{Instrument, debug, error, info, info_span}; #[cfg(doc)] use trino_lb_core::trino_query::QueuedQuery; use trino_lb_persistence::{Persistence, PersistenceImplementation}; diff --git a/trino-lb/src/maintenance/query_count_fetcher.rs b/trino-lb/src/maintenance/query_count_fetcher.rs index 34ee65d..8c15072 100644 --- a/trino-lb/src/maintenance/query_count_fetcher.rs +++ b/trino-lb/src/maintenance/query_count_fetcher.rs @@ -4,11 +4,11 @@ use std::{ time::{Duration, SystemTime}, }; -use futures::{future::join_all, TryFutureExt}; +use futures::{TryFutureExt, future::join_all}; use snafu::Snafu; use tokio::time; -use tracing::{error, info, info_span, instrument, Instrument}; -use trino_lb_core::{config::TrinoClusterConfig, trino_cluster::ClusterState, TrinoClusterName}; +use tracing::{Instrument, error, info, info_span, instrument}; +use trino_lb_core::{TrinoClusterName, config::TrinoClusterConfig, trino_cluster::ClusterState}; use trino_lb_persistence::{Persistence, PersistenceImplementation}; use crate::{config::TrinoClusterGroupConfig, metrics::Metrics, trino_client::get_cluster_info}; diff --git a/trino-lb/src/metrics.rs b/trino-lb/src/metrics.rs index cf9d132..1a5cecf 100644 --- a/trino-lb/src/metrics.rs +++ b/trino-lb/src/metrics.rs @@ -6,8 +6,8 @@ use std::{ use futures::future::try_join_all; use opentelemetry::{ - metrics::{Counter, Histogram, MetricsError}, KeyValue, + metrics::{Counter, Histogram, MetricsError}, }; use prometheus::Registry; use snafu::{ResultExt, Snafu}; @@ -17,9 +17,9 @@ use tokio::{ }; use tracing::error; use trino_lb_core::{ + TrinoClusterName, config::{Config, TrinoClusterGroupConfig}, trino_cluster::ClusterState, - TrinoClusterName, }; use trino_lb_persistence::{Persistence, PersistenceImplementation}; diff --git a/trino-lb/src/routing/client_tags.rs b/trino-lb/src/routing/client_tags.rs index e2dbea8..5050d56 100644 --- a/trino-lb/src/routing/client_tags.rs +++ b/trino-lb/src/routing/client_tags.rs @@ -79,11 +79,11 @@ impl RouterImplementationTrait for ClientTagsRouter { #[cfg(test)] mod tests { - use super::*; - use http::{HeaderMap, HeaderName}; use rstest::rstest; + use super::*; + #[rstest] #[case(None, None)] #[case(Some("foo"), Some("my-target"))] diff --git a/trino-lb/src/routing/mod.rs b/trino-lb/src/routing/mod.rs index cb3d539..bc96e78 100644 --- a/trino-lb/src/routing/mod.rs +++ b/trino-lb/src/routing/mod.rs @@ -26,13 +26,17 @@ pub enum Error { #[snafu(display("Failed to create client tags router"))] CreateClientTagsRouter { source: client_tags::Error }, - #[snafu(display("Configuration error: The router {router:?} is configured to route to trinoClusterGroup {trino_cluster_group:?} which does not exist"))] + #[snafu(display( + "Configuration error: The router {router:?} is configured to route to trinoClusterGroup {trino_cluster_group:?} which does not exist" + ))] ConfigErrorClusterGroupDoesNotExist { router: String, trino_cluster_group: String, }, - #[snafu(display("Configuration error: The routingFallback is configured to route to trinoClusterGroup {routing_fallback:?} which does not exist"))] + #[snafu(display( + "Configuration error: The routingFallback is configured to route to trinoClusterGroup {routing_fallback:?} which does not exist" + ))] ConfigErrorRoutingFallbackDoesNotExist { routing_fallback: String }, } diff --git a/trino-lb/src/routing/python_script.rs b/trino-lb/src/routing/python_script.rs index 9e99234..b193117 100644 --- a/trino-lb/src/routing/python_script.rs +++ b/trino-lb/src/routing/python_script.rs @@ -4,9 +4,9 @@ use std::{ }; use pyo3::{ + Py, PyAny, Python, ffi::c_str, types::{IntoPyDict, PyAnyMethods, PyModule}, - Py, PyAny, Python, }; use snafu::{ResultExt, Snafu}; use tracing::{error, instrument, warn}; @@ -130,12 +130,12 @@ fn header_map_to_hashmap(headers: &http::HeaderMap) -> HashMap { #[cfg(test)] mod tests { - use super::*; - use http::{HeaderMap, HeaderName}; use indoc::indoc; use rstest::rstest; + use super::*; + fn create_router(script: String) -> PythonScriptRouter { let valid_target_groups = HashSet::from([ "s".to_string(), diff --git a/trino-lb/src/scaling/config.rs b/trino-lb/src/scaling/config.rs index ad5cceb..276a6a4 100644 --- a/trino-lb/src/scaling/config.rs +++ b/trino-lb/src/scaling/config.rs @@ -119,11 +119,11 @@ impl MinClusters { #[cfg(test)] mod tests { - use super::*; - use chrono::TimeZone; use rstest::rstest; + use super::*; + #[rstest] #[case("00:00:00 - 23:59:59", Utc.with_ymd_and_hms(2023, 12, 8, 0, 0, 0).unwrap(), true)] #[case("00:00:00 - 23:59:59", Utc.with_ymd_and_hms(2023, 12, 8, 8, 0, 0).unwrap(), true)] diff --git a/trino-lb/src/scaling/mod.rs b/trino-lb/src/scaling/mod.rs index f2e6a6b..c4e6475 100644 --- a/trino-lb/src/scaling/mod.rs +++ b/trino-lb/src/scaling/mod.rs @@ -14,17 +14,16 @@ use tokio::{ task::{JoinError, JoinSet}, time, }; -use tracing::{debug, error, info, instrument, warn, Instrument, Span}; +use tracing::{Instrument, Span, debug, error, info, instrument, warn}; use trino_lb_core::{ + TrinoClusterName, config::{Config, ScalerConfig, ScalerConfigImplementation}, trino_cluster::ClusterState, - TrinoClusterName, }; use trino_lb_persistence::{Persistence, PersistenceImplementation}; -use crate::cluster_group_manager::TrinoCluster; - use self::config::TrinoClusterGroupAutoscaling; +use crate::cluster_group_manager::TrinoCluster; pub mod config; pub mod stackable; @@ -35,7 +34,9 @@ pub enum Error { #[allow(clippy::enum_variant_names)] StackableError { source: stackable::Error }, - #[snafu(display("Configuration error: A specific Trino cluster can only be part of a single clusterGroup. Please make sure the Trino cluster {cluster_name:?} only is part of a single clusterGroup."))] + #[snafu(display( + "Configuration error: A specific Trino cluster can only be part of a single clusterGroup. Please make sure the Trino cluster {cluster_name:?} only is part of a single clusterGroup." + ))] ConfigErrorTrinoClusterInMultipleClusterGroups { cluster_name: String }, #[snafu(display("Failed to create Stackable autoscaler"))] @@ -108,10 +109,14 @@ pub enum Error { #[snafu(display("Failed to join get current cluster state task"))] JoinGetCurrentClusterStateTask { source: JoinError }, - #[snafu(display("The variable \"scaler\" is None. This should never happen, as we only run the reconciliation when a scaler is configured!"))] + #[snafu(display( + "The variable \"scaler\" is None. This should never happen, as we only run the reconciliation when a scaler is configured!" + ))] ScalerVariableIsNone, - #[snafu(display("The scaler config is missing. This is a bug in trino-lb, as it should exist at this particular code path"))] + #[snafu(display( + "The scaler config is missing. This is a bug in trino-lb, as it should exist at this particular code path" + ))] ScalerConfigMissing, } @@ -565,7 +570,11 @@ impl Scaler { match target_state { ClusterState::Unknown => { - error!(cluster = cluster.name, ?target_state, "After calculating the new target states the state was \"Unknown\", so we did not enabled or disable the cluster. This should not happen!") + error!( + cluster = cluster.name, + ?target_state, + "After calculating the new target states the state was \"Unknown\", so we did not enabled or disable the cluster. This should not happen!" + ) } ClusterState::Stopped | ClusterState::Terminating => { scaler.deactivate(&cluster.name).await?; diff --git a/trino-lb/src/scaling/stackable.rs b/trino-lb/src/scaling/stackable.rs index af2eb1b..f2ef8c7 100644 --- a/trino-lb/src/scaling/stackable.rs +++ b/trino-lb/src/scaling/stackable.rs @@ -1,16 +1,16 @@ use std::collections::HashMap; use kube::{ + Api, Client, Discovery, api::{Patch, PatchParams}, core::{DynamicObject, GroupVersionKind}, - Api, Client, Discovery, }; use serde_json::Value; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{debug_span, instrument, Instrument}; +use tracing::{Instrument, debug_span, instrument}; use trino_lb_core::{ - config::{StackableScalerConfig, TrinoClusterGroupConfig}, TrinoClusterName, + config::{StackableScalerConfig, TrinoClusterGroupConfig}, }; use super::ScalerTrait; @@ -102,7 +102,9 @@ pub enum Error { namespace: String, }, - #[snafu(display("The Trino cluster {cluster:?} has no information on how to be scaled, as it is missing from the Stackable clusterAutoscaler list"))] + #[snafu(display( + "The Trino cluster {cluster:?} has no information on how to be scaled, as it is missing from the Stackable clusterAutoscaler list" + ))] ClusterWithNoScalingInformation { cluster: TrinoClusterName }, #[snafu(display("The Trino cluster {cluster:?} in namespace {namespace:?} was not found"))] diff --git a/trino-lb/src/tracing.rs b/trino-lb/src/tracing.rs index a9b28fa..6360194 100644 --- a/trino-lb/src/tracing.rs +++ b/trino-lb/src/tracing.rs @@ -1,25 +1,24 @@ use std::{sync::Arc, time::Duration}; use opentelemetry::{ - global, + Context, KeyValue, global, metrics::MetricsError, trace::{TraceError, TracerProvider}, - Context, KeyValue, }; use opentelemetry_http::HeaderInjector; use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig}; use opentelemetry_sdk::{ + Resource, metrics::{ - reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, Aggregation, Instrument, SdkMeterProvider, Stream, + reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, }, propagation::TraceContextPropagator, trace::{self, RandomIdGenerator, Sampler}, - Resource, }; use snafu::{ResultExt, Snafu}; use tracing::{level_filters::LevelFilter, subscriber::SetGlobalDefaultError}; -use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer}; +use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt}; use trino_lb_core::config::{Config, TrinoLbTracingConfig}; use trino_lb_persistence::PersistenceImplementation; diff --git a/trino-lb/src/trino_client/cluster_info.rs b/trino-lb/src/trino_client/cluster_info.rs index d664c17..6710465 100644 --- a/trino-lb/src/trino_client/cluster_info.rs +++ b/trino-lb/src/trino_client/cluster_info.rs @@ -105,10 +105,10 @@ fn login_body(credentials: &TrinoClusterCredentialsConfig) -> String { #[cfg(test)] mod tests { - use super::*; - use rstest::rstest; + use super::*; + #[rstest] #[case("admin", "admin", "username=admin&password=admin&redirectPath=")] #[case( diff --git a/trino-lb/src/trino_client/mod.rs b/trino-lb/src/trino_client/mod.rs index c32f168..393b0f7 100644 --- a/trino-lb/src/trino_client/mod.rs +++ b/trino-lb/src/trino_client/mod.rs @@ -1,5 +1,5 @@ use http::HeaderMap; -use prusto::{auth::Auth, Client, ClientBuilder, DataSet}; +use prusto::{Client, ClientBuilder, DataSet, auth::Auth}; use snafu::{OptionExt, ResultExt, Snafu}; use tracing::instrument; use trino_lb_core::{ @@ -8,9 +8,8 @@ use trino_lb_core::{ }; use url::Url; -use crate::config::TrinoClientConfig; -pub use cluster_info::{get_cluster_info, ClusterInfo}; -use workarounds::query_estimation_workarounds; +pub use crate::trino_client::cluster_info::{ClusterInfo, get_cluster_info}; +use crate::{config::TrinoClientConfig, trino_client::workarounds::query_estimation_workarounds}; mod cluster_info; mod workarounds;