Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ All notable changes to this project will be documented in this file.
### Fixed

- Set connection and response timeout for Redis connections ([#85]).
- Only remove queries from the persistence in case they don't send a `nextUri` and are in state `FINISHED` ([#98]).

[#68]: https://github.com/stackabletech/trino-lb/pull/68
[#85]: https://github.com/stackabletech/trino-lb/pull/85
[#86]: https://github.com/stackabletech/trino-lb/pull/86
[#91]: https://github.com/stackabletech/trino-lb/pull/91
[#95]: https://github.com/stackabletech/trino-lb/pull/95
[#98]: https://github.com/stackabletech/trino-lb/pull/98

## [0.5.0] - 2025-03-14

Expand Down
27 changes: 27 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,30 @@ Last but not least run
```bash
./scripts/run_tests.sh
```

## mitm proxy debugging

First, port-forward trino-lb to localhost:

```bash
kubectl -n kuttl-test-more-titmouse port-forward svc/trino-lb 8443:8443
```

Please make sure that `trinoLb.externalAddress` in your trino-lb config points to `https://127.0.0.1:8443`, so that it populates the nextUri correctly.

Than start `mitmproxy`:

```bash
# nix-shell -p mitmproxy
mitmproxy --listen-port 8080 --ssl-insecure
```

Afterwards connect with trino-cli:

```bash
~/Downloads/trino-cli-478 --server https://127.0.0.1:8443 --insecure --user admin --password --http-proxy=127.0.0.1:8080

export TRINO_USER="admin"
export TRINO_PASSWORD="adminadmin"
echo 'SELECT * FROM tpch.sf100.customer' | ~/Downloads/trino-cli-478 --server https://127.0.0.1:8443 --insecure --password --http-proxy 127.0.0.1:8080
```
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ spec:
min: 250m
max: "1"
memory:
limit: 3Gi
limit: 5Gi
gracefulShutdownTimeout: 2m # Let the test run faster
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/kuttl/client-spooling/30-assert.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 600
timeout: 1200
---
apiVersion: batch/v1
kind: Job
Expand Down
19 changes: 18 additions & 1 deletion tests/templates/kuttl/client-spooling/30-test-queries.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,26 @@ spec:
echo "Submitting queries with big result set to trigger client spooling"
# Test big query result, so client spooling is used
BIG_RESULT_QUERY="SELECT * FROM tpch.sf100.customer"
EXPECTED_ROWS=15000000

for COORDINATOR in "${COORDINATORS[@]}"; do
echo "Running query with big result set against $COORDINATOR"
echo "$BIG_RESULT_QUERY" | java -jar trino-cli-executable.jar --server $COORDINATOR --insecure --user $TRINO_USER --password > /dev/null
ROW_COUNT=$(
echo "$BIG_RESULT_QUERY" | \
java -jar trino-cli-executable.jar \
--server "$COORDINATOR" \
--insecure \
--user "$TRINO_USER" \
--password \
| wc -l
)

if [ "$ROW_COUNT" -ne "$EXPECTED_ROWS" ]; then
echo "❌ Assertion failed on $COORDINATOR: expected $EXPECTED_ROWS rows, got $ROW_COUNT"
exit 1
else
echo "✅ Assertion passed on $COORDINATOR: $ROW_COUNT rows"
fi
done

echo "All queries completed successfully."
Expand Down
30 changes: 17 additions & 13 deletions trino-lb-core/src/trino_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ pub struct Stat {
}

impl TrinoQueryApiResponse {
pub fn is_query_finished(&self) -> bool {
self.stats.state == "FINISHED"
}

#[instrument(
skip(self),
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> {
if let Some(next_uri) = &self.next_uri {
let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string());
}

Ok(())
}

#[instrument(
skip(query),
fields(trino_lb_addr = %trino_lb_addr),
Expand Down Expand Up @@ -155,19 +172,6 @@ impl TrinoQueryApiResponse {
update_count: None,
})
}

#[instrument(
skip(self),
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> {
if let Some(next_uri) = &self.next_uri {
let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string());
}

Ok(())
}
}

fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url {
Expand Down
21 changes: 12 additions & 9 deletions trino-lb/src/http_server/v1/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ async fn handle_query_running_on_trino(
.persistence
.load_query(&query_id)
.await
.context(StoreQueryInPersistenceSnafu {
.context(LoadQueryFromPersistenceSnafu {
query_id: query_id.clone(),
})?;

Expand All @@ -455,13 +455,11 @@ async fn handle_query_running_on_trino(
.await
.context(AskTrinoForQueryStateSnafu)?;

if trino_query_api_response.next_uri.is_some() {
// Change the nextUri to actually point to trino-lb instead of Trino.
trino_query_api_response
.change_next_uri_to_trino_lb(&state.config.trino_lb.external_address)
.context(ModifyNextUriSnafu)?;
} else {
info!(%query_id, "Query completed (no next_uri send)");
// Just to be safe the query needs to be completed and not contain any nextUri for the client
// to call to, before being considered done. We don't expect any future calls to done queries,
// so we can (hopefully) safely remove them from the persistence.
if trino_query_api_response.is_query_finished() && trino_query_api_response.next_uri.is_none() {
info!(%query_id, "Query completed, removing it from the persistence");

tokio::try_join!(
state.persistence.remove_query(&query_id).map_err(|err| {
Expand All @@ -480,6 +478,11 @@ async fn handle_query_running_on_trino(
}
}),
)?;
} else {
// Change the nextUri to actually point to trino-lb instead of Trino.
trino_query_api_response
.change_next_uri_to_trino_lb(&state.config.trino_lb.external_address)
.context(ModifyNextUriSnafu)?;
}

Ok((trino_headers, Json(trino_query_api_response)))
Expand Down Expand Up @@ -582,7 +585,7 @@ async fn cancel_query_on_trino(
.persistence
.load_query(&query_id)
.await
.context(StoreQueryInPersistenceSnafu {
.context(LoadQueryFromPersistenceSnafu {
query_id: query_id.clone(),
})?;

Expand Down
Loading