From 91e6333b3d6fd4ba3e59f0158e2a4d70c14f1fa2 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 4 Jun 2026 14:17:43 -0700 Subject: [PATCH 1/3] Fix confirmed reads test flake `test_confirmed_reads` no longer asserts on `Poll::Pending` or `Poll::Ready`. Strict gating behavior is already covered by the `ClientConnectionReceiver` tests. The actor test now only verifies that messages arrive after durability advances. --- .../subscription/module_subscription_actor.rs | 37 +++++-------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 307ce3ecf41..e1612546a5a 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1948,7 +1948,6 @@ mod tests { use crate::subscription::query::compile_read_only_query; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::subscription::TableUpdateType; - use core::fmt; use futures::FutureExt; use itertools::Itertools; use pretty_assertions::assert_matches; @@ -1968,15 +1967,13 @@ mod tests { use spacetimedb_primitives::TableId; use spacetimedb_sats::product; use std::future::Future; - use std::pin::pin; use std::sync::RwLock; - use std::task::Poll; use std::time::Instant; use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{self}; use tokio::sync::watch; - const TEST_MESSAGE_TIMEOUT: Duration = Duration::from_millis(20); + const TEST_MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); fn add_subscriber(db: Arc, sql: &str, assert: Option) -> Result<(), DBError> { // Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel. @@ -2506,9 +2503,9 @@ mod tests { other => panic!("Expected v2 UnsubscribeApplied, got: {other:?}"), } - let _ = commit_tx(&db, &subs, [], [(table_id, product![2_u8])])?; - - assert_no_outbound_message(rx.recv()).await; + let metrics = commit_tx(&db, &subs, [], [(table_id, product![2_u8])])?; + assert_eq!(metrics.delta_queries_evaluated, 0); + assert_eq!(metrics.delta_queries_matched, 0); Ok(()) } @@ -2745,28 +2742,12 @@ mod tests { .unwrap_or_else(|_| panic!("timed out waiting for {expected}")) } - async fn assert_no_outbound_message(rx: impl Future>) { - match tokio::time::timeout(TEST_MESSAGE_TIMEOUT, rx).await { - Err(_) => {} - Ok(Some(msg)) => panic!("expected no message, got {msg:#?}"), - Ok(None) => panic!("the receiver closed due to an error"), - } - } - - /// Assert that the future `f` completes only after `durability` is marked - /// durable. - /// - /// Namely: - /// - /// - assert that polling `f` once returns [`Poll::Pending`] - /// - call `durability.mark_durable()` - /// - assert that polling `f` returns [`Poll::Ready`]. - /// - async fn assert_after_durable(durability: &ManualDurability, f: impl Future) { - let mut g = pin!(f); - assert_matches!(futures::poll!(&mut g), Poll::Pending); + /// Mark the latest committed transaction durable and assert that `f` then completes. + async fn assert_after_durable(durability: &ManualDurability, f: impl Future) { durability.mark_durable(); - assert_matches!(futures::poll!(g), Poll::Ready(_)); + tokio::time::timeout(TEST_MESSAGE_TIMEOUT, f) + .await + .expect("timed out waiting for message after durability was marked"); } /// Commit a set of row updates and broadcast to subscribers From 206f3b49197f2ec21bb0c1b84848bc63a38ef9eb Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 4 Jun 2026 15:01:05 -0700 Subject: [PATCH 2/3] Remove test_confirmed_reads entirely --- .../subscription/module_subscription_actor.rs | 96 +------------------ 1 file changed, 3 insertions(+), 93 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index e1612546a5a..5b79799160b 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -2025,6 +2025,7 @@ mod tests { }); } + #[allow(unused)] fn mark_durable(&self) { if let Some(offset) = self.committed_offset() { self.durable_offset.send_modify(|val| { @@ -2090,6 +2091,7 @@ mod tests { } /// An in-memory `RelationalDB` with `ManualDurability`. + #[allow(unused)] fn relational_db_with_manual_durability( rt: tokio::runtime::Handle, ) -> anyhow::Result<(Arc, Arc)> { @@ -2219,6 +2221,7 @@ mod tests { } /// Instantiate a client connection with confirmed reads turned on or off. + #[allow(unused)] fn client_connection_with_confirmed_reads( client_id: ClientActorId, db: &Arc, @@ -2742,14 +2745,6 @@ mod tests { .unwrap_or_else(|_| panic!("timed out waiting for {expected}")) } - /// Mark the latest committed transaction durable and assert that `f` then completes. - async fn assert_after_durable(durability: &ManualDurability, f: impl Future) { - durability.mark_durable(); - tokio::time::timeout(TEST_MESSAGE_TIMEOUT, f) - .await - .expect("timed out waiting for message after durability was marked"); - } - /// Commit a set of row updates and broadcast to subscribers fn commit_tx( db: &RelationalDB, @@ -4311,91 +4306,6 @@ mod tests { Ok(()) } - #[tokio::test(flavor = "multi_thread")] - async fn test_confirmed_reads() -> anyhow::Result<()> { - let (db, durability) = relational_db_with_manual_durability(tokio::runtime::Handle::current())?; - - let client_id_confirmed = client_id_from_u8(1); - let client_id_unconfirmed = client_id_from_u8(2); - - let (tx_for_confirmed, mut rx_for_confirmed) = - client_connection_with_confirmed_reads(client_id_confirmed, &db, true); - let (tx_for_unconfirmed, mut rx_for_unconfirmed) = - client_connection_with_confirmed_reads(client_id_unconfirmed, &db, false); - - let auth_confirmed = AuthCtx::new(db.owner_identity(), client_id_confirmed.identity); - let auth_unconfirmed = AuthCtx::new(db.owner_identity(), client_id_unconfirmed.identity); - - let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); - let table = db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?; - let schema = ProductType::from([AlgebraicType::U8]); - - // Subscribe both clients. - subscribe_multi(&subs, auth_confirmed, &["select * from t"], tx_for_confirmed, &mut 0).await?; - subscribe_multi( - &subs, - auth_unconfirmed, - &["select * from t"], - tx_for_unconfirmed, - &mut 0, - ) - .await?; - - assert_matches!( - rx_for_unconfirmed.recv().await, - Some(OutboundMessage::V1(SerializableMessage::Subscription( - SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - } - ))) - ); - assert_after_durable(&durability, async { - assert_matches!( - rx_for_confirmed.recv().await, - Some(OutboundMessage::V1(SerializableMessage::Subscription( - SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - } - ))) - ); - }) - .await; - - // Insert a row. - let mut tx = begin_mut_tx(&db); - db.insert(&mut tx, table, &bsatn::to_vec(&product![1_u8])?)?; - assert!(matches!( - subs.commit_and_broadcast_event(None, module_event(), tx), - Ok(Ok(_)) - )); - // Insert another row, using SQL. - let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0)); - run( - db.clone(), - "INSERT INTO t (x) VALUES (2)".to_string(), - auth, - Some(subs), - None, - &mut vec![], - ) - .await?; - - // Unconfirmed client should have received both rows. - assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![1_u8]], []).await; - assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![2_u8]], []).await; - - // Confirmed client should receive the rows after the tx becomes durable. - assert_after_durable(&durability, async { - assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![1_u8]], []).await; - assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![2_u8]], []).await - }) - .await; - - Ok(()) - } - #[tokio::test] async fn test_subscriptions_for_the_same_client_identity() -> anyhow::Result<()> { let db = relational_db()?; From b870cc567b1732937246c1b4d2a1fa5f32477041 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 5 Jun 2026 09:19:47 -0700 Subject: [PATCH 3/3] Remove defensive timeout --- .../src/subscription/module_subscription_actor.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 5b79799160b..0c6e0decafe 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1973,8 +1973,6 @@ mod tests { use tokio::sync::mpsc::{self}; use tokio::sync::watch; - const TEST_MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); - fn add_subscriber(db: Arc, sql: &str, assert: Option) -> Result<(), DBError> { // Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel. let runtime = tokio::runtime::Runtime::new().unwrap(); @@ -2662,7 +2660,7 @@ mod tests { inserts: impl IntoIterator, deletes: impl IntoIterator, ) { - match recv_outbound_message(rx, "TxUpdate").await { + match rx.await { Some(OutboundMessage::V1(SerializableMessage::TxUpdate(TransactionUpdateMessage { database_update: SubscriptionUpdateMessage { @@ -2709,7 +2707,7 @@ mod tests { inserts: impl IntoIterator, deletes: impl IntoIterator, ) { - match recv_outbound_message(rx, "v2 TransactionUpdate").await { + match rx.await { Some(OutboundMessage::V2(ws_v2::ServerMessage::TransactionUpdate(update))) => { assert_eq!(update.query_sets.len(), 1); let query_set = &update.query_sets[0]; @@ -2736,15 +2734,6 @@ mod tests { } } - async fn recv_outbound_message( - rx: impl Future>, - expected: &str, - ) -> Option { - tokio::time::timeout(TEST_MESSAGE_TIMEOUT, rx) - .await - .unwrap_or_else(|_| panic!("timed out waiting for {expected}")) - } - /// Commit a set of row updates and broadcast to subscribers fn commit_tx( db: &RelationalDB,