diff --git a/CHANGELOG.md b/CHANGELOG.md index cae06d0..5f31d25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,12 +22,14 @@ All notable changes to this project will be documented in this file. ### Fixed - Set connection and response timeout for Redis connections ([#85]). +- Only remove queries from the persistence in case they don't send a `nextUri` and are in state `FINISHED` ([#98]). [#68]: https://github.com/stackabletech/trino-lb/pull/68 [#85]: https://github.com/stackabletech/trino-lb/pull/85 [#86]: https://github.com/stackabletech/trino-lb/pull/86 [#91]: https://github.com/stackabletech/trino-lb/pull/91 [#95]: https://github.com/stackabletech/trino-lb/pull/95 +[#98]: https://github.com/stackabletech/trino-lb/pull/98 ## [0.5.0] - 2025-03-14 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 52e0cfb..b39db3c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -21,3 +21,30 @@ Last but not least run ```bash ./scripts/run_tests.sh ``` + +## mitm proxy debugging + +First, port-forward trino-lb to localhost: + +```bash +kubectl -n kuttl-test-more-titmouse port-forward svc/trino-lb 8443:8443 +``` + +Please make sure that `trinoLb.externalAddress` in your trino-lb config points to `https://127.0.0.1:8443`, so that it populates the nextUri correctly. 
+ +Then start `mitmproxy`: + +```bash +# nix-shell -p mitmproxy +mitmproxy --listen-port 8080 --ssl-insecure +``` + +Afterwards connect with trino-cli: + +```bash +~/Downloads/trino-cli-478 --server https://127.0.0.1:8443 --insecure --user admin --password --http-proxy=127.0.0.1:8080 + +export TRINO_USER="admin" +export TRINO_PASSWORD="adminadmin" +echo 'SELECT * FROM tpch.sf100.customer' | ~/Downloads/trino-cli-478 --server https://127.0.0.1:8443 --insecure --password --http-proxy 127.0.0.1:8080 +``` diff --git a/tests/templates/kuttl/client-spooling/10-install-trino.yaml.j2 b/tests/templates/kuttl/client-spooling/10-install-trino.yaml.j2 index 7122d22..105103f 100644 --- a/tests/templates/kuttl/client-spooling/10-install-trino.yaml.j2 +++ b/tests/templates/kuttl/client-spooling/10-install-trino.yaml.j2 @@ -51,7 +51,7 @@ spec: min: 250m max: "1" memory: - limit: 3Gi + limit: 5Gi gracefulShutdownTimeout: 2m # Let the test run faster logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/client-spooling/30-assert.yaml b/tests/templates/kuttl/client-spooling/30-assert.yaml index b533736..2adcaa9 100644 --- a/tests/templates/kuttl/client-spooling/30-assert.yaml +++ b/tests/templates/kuttl/client-spooling/30-assert.yaml @@ -1,7 +1,7 @@ --- apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 600 +timeout: 1200 --- apiVersion: batch/v1 kind: Job diff --git a/tests/templates/kuttl/client-spooling/30-test-queries.yaml.j2 b/tests/templates/kuttl/client-spooling/30-test-queries.yaml.j2 index 376746c..5433b37 100644 --- a/tests/templates/kuttl/client-spooling/30-test-queries.yaml.j2 +++ b/tests/templates/kuttl/client-spooling/30-test-queries.yaml.j2 @@ -53,9 +53,26 @@ spec: echo "Submitting queries with big result set to trigger client spooling" # Test big query result, so client spooling is used BIG_RESULT_QUERY="SELECT * FROM tpch.sf100.customer" + EXPECTED_ROWS=15000000 + for COORDINATOR in 
"${COORDINATORS[@]}"; do echo "Running query with big result set against $COORDINATOR" - echo "$BIG_RESULT_QUERY" | java -jar trino-cli-executable.jar --server $COORDINATOR --insecure --user $TRINO_USER --password > /dev/null + ROW_COUNT=$( + echo "$BIG_RESULT_QUERY" | \ + java -jar trino-cli-executable.jar \ + --server "$COORDINATOR" \ + --insecure \ + --user "$TRINO_USER" \ + --password \ + | wc -l + ) + + if [ "$ROW_COUNT" -ne "$EXPECTED_ROWS" ]; then + echo "❌ Assertion failed on $COORDINATOR: expected $EXPECTED_ROWS rows, got $ROW_COUNT" + exit 1 + else + echo "✅ Assertion passed on $COORDINATOR: $ROW_COUNT rows" + fi done echo "All queries completed successfully." diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index d03dcc5..4e9c831 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -86,6 +86,23 @@ pub struct Stat { } impl TrinoQueryApiResponse { + pub fn is_query_finished(&self) -> bool { + self.stats.state == "FINISHED" + } + + #[instrument( + skip(self), + fields(trino_lb_addr = %trino_lb_addr), + )] + pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> { + if let Some(next_uri) = &self.next_uri { + let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?; + self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string()); + } + + Ok(()) + } + #[instrument( skip(query), fields(trino_lb_addr = %trino_lb_addr), @@ -155,19 +172,6 @@ impl TrinoQueryApiResponse { update_count: None, }) } - - #[instrument( - skip(self), - fields(trino_lb_addr = %trino_lb_addr), - )] - pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> { - if let Some(next_uri) = &self.next_uri { - let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?; - self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string()); - } - - Ok(()) - } } fn change_next_uri_to_trino_lb(next_uri: 
&Url, trino_lb_addr: &Url) -> Url { diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index be55120..47f1ae2 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -437,7 +437,7 @@ async fn handle_query_running_on_trino( .persistence .load_query(&query_id) .await - .context(StoreQueryInPersistenceSnafu { + .context(LoadQueryFromPersistenceSnafu { query_id: query_id.clone(), })?; @@ -455,13 +455,11 @@ async fn handle_query_running_on_trino( .await .context(AskTrinoForQueryStateSnafu)?; - if trino_query_api_response.next_uri.is_some() { - // Change the nextUri to actually point to trino-lb instead of Trino. - trino_query_api_response - .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address) - .context(ModifyNextUriSnafu)?; - } else { - info!(%query_id, "Query completed (no next_uri send)"); + // Just to be safe the query needs to be completed and not contain any nextUri for the client + // to call to, before being considered done. We don't expect any future calls to done queries, + // so we can (hopefully) safely remove them from the persistence. + if trino_query_api_response.is_query_finished() && trino_query_api_response.next_uri.is_none() { + info!(%query_id, "Query completed, removing it from the persistence"); tokio::try_join!( state.persistence.remove_query(&query_id).map_err(|err| { @@ -480,6 +478,11 @@ async fn handle_query_running_on_trino( } }), )?; + } else { + // Change the nextUri to actually point to trino-lb instead of Trino. + trino_query_api_response + .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address) + .context(ModifyNextUriSnafu)?; } Ok((trino_headers, Json(trino_query_api_response))) @@ -582,7 +585,7 @@ async fn cancel_query_on_trino( .persistence .load_query(&query_id) .await - .context(StoreQueryInPersistenceSnafu { + .context(LoadQueryFromPersistenceSnafu { query_id: query_id.clone(), })?;