Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 8 additions & 128 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,6 @@ mod tests {
use crate::subscription::query::compile_read_only_query;
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use crate::subscription::TableUpdateType;
use core::fmt;
use futures::FutureExt;
use itertools::Itertools;
use pretty_assertions::assert_matches;
Expand All @@ -1968,16 +1967,12 @@ mod tests {
use spacetimedb_primitives::TableId;
use spacetimedb_sats::product;
use std::future::Future;
use std::pin::pin;
use std::sync::RwLock;
use std::task::Poll;
use std::time::Instant;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{self};
use tokio::sync::watch;

const TEST_MESSAGE_TIMEOUT: Duration = Duration::from_millis(20);

fn add_subscriber(db: Arc<RelationalDB>, sql: &str, assert: Option<AssertTxFn>) -> Result<(), DBError> {
// Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel.
let runtime = tokio::runtime::Runtime::new().unwrap();
Expand Down Expand Up @@ -2028,6 +2023,7 @@ mod tests {
});
}

#[allow(unused)]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why keep the code marked #[allow(unused)] in this patch?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They looked like useful utilities. But I'm fine removing them as well.

fn mark_durable(&self) {
if let Some(offset) = self.committed_offset() {
self.durable_offset.send_modify(|val| {
Expand Down Expand Up @@ -2093,6 +2089,7 @@ mod tests {
}

/// An in-memory `RelationalDB` with `ManualDurability`.
#[allow(unused)]
fn relational_db_with_manual_durability(
rt: tokio::runtime::Handle,
) -> anyhow::Result<(Arc<RelationalDB>, Arc<ManualDurability>)> {
Expand Down Expand Up @@ -2222,6 +2219,7 @@ mod tests {
}

/// Instantiate a client connection with confirmed reads turned on or off.
#[allow(unused)]
fn client_connection_with_confirmed_reads(
client_id: ClientActorId,
db: &Arc<RelationalDB>,
Expand Down Expand Up @@ -2506,9 +2504,9 @@ mod tests {
other => panic!("Expected v2 UnsubscribeApplied, got: {other:?}"),
}

let _ = commit_tx(&db, &subs, [], [(table_id, product![2_u8])])?;

assert_no_outbound_message(rx.recv()).await;
let metrics = commit_tx(&db, &subs, [], [(table_id, product![2_u8])])?;
assert_eq!(metrics.delta_queries_evaluated, 0);
assert_eq!(metrics.delta_queries_matched, 0);

Ok(())
}
Expand Down Expand Up @@ -2662,7 +2660,7 @@ mod tests {
inserts: impl IntoIterator<Item = ProductValue>,
deletes: impl IntoIterator<Item = ProductValue>,
) {
match recv_outbound_message(rx, "TxUpdate").await {
match rx.await {
Some(OutboundMessage::V1(SerializableMessage::TxUpdate(TransactionUpdateMessage {
database_update:
SubscriptionUpdateMessage {
Expand Down Expand Up @@ -2709,7 +2707,7 @@ mod tests {
inserts: impl IntoIterator<Item = ProductValue>,
deletes: impl IntoIterator<Item = ProductValue>,
) {
match recv_outbound_message(rx, "v2 TransactionUpdate").await {
match rx.await {
Some(OutboundMessage::V2(ws_v2::ServerMessage::TransactionUpdate(update))) => {
assert_eq!(update.query_sets.len(), 1);
let query_set = &update.query_sets[0];
Expand All @@ -2736,39 +2734,6 @@ mod tests {
}
}

async fn recv_outbound_message(
rx: impl Future<Output = Option<OutboundMessage>>,
expected: &str,
) -> Option<OutboundMessage> {
tokio::time::timeout(TEST_MESSAGE_TIMEOUT, rx)
.await
.unwrap_or_else(|_| panic!("timed out waiting for {expected}"))
}

async fn assert_no_outbound_message(rx: impl Future<Output = Option<OutboundMessage>>) {
match tokio::time::timeout(TEST_MESSAGE_TIMEOUT, rx).await {
Err(_) => {}
Ok(Some(msg)) => panic!("expected no message, got {msg:#?}"),
Ok(None) => panic!("the receiver closed due to an error"),
}
}

/// Assert that the future `f` completes only after `durability` is marked
/// durable.
///
/// Namely:
///
/// - assert that polling `f` once returns [`Poll::Pending`]
/// - call `durability.mark_durable()`
/// - assert that polling `f` returns [`Poll::Ready`].
///
async fn assert_after_durable(durability: &ManualDurability, f: impl Future<Output: fmt::Debug>) {
let mut g = pin!(f);
assert_matches!(futures::poll!(&mut g), Poll::Pending);
durability.mark_durable();
assert_matches!(futures::poll!(g), Poll::Ready(_));
}

/// Commit a set of row updates and broadcast to subscribers
fn commit_tx(
db: &RelationalDB,
Expand Down Expand Up @@ -4330,91 +4295,6 @@ mod tests {
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_confirmed_reads() -> anyhow::Result<()> {
let (db, durability) = relational_db_with_manual_durability(tokio::runtime::Handle::current())?;

let client_id_confirmed = client_id_from_u8(1);
let client_id_unconfirmed = client_id_from_u8(2);

let (tx_for_confirmed, mut rx_for_confirmed) =
client_connection_with_confirmed_reads(client_id_confirmed, &db, true);
let (tx_for_unconfirmed, mut rx_for_unconfirmed) =
client_connection_with_confirmed_reads(client_id_unconfirmed, &db, false);

let auth_confirmed = AuthCtx::new(db.owner_identity(), client_id_confirmed.identity);
let auth_unconfirmed = AuthCtx::new(db.owner_identity(), client_id_unconfirmed.identity);

let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
let table = db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
let schema = ProductType::from([AlgebraicType::U8]);

// Subscribe both clients.
subscribe_multi(&subs, auth_confirmed, &["select * from t"], tx_for_confirmed, &mut 0).await?;
subscribe_multi(
&subs,
auth_unconfirmed,
&["select * from t"],
tx_for_unconfirmed,
&mut 0,
)
.await?;

assert_matches!(
rx_for_unconfirmed.recv().await,
Some(OutboundMessage::V1(SerializableMessage::Subscription(
SubscriptionMessage {
result: SubscriptionResult::SubscribeMulti(_),
..
}
)))
);
assert_after_durable(&durability, async {
assert_matches!(
rx_for_confirmed.recv().await,
Some(OutboundMessage::V1(SerializableMessage::Subscription(
SubscriptionMessage {
result: SubscriptionResult::SubscribeMulti(_),
..
}
)))
);
})
.await;

// Insert a row.
let mut tx = begin_mut_tx(&db);
db.insert(&mut tx, table, &bsatn::to_vec(&product![1_u8])?)?;
assert!(matches!(
subs.commit_and_broadcast_event(None, module_event(), tx),
Ok(Ok(_))
));
// Insert another row, using SQL.
let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0));
run(
db.clone(),
"INSERT INTO t (x) VALUES (2)".to_string(),
auth,
Some(subs),
None,
&mut vec![],
)
.await?;

// Unconfirmed client should have received both rows.
assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![1_u8]], []).await;
assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![2_u8]], []).await;

// Confirmed client should receive the rows after the tx becomes durable.
assert_after_durable(&durability, async {
assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![1_u8]], []).await;
assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![2_u8]], []).await
})
.await;

Ok(())
}

#[tokio::test]
async fn test_subscriptions_for_the_same_client_identity() -> anyhow::Result<()> {
let db = relational_db()?;
Expand Down
Loading