Skip to content

Commit 898b30d

Browse files
committed
adapter: WIP, fix some out-of-order bugs in QueryTracker
1 parent 7a3ab18 commit 898b30d

2 files changed

Lines changed: 16 additions & 4 deletions

File tree

src/adapter/src/coord/peek.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ use uuid::Uuid;
5454

5555
use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
5656
use crate::coord::timestamp_selection::TimestampDetermination;
57-
use crate::query_tracker::QueryTrackerCmd;
5857
use crate::optimize::OptimizerError;
58+
use crate::query_tracker::QueryTrackerCmd;
5959
use crate::statement_logging::WatchSetCreation;
6060
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
6161
use crate::util::ResultExt;
@@ -841,6 +841,7 @@ impl crate::coord::Coordinator {
841841
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
842842
.get(self.catalog().system_config().dyncfgs());
843843

844+
let cluster_name = self.catalog().get_cluster(compute_instance).name.clone();
844845
let peek_response_stream = Self::create_peek_response_stream(
845846
rows_rx,
846847
finishing,
@@ -850,6 +851,10 @@ impl crate::coord::Coordinator {
850851
persist_client,
851852
peek_stash_read_batch_size_bytes,
852853
peek_stash_read_memory_budget_bytes,
854+
Some(format!(
855+
"query could not complete because cluster \"{}\" was dropped",
856+
cluster_name
857+
)),
853858
);
854859

855860
Ok(crate::ExecuteResponse::SendingRowsStreaming {
@@ -872,14 +877,19 @@ impl crate::coord::Coordinator {
872877
mut persist_client: mz_persist_client::PersistClient,
873878
peek_stash_read_batch_size_bytes: usize,
874879
peek_stash_read_memory_budget_bytes: usize,
880+
rows_rx_err: Option<String>,
875881
) -> impl futures::Stream<Item = PeekResponseUnary> {
876882
async_stream::stream!({
877883
let result = rows_rx.await;
878884

879885
let rows = match result {
880886
Ok(rows) => rows,
881887
Err(e) => {
882-
yield PeekResponseUnary::Error(e.to_string());
888+
if let Some(msg) = rows_rx_err {
889+
yield PeekResponseUnary::Error(msg);
890+
} else {
891+
yield PeekResponseUnary::Error(e.to_string());
892+
}
883893
return;
884894
}
885895
};

src/adapter/src/peek_client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ use crate::catalog::Catalog;
3232
use crate::command::{CatalogSnapshot, Command};
3333
use crate::coord::Coordinator;
3434
use crate::coord::peek::FastPathPlan;
35+
use crate::query_tracker::QueryTrackerCmd;
3536
use crate::statement_logging::WatchSetCreation;
3637
use crate::statement_logging::{
3738
FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend,
3839
StatementLoggingId,
3940
};
40-
use crate::query_tracker::QueryTrackerCmd;
4141
use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};
4242

4343
/// Storage collections trait alias we need to consult for since/frontiers.
@@ -390,7 +390,8 @@ impl PeekClient {
390390
if let Err(err) = peek_result {
391391
// Clean up the registered peek since the peek failed to issue.
392392
// The frontend will handle statement logging for the error.
393-
self.query_tracker.send(QueryTrackerCmd::UntrackPeek { uuid });
393+
self.query_tracker
394+
.send(QueryTrackerCmd::UntrackPeek { uuid });
394395
return Err(AdapterError::concurrent_dependency_drop_from_peek_error(
395396
err,
396397
compute_instance,
@@ -406,6 +407,7 @@ impl PeekClient {
406407
self.persist_client.clone(),
407408
peek_stash_read_batch_size_bytes,
408409
peek_stash_read_memory_budget_bytes,
410+
None,
409411
);
410412

411413
Ok(crate::ExecuteResponse::SendingRowsStreaming {

0 commit comments

Comments
 (0)