diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 307ce3ecf41..0c6e0decafe 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1948,7 +1948,6 @@ mod tests { use crate::subscription::query::compile_read_only_query; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::subscription::TableUpdateType; - use core::fmt; use futures::FutureExt; use itertools::Itertools; use pretty_assertions::assert_matches; @@ -1968,16 +1967,12 @@ mod tests { use spacetimedb_primitives::TableId; use spacetimedb_sats::product; use std::future::Future; - use std::pin::pin; use std::sync::RwLock; - use std::task::Poll; use std::time::Instant; use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{self}; use tokio::sync::watch; - const TEST_MESSAGE_TIMEOUT: Duration = Duration::from_millis(20); - fn add_subscriber(db: Arc, sql: &str, assert: Option) -> Result<(), DBError> { // Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel. let runtime = tokio::runtime::Runtime::new().unwrap(); @@ -2028,6 +2023,7 @@ mod tests { }); } + #[allow(unused)] fn mark_durable(&self) { if let Some(offset) = self.committed_offset() { self.durable_offset.send_modify(|val| { @@ -2093,6 +2089,7 @@ mod tests { } /// An in-memory `RelationalDB` with `ManualDurability`. + #[allow(unused)] fn relational_db_with_manual_durability( rt: tokio::runtime::Handle, ) -> anyhow::Result<(Arc, Arc)> { @@ -2222,6 +2219,7 @@ mod tests { } /// Instantiate a client connection with confirmed reads turned on or off. + #[allow(unused)] fn client_connection_with_confirmed_reads( client_id: ClientActorId, db: &Arc, @@ -2506,9 +2504,9 @@ mod tests { other => panic!("Expected v2 UnsubscribeApplied, got: {other:?}"), } - let _ = commit_tx(&db, &subs, [], [(table_id, product![2_u8])])?; - - assert_no_outbound_message(rx.recv()).await; + let metrics = commit_tx(&db, &subs, [], [(table_id, product![2_u8])])?; + assert_eq!(metrics.delta_queries_evaluated, 0); + assert_eq!(metrics.delta_queries_matched, 0); Ok(()) } @@ -2662,7 +2660,7 @@ mod tests { inserts: impl IntoIterator, deletes: impl IntoIterator, ) { - match recv_outbound_message(rx, "TxUpdate").await { + match rx.await { Some(OutboundMessage::V1(SerializableMessage::TxUpdate(TransactionUpdateMessage { database_update: SubscriptionUpdateMessage { @@ -2709,7 +2707,7 @@ mod tests { inserts: impl IntoIterator, deletes: impl IntoIterator, ) { - match recv_outbound_message(rx, "v2 TransactionUpdate").await { + match rx.await { Some(OutboundMessage::V2(ws_v2::ServerMessage::TransactionUpdate(update))) => { assert_eq!(update.query_sets.len(), 1); let query_set = &update.query_sets[0]; @@ -2736,39 +2734,6 @@ mod tests { } } - async fn recv_outbound_message( - rx: impl Future>, - expected: &str, - ) -> Option { - tokio::time::timeout(TEST_MESSAGE_TIMEOUT, rx) - .await - .unwrap_or_else(|_| panic!("timed out waiting for {expected}")) - } - - async fn assert_no_outbound_message(rx: impl Future>) { - match tokio::time::timeout(TEST_MESSAGE_TIMEOUT, rx).await { - Err(_) => {} - Ok(Some(msg)) => panic!("expected no message, got {msg:#?}"), - Ok(None) => panic!("the receiver closed due to an error"), - } - } - - /// Assert that the future `f` completes only after `durability` is marked - /// durable. - /// - /// Namely: - /// - /// - assert that polling `f` once returns [`Poll::Pending`] - /// - call `durability.mark_durable()` - /// - assert that polling `f` returns [`Poll::Ready`]. - /// - async fn assert_after_durable(durability: &ManualDurability, f: impl Future) { - let mut g = pin!(f); - assert_matches!(futures::poll!(&mut g), Poll::Pending); - durability.mark_durable(); - assert_matches!(futures::poll!(g), Poll::Ready(_)); - } - /// Commit a set of row updates and broadcast to subscribers fn commit_tx( db: &RelationalDB, @@ -4330,91 +4295,6 @@ mod tests { Ok(()) } - #[tokio::test(flavor = "multi_thread")] - async fn test_confirmed_reads() -> anyhow::Result<()> { - let (db, durability) = relational_db_with_manual_durability(tokio::runtime::Handle::current())?; - - let client_id_confirmed = client_id_from_u8(1); - let client_id_unconfirmed = client_id_from_u8(2); - - let (tx_for_confirmed, mut rx_for_confirmed) = - client_connection_with_confirmed_reads(client_id_confirmed, &db, true); - let (tx_for_unconfirmed, mut rx_for_unconfirmed) = - client_connection_with_confirmed_reads(client_id_unconfirmed, &db, false); - - let auth_confirmed = AuthCtx::new(db.owner_identity(), client_id_confirmed.identity); - let auth_unconfirmed = AuthCtx::new(db.owner_identity(), client_id_unconfirmed.identity); - - let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); - let table = db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?; - let schema = ProductType::from([AlgebraicType::U8]); - - // Subscribe both clients. - subscribe_multi(&subs, auth_confirmed, &["select * from t"], tx_for_confirmed, &mut 0).await?; - subscribe_multi( - &subs, - auth_unconfirmed, - &["select * from t"], - tx_for_unconfirmed, - &mut 0, - ) - .await?; - - assert_matches!( - rx_for_unconfirmed.recv().await, - Some(OutboundMessage::V1(SerializableMessage::Subscription( - SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - } - ))) - ); - assert_after_durable(&durability, async { - assert_matches!( - rx_for_confirmed.recv().await, - Some(OutboundMessage::V1(SerializableMessage::Subscription( - SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - } - ))) - ); - }) - .await; - - // Insert a row. - let mut tx = begin_mut_tx(&db); - db.insert(&mut tx, table, &bsatn::to_vec(&product![1_u8])?)?; - assert!(matches!( - subs.commit_and_broadcast_event(None, module_event(), tx), - Ok(Ok(_)) - )); - // Insert another row, using SQL. - let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0)); - run( - db.clone(), - "INSERT INTO t (x) VALUES (2)".to_string(), - auth, - Some(subs), - None, - &mut vec![], - ) - .await?; - - // Unconfirmed client should have received both rows. - assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![1_u8]], []).await; - assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![2_u8]], []).await; - - // Confirmed client should receive the rows after the tx becomes durable. - assert_after_durable(&durability, async { - assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![1_u8]], []).await; - assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![2_u8]], []).await - }) - .await; - - Ok(()) - } - #[tokio::test] async fn test_subscriptions_for_the_same_client_identity() -> anyhow::Result<()> { let db = relational_db()?;