diff --git a/Cargo.lock b/Cargo.lock
index 764abd78fb5..f52a3ac10b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5045,6 +5045,7 @@ dependencies = [
"bytes",
"derive_more",
"enum-as-inner",
+ "enum-map",
"hex",
"insta",
"itertools 0.12.1",
@@ -5154,6 +5155,7 @@ version = "1.0.0-rc3"
dependencies = [
"anyhow",
"enum-as-inner",
+ "enum-map",
"hashbrown 0.15.1",
"indexmap 2.6.0",
"itertools 0.12.1",
diff --git a/crates/bindings-csharp/BSATN.Codegen/BSATN.Codegen.csproj b/crates/bindings-csharp/BSATN.Codegen/BSATN.Codegen.csproj
index ee3d8f3a08e..12162cfa25e 100644
--- a/crates/bindings-csharp/BSATN.Codegen/BSATN.Codegen.csproj
+++ b/crates/bindings-csharp/BSATN.Codegen/BSATN.Codegen.csproj
@@ -2,7 +2,7 @@
SpacetimeDB.BSATN.Codegen
- 1.0.0-rc3
+ 1.0.0-rc3-hotfix3
SpacetimeDB BSATN Codegen
The SpacetimeDB BSATN Codegen implements the Roslyn incremental generators for BSATN serialization/deserialization in C#.
diff --git a/crates/bindings-csharp/BSATN.Runtime.Tests/Tests.cs b/crates/bindings-csharp/BSATN.Runtime.Tests/Tests.cs
index 8a49b9a690e..938cf9f1a28 100644
--- a/crates/bindings-csharp/BSATN.Runtime.Tests/Tests.cs
+++ b/crates/bindings-csharp/BSATN.Runtime.Tests/Tests.cs
@@ -139,4 +139,16 @@ public static void NonHexStrings()
() => Address.FromHexString("these are not hex characters....")
);
}
+
+ [Fact]
+ public static void TimestampConversionChecks()
+ {
+ ulong us = 1737582793990639;
+ var time = ScheduleAt.DateTimeOffsetFromMicrosSinceUnixEpoch(us);
+
+ Assert.Equal(ScheduleAt.ToMicrosecondsSinceUnixEpoch(time), us);
+
+ var interval = ScheduleAt.TimeSpanFromMicroseconds(us);
+ Assert.Equal(ScheduleAt.ToMicroseconds(interval), us);
+ }
}
diff --git a/crates/bindings-csharp/BSATN.Runtime/BSATN.Runtime.csproj b/crates/bindings-csharp/BSATN.Runtime/BSATN.Runtime.csproj
index 0262e986899..1212754c544 100644
--- a/crates/bindings-csharp/BSATN.Runtime/BSATN.Runtime.csproj
+++ b/crates/bindings-csharp/BSATN.Runtime/BSATN.Runtime.csproj
@@ -2,7 +2,7 @@
SpacetimeDB.BSATN.Runtime
- 1.0.0-rc3
+ 1.0.0-rc3-hotfix3
SpacetimeDB BSATN Runtime
The SpacetimeDB BSATN Runtime implements APIs for BSATN serialization/deserialization in C#.
diff --git a/crates/bindings-csharp/BSATN.Runtime/Builtins.cs b/crates/bindings-csharp/BSATN.Runtime/Builtins.cs
index 72c8f88e2af..50384d47707 100644
--- a/crates/bindings-csharp/BSATN.Runtime/Builtins.cs
+++ b/crates/bindings-csharp/BSATN.Runtime/Builtins.cs
@@ -257,6 +257,31 @@ public sealed record Time(DateTimeOffset Time_) : ScheduleAt;
public static implicit operator ScheduleAt(DateTimeOffset time) => new Time(time);
+ /// <summary>
+ /// There are 10 C# Timestamp "Ticks" per microsecond.
+ /// </summary>
+ public static readonly ulong TicksPerMicrosecond = 10;
+
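+ // Note: integer division truncates sub-microsecond ticks; a negative interval is not supported here, since Ticks is cast to ulong before dividing.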
+ public static ulong ToMicroseconds(TimeSpan interval)
+ {
+ return (ulong)interval.Ticks / TicksPerMicrosecond;
+ }
+
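+ // Note: microsecond counts above long.MaxValue / 10 would overflow the tick computation; callers are expected to stay within the DateTime range.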
+ public static TimeSpan TimeSpanFromMicroseconds(ulong intervalMicros)
+ {
+ return TimeSpan.FromTicks((long)(TicksPerMicrosecond * intervalMicros));
+ }
+
+ public static ulong ToMicrosecondsSinceUnixEpoch(DateTimeOffset time)
+ {
+ return ToMicroseconds(time - DateTimeOffset.UnixEpoch);
+ }
+
+ public static DateTimeOffset DateTimeOffsetFromMicrosSinceUnixEpoch(ulong microsSinceUnixEpoch)
+ {
+ return DateTimeOffset.UnixEpoch + TimeSpanFromMicroseconds(microsSinceUnixEpoch);
+ }
+
public readonly partial struct BSATN : IReadWrite<ScheduleAt>
{
[SpacetimeDB.Type]
@@ -289,6 +314,10 @@ public void Write(BinaryWriter writer, ScheduleAt value)
public AlgebraicType GetAlgebraicType(ITypeRegistrar registrar) =>
// Constructing a custom one instead of ScheduleAtRepr.GetAlgebraicType()
// to avoid leaking the internal *Repr wrappers in generated SATS.
+ // We are leveraging the fact that single-element structs are byte-compatible with their elements
+ // when parsing BSATN.
+ // TODO: this might break when working with other formats like JSON, but this is all going to be rewritten
+ // anyway with Phoebe's Timestamp PR.
new AlgebraicType.Sum(
[
new("Interval", new AlgebraicType.U64(default)),
diff --git a/crates/bindings-csharp/BSATN.Runtime/Repr.cs b/crates/bindings-csharp/BSATN.Runtime/Repr.cs
index c196f2ec59c..cf2657c0f58 100644
--- a/crates/bindings-csharp/BSATN.Runtime/Repr.cs
+++ b/crates/bindings-csharp/BSATN.Runtime/Repr.cs
@@ -11,17 +11,17 @@ namespace SpacetimeDB.Internal;
[SpacetimeDB.Type] // we should be able to encode it to BSATN too
public partial struct DateTimeOffsetRepr(DateTimeOffset time)
{
- public ulong MicrosecondsSinceEpoch = (ulong)time.Ticks / 10;
+ public ulong MicrosecondsSinceEpoch = ScheduleAt.ToMicrosecondsSinceUnixEpoch(time);
public readonly DateTimeOffset ToStd() =>
- DateTimeOffset.UnixEpoch.AddTicks(10 * (long)MicrosecondsSinceEpoch);
+ ScheduleAt.DateTimeOffsetFromMicrosSinceUnixEpoch(MicrosecondsSinceEpoch);
}
[StructLayout(LayoutKind.Sequential)] // we should be able to use it in FFI
[SpacetimeDB.Type] // we should be able to encode it to BSATN too
public partial struct TimeSpanRepr(TimeSpan duration)
{
- public ulong Microseconds = (ulong)duration.Ticks / 10;
+ public ulong Microseconds = ScheduleAt.ToMicroseconds(duration);
- public readonly TimeSpan ToStd() => TimeSpan.FromTicks(10 * (long)Microseconds);
+ public readonly TimeSpan ToStd() => ScheduleAt.TimeSpanFromMicroseconds(Microseconds);
}
diff --git a/crates/bindings-csharp/Codegen/Codegen.csproj b/crates/bindings-csharp/Codegen/Codegen.csproj
index 4b63c4c8dd6..307bb18b42a 100644
--- a/crates/bindings-csharp/Codegen/Codegen.csproj
+++ b/crates/bindings-csharp/Codegen/Codegen.csproj
@@ -2,7 +2,7 @@
SpacetimeDB.Codegen
- 1.0.0-rc3
+ 1.0.0-rc3-hotfix3
SpacetimeDB Module Codegen
The SpacetimeDB Codegen implements the Roslyn incremental generators for writing SpacetimeDB modules in C#.
diff --git a/crates/bindings-csharp/Runtime/Runtime.csproj b/crates/bindings-csharp/Runtime/Runtime.csproj
index 3effc165f19..6cca2395a8d 100644
--- a/crates/bindings-csharp/Runtime/Runtime.csproj
+++ b/crates/bindings-csharp/Runtime/Runtime.csproj
@@ -2,7 +2,7 @@
SpacetimeDB.Runtime
- 1.0.0-rc3
+ 1.0.0-rc3-hotfix3
SpacetimeDB Module Runtime
The SpacetimeDB Runtime implements the database runtime bindings for writing SpacetimeDB modules in C#.
diff --git a/crates/bindings-macro/src/reducer.rs b/crates/bindings-macro/src/reducer.rs
index cb55dc90c08..ff98cc6250b 100644
--- a/crates/bindings-macro/src/reducer.rs
+++ b/crates/bindings-macro/src/reducer.rs
@@ -1,13 +1,14 @@
use crate::sym;
-use crate::util::{check_duplicate_msg, match_meta};
+use crate::util::{check_duplicate, check_duplicate_msg, ident_to_litstr, match_meta};
use proc_macro2::{Span, TokenStream};
use quote::{quote, quote_spanned};
use syn::parse::Parser as _;
use syn::spanned::Spanned;
-use syn::{FnArg, Ident, ItemFn};
+use syn::{FnArg, Ident, ItemFn, LitStr};
#[derive(Default)]
pub(crate) struct ReducerArgs {
+ name: Option<LitStr>,
lifecycle: Option<LifecycleReducer>,
}
@@ -18,14 +19,6 @@ enum LifecycleReducer {
Update(Span),
}
impl LifecycleReducer {
- fn reducer_name(&self) -> &'static str {
- match self {
- Self::Init(_) => "__init__",
- Self::ClientConnected(_) => "__identity_connected__",
- Self::ClientDisconnected(_) => "__identity_disconnected__",
- Self::Update(_) => "__update__",
- }
- }
fn to_lifecycle_value(&self) -> Option {
let (Self::Init(span) | Self::ClientConnected(span) | Self::ClientDisconnected(span) | Self::Update(span)) =
*self;
@@ -54,6 +47,10 @@ impl ReducerArgs {
sym::client_connected => set_lifecycle(LifecycleReducer::ClientConnected)?,
sym::client_disconnected => set_lifecycle(LifecycleReducer::ClientDisconnected)?,
sym::update => set_lifecycle(LifecycleReducer::Update)?,
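+ // The optional `name = "..."` attribute overrides the function identifier as the reducer's exported name.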
+ sym::name => {
+ check_duplicate(&args.name, &meta)?;
+ args.name = Some(meta.value()?.parse()?);
+ }
});
Ok(())
})
@@ -66,21 +63,7 @@ pub(crate) fn reducer_impl(args: ReducerArgs, original_function: &ItemFn) -> syn
let func_name = &original_function.sig.ident;
let vis = &original_function.vis;
- // Extract reducer name, making sure it's not `__XXX__` as that's the form we reserve for special reducers.
- let reducer_name;
- let reducer_name = match &args.lifecycle {
- Some(lifecycle) => lifecycle.reducer_name(),
- None => {
- reducer_name = func_name.to_string();
- if reducer_name.starts_with("__") && reducer_name.ends_with("__") {
- return Err(syn::Error::new_spanned(
- &original_function.sig.ident,
- "reserved reducer name",
- ));
- }
- &reducer_name
- }
- };
+ let reducer_name = args.name.unwrap_or_else(|| ident_to_litstr(func_name));
for param in &original_function.sig.generics.params {
let err = |msg| syn::Error::new_spanned(param, msg);
@@ -125,7 +108,7 @@ pub(crate) fn reducer_impl(args: ReducerArgs, original_function: &ItemFn) -> syn
}
.into_iter();
- let register_describer_symbol = format!("__preinit__20_register_describer_{reducer_name}");
+ let register_describer_symbol = format!("__preinit__20_register_describer_{}", reducer_name.value());
let lt_params = &original_function.sig.generics;
let lt_where_clause = &lt_params.where_clause;
diff --git a/crates/cli/src/subcommands/call.rs b/crates/cli/src/subcommands/call.rs
index df8a58e62de..f9501f3d993 100644
--- a/crates/cli/src/subcommands/call.rs
+++ b/crates/cli/src/subcommands/call.rs
@@ -219,8 +219,7 @@ fn add_reducer_ctx_to_err(error: &mut String, schema_json: Value, reducer_name:
.map(|kv| kv.0)
.collect::<Vec<_>>();
- // Hide pseudo-reducers (assume that any `__XXX__` are such); they shouldn't be callable.
- reducers.retain(|&c| !(c.starts_with("__") && c.ends_with("__")));
+ // TODO(noa): exclude lifecycle reducers
if let Some(best) = find_best_match_for_name(&reducers, reducer_name, None) {
write!(error, "\n\nA reducer with a similar name exists: `{}`", best).unwrap();
diff --git a/crates/cli/src/subcommands/generate/mod.rs b/crates/cli/src/subcommands/generate/mod.rs
index b1467486d9a..90205bdeabb 100644
--- a/crates/cli/src/subcommands/generate/mod.rs
+++ b/crates/cli/src/subcommands/generate/mod.rs
@@ -259,6 +259,7 @@ pub fn generate(module: RawModuleDef, lang: Language, namespace: &str) -> anyhow
let reducers = module
.reducers()
+ .filter(|r| r.lifecycle.is_none())
.map(|reducer| spacetimedb_lib::ReducerDef {
name: reducer.name.clone().into(),
args: reducer.params.elements.to_vec(),
@@ -268,9 +269,7 @@ pub fn generate(module: RawModuleDef, lang: Language, namespace: &str) -> anyhow
let items = itertools::chain!(
types,
tables.into_iter().map(GenItem::Table),
- reducers
- .filter(|r| !(r.name.starts_with("__") && r.name.ends_with("__")))
- .map(GenItem::Reducer),
+ reducers.map(GenItem::Reducer),
);
let items: Vec = items.collect();
diff --git a/crates/cli/tests/snapshots/codegen__codegen_rust.snap b/crates/cli/tests/snapshots/codegen__codegen_rust.snap
index f37f69c9b49..4a1f4be0698 100644
--- a/crates/cli/tests/snapshots/codegen__codegen_rust.snap
+++ b/crates/cli/tests/snapshots/codegen__codegen_rust.snap
@@ -1,6 +1,7 @@
---
source: crates/cli/tests/codegen.rs
expression: outfiles
+snapshot_kind: text
---
"add_player_reducer.rs" = '''
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
@@ -734,113 +735,6 @@ impl __sdk::InModule for HasSpecialStuff {
type Module = super::RemoteModule;
}
-'''
-"identity_connected_reducer.rs" = '''
-// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
-// WILL NOT BE SAVED. MODIFY TABLES IN RUST INSTEAD.
-
-#![allow(unused)]use spacetimedb_sdk::__codegen::{
- self as __sdk,
- anyhow::{self as __anyhow, Context as _},
- __lib,
- __sats,
- __ws,
-};
-
-
-#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
-#[sats(crate = __lib)]
-pub(super) struct IdentityConnectedArgs {
- }
-
-impl From<IdentityConnectedArgs> for super::Reducer {
- fn from(args: IdentityConnectedArgs) -> Self {
- Self::IdentityConnected
-}
-}
-
-impl __sdk::InModule for IdentityConnectedArgs {
- type Module = super::RemoteModule;
-}
-
-pub struct IdentityConnectedCallbackId(__sdk::CallbackId);
-
-#[allow(non_camel_case_types)]
-/// Extension trait for access to the reducer `__identity_connected__`.
-///
-/// Implemented for [`super::RemoteReducers`].
-pub trait identity_connected {
- /// Request that the remote module invoke the reducer `__identity_connected__` to run as soon as possible.
- ///
- /// This method returns immediately, and errors only if we are unable to send the request.
- /// The reducer will run asynchronously in the future,
- /// and its status can be observed by listening for [`Self::on_identity_connected`] callbacks.
- fn identity_connected(&self, ) -> __anyhow::Result<()>;
- /// Register a callback to run whenever we are notified of an invocation of the reducer `__identity_connected__`.
- ///
- /// The [`super::EventContext`] passed to the `callback`
- /// will always have [`__sdk::Event::Reducer`] as its `event`,
- /// but it may or may not have terminated successfully and been committed.
- /// Callbacks should inspect the [`__sdk::ReducerEvent`] contained in the [`super::EventContext`]
- /// to determine the reducer's status.
- ///
- /// The returned [`IdentityConnectedCallbackId`] can be passed to [`Self::remove_on_identity_connected`]
- /// to cancel the callback.
- fn on_identity_connected(&self, callback: impl FnMut(&super::EventContext, ) + Send + 'static) -> IdentityConnectedCallbackId;
- /// Cancel a callback previously registered by [`Self::on_identity_connected`],
- /// causing it not to run in the future.
- fn remove_on_identity_connected(&self, callback: IdentityConnectedCallbackId);
-}
-
-impl identity_connected for super::RemoteReducers {
- fn identity_connected(&self, ) -> __anyhow::Result<()> {
- self.imp.call_reducer("__identity_connected__", IdentityConnectedArgs { })
- }
- fn on_identity_connected(
- &self,
- mut callback: impl FnMut(&super::EventContext, ) + Send + 'static,
- ) -> IdentityConnectedCallbackId {
- IdentityConnectedCallbackId(self.imp.on_reducer(
- "__identity_connected__",
- Box::new(move |ctx: &super::EventContext| {
- let super::EventContext {
- event: __sdk::Event::Reducer(__sdk::ReducerEvent {
- reducer: super::Reducer::IdentityConnected {
-
- },
- ..
- }),
- ..
- } = ctx else { unreachable!() };
- callback(ctx, )
- }),
- ))
- }
- fn remove_on_identity_connected(&self, callback: IdentityConnectedCallbackId) {
- self.imp.remove_on_reducer("__identity_connected__", callback.0)
- }
-}
-
-#[allow(non_camel_case_types)]
-#[doc(hidden)]
-/// Extension trait for setting the call-flags for the reducer `__identity_connected__`.
-///
-/// Implemented for [`super::SetReducerFlags`].
-///
-/// This type is currently unstable and may be removed without a major version bump.
-pub trait set_flags_for_identity_connected {
- /// Set the call-reducer flags for the reducer `__identity_connected__` to `flags`.
- ///
- /// This type is currently unstable and may be removed without a major version bump.
- fn identity_connected(&self, flags: __ws::CallReducerFlags);
-}
-
-impl set_flags_for_identity_connected for super::SetReducerFlags {
- fn identity_connected(&self, flags: __ws::CallReducerFlags) {
- self.imp.set_call_reducer_flags("__identity_connected__", flags);
- }
-}
-
'''
"init_reducer.rs" = '''
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
@@ -873,17 +767,17 @@ impl __sdk::InModule for InitArgs {
pub struct InitCallbackId(__sdk::CallbackId);
#[allow(non_camel_case_types)]
-/// Extension trait for access to the reducer `__init__`.
+/// Extension trait for access to the reducer `init`.
///
/// Implemented for [`super::RemoteReducers`].
pub trait init {
- /// Request that the remote module invoke the reducer `__init__` to run as soon as possible.
+ /// Request that the remote module invoke the reducer `init` to run as soon as possible.
///
/// This method returns immediately, and errors only if we are unable to send the request.
/// The reducer will run asynchronously in the future,
/// and its status can be observed by listening for [`Self::on_init`] callbacks.
fn init(&self, ) -> __anyhow::Result<()>;
- /// Register a callback to run whenever we are notified of an invocation of the reducer `__init__`.
+ /// Register a callback to run whenever we are notified of an invocation of the reducer `init`.
///
/// The [`super::EventContext`] passed to the `callback`
/// will always have [`__sdk::Event::Reducer`] as its `event`,
@@ -901,14 +795,14 @@ pub trait init {
impl init for super::RemoteReducers {
fn init(&self, ) -> __anyhow::Result<()> {
- self.imp.call_reducer("__init__", InitArgs { })
+ self.imp.call_reducer("init", InitArgs { })
}
fn on_init(
&self,
mut callback: impl FnMut(&super::EventContext, ) + Send + 'static,
) -> InitCallbackId {
InitCallbackId(self.imp.on_reducer(
- "__init__",
+ "init",
Box::new(move |ctx: &super::EventContext| {
let super::EventContext {
event: __sdk::Event::Reducer(__sdk::ReducerEvent {
@@ -924,19 +818,19 @@ impl init for super::RemoteReducers {
))
}
fn remove_on_init(&self, callback: InitCallbackId) {
- self.imp.remove_on_reducer("__init__", callback.0)
+ self.imp.remove_on_reducer("init", callback.0)
}
}
#[allow(non_camel_case_types)]
#[doc(hidden)]
-/// Extension trait for setting the call-flags for the reducer `__init__`.
+/// Extension trait for setting the call-flags for the reducer `init`.
///
/// Implemented for [`super::SetReducerFlags`].
///
/// This type is currently unstable and may be removed without a major version bump.
pub trait set_flags_for_init {
- /// Set the call-reducer flags for the reducer `__init__` to `flags`.
+ /// Set the call-reducer flags for the reducer `init` to `flags`.
///
/// This type is currently unstable and may be removed without a major version bump.
fn init(&self, flags: __ws::CallReducerFlags);
@@ -944,7 +838,7 @@ pub trait set_flags_for_init {
impl set_flags_for_init for super::SetReducerFlags {
fn init(&self, flags: __ws::CallReducerFlags) {
- self.imp.set_call_reducer_flags("__init__", flags);
+ self.imp.set_call_reducer_flags("init", flags);
}
}
@@ -1157,8 +1051,8 @@ pub mod add_private_reducer;
pub mod assert_caller_identity_is_module_identity_reducer;
pub mod delete_player_reducer;
pub mod delete_players_by_name_reducer;
-pub mod identity_connected_reducer;
pub mod init_reducer;
+pub mod on_connect_reducer;
pub mod query_private_reducer;
pub mod repeating_test_reducer;
pub mod test_btree_index_args_reducer;
@@ -1201,13 +1095,13 @@ pub use test_a_table::*;
pub use test_d_table::*;
pub use test_e_table::*;
pub use test_f_table::*;
-pub use identity_connected_reducer::{identity_connected, set_flags_for_identity_connected, IdentityConnectedCallbackId};
-pub use init_reducer::{init, set_flags_for_init, InitCallbackId};
pub use add_player_reducer::{add_player, set_flags_for_add_player, AddPlayerCallbackId};
pub use add_private_reducer::{add_private, set_flags_for_add_private, AddPrivateCallbackId};
pub use assert_caller_identity_is_module_identity_reducer::{assert_caller_identity_is_module_identity, set_flags_for_assert_caller_identity_is_module_identity, AssertCallerIdentityIsModuleIdentityCallbackId};
pub use delete_player_reducer::{delete_player, set_flags_for_delete_player, DeletePlayerCallbackId};
pub use delete_players_by_name_reducer::{delete_players_by_name, set_flags_for_delete_players_by_name, DeletePlayersByNameCallbackId};
+pub use init_reducer::{init, set_flags_for_init, InitCallbackId};
+pub use on_connect_reducer::{on_connect, set_flags_for_on_connect, OnConnectCallbackId};
pub use query_private_reducer::{query_private, set_flags_for_query_private, QueryPrivateCallbackId};
pub use repeating_test_reducer::{repeating_test, set_flags_for_repeating_test, RepeatingTestCallbackId};
pub use test_reducer::{test, set_flags_for_test, TestCallbackId};
@@ -1221,9 +1115,7 @@ pub use test_btree_index_args_reducer::{test_btree_index_args, set_flags_for_tes
/// to indicate which reducer caused the event.
pub enum Reducer {
- IdentityConnected ,
- Init ,
- AddPlayer {
+ AddPlayer {
name: String,
} ,
AddPrivate {
@@ -1236,6 +1128,8 @@ pub enum Reducer {
DeletePlayersByName {
name: String,
} ,
+ Init ,
+ OnConnect ,
QueryPrivate ,
RepeatingTest {
arg: RepeatingTestArg,
@@ -1257,13 +1151,13 @@ impl __sdk::InModule for Reducer {
impl __sdk::Reducer for Reducer {
fn reducer_name(&self) -> &'static str {
match self {
- Reducer::IdentityConnected => "__identity_connected__",
- Reducer::Init => "__init__",
- Reducer::AddPlayer { .. } => "add_player",
+ Reducer::AddPlayer { .. } => "add_player",
Reducer::AddPrivate { .. } => "add_private",
Reducer::AssertCallerIdentityIsModuleIdentity => "assert_caller_identity_is_module_identity",
Reducer::DeletePlayer { .. } => "delete_player",
Reducer::DeletePlayersByName { .. } => "delete_players_by_name",
+ Reducer::Init => "init",
+ Reducer::OnConnect => "on_connect",
Reducer::QueryPrivate => "query_private",
Reducer::RepeatingTest { .. } => "repeating_test",
Reducer::Test { .. } => "test",
@@ -1275,13 +1169,13 @@ impl TryFrom<__ws::ReducerCallInfo<__ws::BsatnFormat>> for Reducer {
type Error = __anyhow::Error;
fn try_from(value: __ws::ReducerCallInfo<__ws::BsatnFormat>) -> __anyhow::Result<Self> {
match &value.reducer_name[..] {
- "__identity_connected__" => Ok(__sdk::parse_reducer_args::("__identity_connected__", &value.args)?.into()),
- "__init__" => Ok(__sdk::parse_reducer_args::("__init__", &value.args)?.into()),
- "add_player" => Ok(__sdk::parse_reducer_args::("add_player", &value.args)?.into()),
+ "add_player" => Ok(__sdk::parse_reducer_args::("add_player", &value.args)?.into()),
"add_private" => Ok(__sdk::parse_reducer_args::("add_private", &value.args)?.into()),
"assert_caller_identity_is_module_identity" => Ok(__sdk::parse_reducer_args::("assert_caller_identity_is_module_identity", &value.args)?.into()),
"delete_player" => Ok(__sdk::parse_reducer_args::("delete_player", &value.args)?.into()),
"delete_players_by_name" => Ok(__sdk::parse_reducer_args::("delete_players_by_name", &value.args)?.into()),
+ "init" => Ok(__sdk::parse_reducer_args::("init", &value.args)?.into()),
+ "on_connect" => Ok(__sdk::parse_reducer_args::("on_connect", &value.args)?.into()),
"query_private" => Ok(__sdk::parse_reducer_args::("query_private", &value.args)?.into()),
"repeating_test" => Ok(__sdk::parse_reducer_args::("repeating_test", &value.args)?.into()),
"test" => Ok(__sdk::parse_reducer_args::("test", &value.args)?.into()),
@@ -1756,6 +1650,113 @@ impl __sdk::InModule for NamespaceTestF {
type Module = super::RemoteModule;
}
+'''
+"on_connect_reducer.rs" = '''
+// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
+// WILL NOT BE SAVED. MODIFY TABLES IN RUST INSTEAD.
+
+#![allow(unused)]use spacetimedb_sdk::__codegen::{
+ self as __sdk,
+ anyhow::{self as __anyhow, Context as _},
+ __lib,
+ __sats,
+ __ws,
+};
+
+
+#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
+#[sats(crate = __lib)]
+pub(super) struct OnConnectArgs {
+ }
+
+impl From<OnConnectArgs> for super::Reducer {
+ fn from(args: OnConnectArgs) -> Self {
+ Self::OnConnect
+}
+}
+
+impl __sdk::InModule for OnConnectArgs {
+ type Module = super::RemoteModule;
+}
+
+pub struct OnConnectCallbackId(__sdk::CallbackId);
+
+#[allow(non_camel_case_types)]
+/// Extension trait for access to the reducer `on_connect`.
+///
+/// Implemented for [`super::RemoteReducers`].
+pub trait on_connect {
+ /// Request that the remote module invoke the reducer `on_connect` to run as soon as possible.
+ ///
+ /// This method returns immediately, and errors only if we are unable to send the request.
+ /// The reducer will run asynchronously in the future,
+ /// and its status can be observed by listening for [`Self::on_on_connect`] callbacks.
+ fn on_connect(&self, ) -> __anyhow::Result<()>;
+ /// Register a callback to run whenever we are notified of an invocation of the reducer `on_connect`.
+ ///
+ /// The [`super::EventContext`] passed to the `callback`
+ /// will always have [`__sdk::Event::Reducer`] as its `event`,
+ /// but it may or may not have terminated successfully and been committed.
+ /// Callbacks should inspect the [`__sdk::ReducerEvent`] contained in the [`super::EventContext`]
+ /// to determine the reducer's status.
+ ///
+ /// The returned [`OnConnectCallbackId`] can be passed to [`Self::remove_on_on_connect`]
+ /// to cancel the callback.
+ fn on_on_connect(&self, callback: impl FnMut(&super::EventContext, ) + Send + 'static) -> OnConnectCallbackId;
+ /// Cancel a callback previously registered by [`Self::on_on_connect`],
+ /// causing it not to run in the future.
+ fn remove_on_on_connect(&self, callback: OnConnectCallbackId);
+}
+
+impl on_connect for super::RemoteReducers {
+ fn on_connect(&self, ) -> __anyhow::Result<()> {
+ self.imp.call_reducer("on_connect", OnConnectArgs { })
+ }
+ fn on_on_connect(
+ &self,
+ mut callback: impl FnMut(&super::EventContext, ) + Send + 'static,
+ ) -> OnConnectCallbackId {
+ OnConnectCallbackId(self.imp.on_reducer(
+ "on_connect",
+ Box::new(move |ctx: &super::EventContext| {
+ let super::EventContext {
+ event: __sdk::Event::Reducer(__sdk::ReducerEvent {
+ reducer: super::Reducer::OnConnect {
+
+ },
+ ..
+ }),
+ ..
+ } = ctx else { unreachable!() };
+ callback(ctx, )
+ }),
+ ))
+ }
+ fn remove_on_on_connect(&self, callback: OnConnectCallbackId) {
+ self.imp.remove_on_reducer("on_connect", callback.0)
+ }
+}
+
+#[allow(non_camel_case_types)]
+#[doc(hidden)]
+/// Extension trait for setting the call-flags for the reducer `on_connect`.
+///
+/// Implemented for [`super::SetReducerFlags`].
+///
+/// This type is currently unstable and may be removed without a major version bump.
+pub trait set_flags_for_on_connect {
+ /// Set the call-reducer flags for the reducer `on_connect` to `flags`.
+ ///
+ /// This type is currently unstable and may be removed without a major version bump.
+ fn on_connect(&self, flags: __ws::CallReducerFlags);
+}
+
+impl set_flags_for_on_connect for super::SetReducerFlags {
+ fn on_connect(&self, flags: __ws::CallReducerFlags) {
+ self.imp.set_call_reducer_flags("on_connect", flags);
+ }
+}
+
'''
"pk_multi_identity_table.rs" = '''
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
diff --git a/crates/cli/tests/snapshots/codegen__codegen_typescript.snap b/crates/cli/tests/snapshots/codegen__codegen_typescript.snap
index 873b097681a..2fe25ab926b 100644
--- a/crates/cli/tests/snapshots/codegen__codegen_typescript.snap
+++ b/crates/cli/tests/snapshots/codegen__codegen_typescript.snap
@@ -1,6 +1,7 @@
---
source: crates/cli/tests/codegen.rs
expression: outfiles
+snapshot_kind: text
---
"add_player_reducer.ts" = '''
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
@@ -680,75 +681,6 @@ export namespace HasSpecialStuff {
}
-'''
-"identity_connected_reducer.ts" = '''
-// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
-// WILL NOT BE SAVED. MODIFY TABLES IN RUST INSTEAD.
-
-import {
- // @ts-ignore
- Address,
- // @ts-ignore
- AlgebraicType,
- // @ts-ignore
- AlgebraicValue,
- // @ts-ignore
- BinaryReader,
- // @ts-ignore
- BinaryWriter,
- // @ts-ignore
- CallReducerFlags,
- // @ts-ignore
- DBConnectionBuilder,
- // @ts-ignore
- DBConnectionImpl,
- // @ts-ignore
- DBContext,
- // @ts-ignore
- Event,
- // @ts-ignore
- EventContextInterface,
- // @ts-ignore
- Identity,
- // @ts-ignore
- ProductType,
- // @ts-ignore
- ProductTypeElement,
- // @ts-ignore
- SumType,
- // @ts-ignore
- SumTypeVariant,
- // @ts-ignore
- TableCache,
- // @ts-ignore
- deepEqual,
-} from "@clockworklabs/spacetimedb-sdk";
-
-export type IdentityConnected = {};
-
-/**
- * A namespace for generated helper functions.
- */
-export namespace IdentityConnected {
- /**
- * A function which returns this type represented as an AlgebraicType.
- * This function is derived from the AlgebraicType used to generate this type.
- */
- export function getTypeScriptAlgebraicType(): AlgebraicType {
- return AlgebraicType.createProductType([
- ]);
- }
-
- export function serialize(writer: BinaryWriter, value: IdentityConnected): void {
- IdentityConnected.getTypeScriptAlgebraicType().serialize(writer, value);
- }
-
- export function deserialize(reader: BinaryReader): IdentityConnected {
- return IdentityConnected.getTypeScriptAlgebraicType().deserialize(reader);
- }
-
-}
-
'''
"index.ts" = '''
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
@@ -794,10 +726,6 @@ import {
} from "@clockworklabs/spacetimedb-sdk";
// Import and reexport all reducer arg types
-import { IdentityConnected } from "./identity_connected_reducer.ts";
-export { IdentityConnected };
-import { Init } from "./init_reducer.ts";
-export { Init };
import { AddPlayer } from "./add_player_reducer.ts";
export { AddPlayer };
import { AddPrivate } from "./add_private_reducer.ts";
@@ -808,6 +736,10 @@ import { DeletePlayer } from "./delete_player_reducer.ts";
export { DeletePlayer };
import { DeletePlayersByName } from "./delete_players_by_name_reducer.ts";
export { DeletePlayersByName };
+import { Init } from "./init_reducer.ts";
+export { Init };
+import { OnConnect } from "./on_connect_reducer.ts";
+export { OnConnect };
import { QueryPrivate } from "./query_private_reducer.ts";
export { QueryPrivate };
import { RepeatingTest } from "./repeating_test_reducer.ts";
@@ -926,14 +858,6 @@ const REMOTE_MODULE = {
},
},
reducers: {
- __identity_connected__: {
- reducerName: "__identity_connected__",
- argsType: IdentityConnected.getTypeScriptAlgebraicType(),
- },
- __init__: {
- reducerName: "__init__",
- argsType: Init.getTypeScriptAlgebraicType(),
- },
add_player: {
reducerName: "add_player",
argsType: AddPlayer.getTypeScriptAlgebraicType(),
@@ -954,6 +878,14 @@ const REMOTE_MODULE = {
reducerName: "delete_players_by_name",
argsType: DeletePlayersByName.getTypeScriptAlgebraicType(),
},
+ init: {
+ reducerName: "init",
+ argsType: Init.getTypeScriptAlgebraicType(),
+ },
+ on_connect: {
+ reducerName: "on_connect",
+ argsType: OnConnect.getTypeScriptAlgebraicType(),
+ },
query_private: {
reducerName: "query_private",
argsType: QueryPrivate.getTypeScriptAlgebraicType(),
@@ -992,13 +924,13 @@ const REMOTE_MODULE = {
// A type representing all the possible variants of a reducer.
export type Reducer = never
-| { name: "IdentityConnected", args: IdentityConnected }
-| { name: "Init", args: Init }
| { name: "AddPlayer", args: AddPlayer }
| { name: "AddPrivate", args: AddPrivate }
| { name: "AssertCallerIdentityIsModuleIdentity", args: AssertCallerIdentityIsModuleIdentity }
| { name: "DeletePlayer", args: DeletePlayer }
| { name: "DeletePlayersByName", args: DeletePlayersByName }
+| { name: "Init", args: Init }
+| { name: "OnConnect", args: OnConnect }
| { name: "QueryPrivate", args: QueryPrivate }
| { name: "RepeatingTest", args: RepeatingTest }
| { name: "Test", args: Test }
@@ -1008,30 +940,6 @@ export type Reducer = never
export class RemoteReducers {
constructor(private connection: DBConnectionImpl, private setCallReducerFlags: SetReducerFlags) {}
- identityConnected() {
- this.connection.callReducer("__identity_connected__", new Uint8Array(0), this.setCallReducerFlags.identityConnectedFlags);
- }
-
- onIdentityConnected(callback: (ctx: EventContext) => void) {
- this.connection.onReducer("__identity_connected__", callback);
- }
-
- removeOnIdentityConnected(callback: (ctx: EventContext) => void) {
- this.connection.offReducer("__identity_connected__", callback);
- }
-
- init() {
- this.connection.callReducer("__init__", new Uint8Array(0), this.setCallReducerFlags.initFlags);
- }
-
- onInit(callback: (ctx: EventContext) => void) {
- this.connection.onReducer("__init__", callback);
- }
-
- removeOnInit(callback: (ctx: EventContext) => void) {
- this.connection.offReducer("__init__", callback);
- }
-
addPlayer(name: string) {
const __args = { name };
let __writer = new BinaryWriter(1024);
@@ -1108,6 +1016,30 @@ export class RemoteReducers {
this.connection.offReducer("delete_players_by_name", callback);
}
+ init() {
+ this.connection.callReducer("init", new Uint8Array(0), this.setCallReducerFlags.initFlags);
+ }
+
+ onInit(callback: (ctx: EventContext) => void) {
+ this.connection.onReducer("init", callback);
+ }
+
+ removeOnInit(callback: (ctx: EventContext) => void) {
+ this.connection.offReducer("init", callback);
+ }
+
+ onConnect() {
+ this.connection.callReducer("on_connect", new Uint8Array(0), this.setCallReducerFlags.onConnectFlags);
+ }
+
+ onOnConnect(callback: (ctx: EventContext) => void) {
+ this.connection.onReducer("on_connect", callback);
+ }
+
+ removeOnOnConnect(callback: (ctx: EventContext) => void) {
+ this.connection.offReducer("on_connect", callback);
+ }
+
queryPrivate() {
this.connection.callReducer("query_private", new Uint8Array(0), this.setCallReducerFlags.queryPrivateFlags);
}
@@ -1167,16 +1099,6 @@ export class RemoteReducers {
}
export class SetReducerFlags {
- identityConnectedFlags: CallReducerFlags = 'FullUpdate';
- identityConnected(flags: CallReducerFlags) {
- this.identityConnectedFlags = flags;
- }
-
- initFlags: CallReducerFlags = 'FullUpdate';
- init(flags: CallReducerFlags) {
- this.initFlags = flags;
- }
-
addPlayerFlags: CallReducerFlags = 'FullUpdate';
addPlayer(flags: CallReducerFlags) {
this.addPlayerFlags = flags;
@@ -1202,6 +1124,16 @@ export class SetReducerFlags {
this.deletePlayersByNameFlags = flags;
}
+ initFlags: CallReducerFlags = 'FullUpdate';
+ init(flags: CallReducerFlags) {
+ this.initFlags = flags;
+ }
+
+ onConnectFlags: CallReducerFlags = 'FullUpdate';
+ onConnect(flags: CallReducerFlags) {
+ this.onConnectFlags = flags;
+ }
+
queryPrivateFlags: CallReducerFlags = 'FullUpdate';
queryPrivate(flags: CallReducerFlags) {
this.queryPrivateFlags = flags;
@@ -1655,6 +1587,75 @@ export type NamespaceTestF = NamespaceTestF.Foo | NamespaceTestF.Bar | Namespace
export default NamespaceTestF;
+'''
+"on_connect_reducer.ts" = '''
+// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
+// WILL NOT BE SAVED. MODIFY TABLES IN RUST INSTEAD.
+
+import {
+ // @ts-ignore
+ Address,
+ // @ts-ignore
+ AlgebraicType,
+ // @ts-ignore
+ AlgebraicValue,
+ // @ts-ignore
+ BinaryReader,
+ // @ts-ignore
+ BinaryWriter,
+ // @ts-ignore
+ CallReducerFlags,
+ // @ts-ignore
+ DBConnectionBuilder,
+ // @ts-ignore
+ DBConnectionImpl,
+ // @ts-ignore
+ DBContext,
+ // @ts-ignore
+ Event,
+ // @ts-ignore
+ EventContextInterface,
+ // @ts-ignore
+ Identity,
+ // @ts-ignore
+ ProductType,
+ // @ts-ignore
+ ProductTypeElement,
+ // @ts-ignore
+ SumType,
+ // @ts-ignore
+ SumTypeVariant,
+ // @ts-ignore
+ TableCache,
+ // @ts-ignore
+ deepEqual,
+} from "@clockworklabs/spacetimedb-sdk";
+
+export type OnConnect = {};
+
+/**
+ * A namespace for generated helper functions.
+ */
+export namespace OnConnect {
+ /**
+ * A function which returns this type represented as an AlgebraicType.
+ * This function is derived from the AlgebraicType used to generate this type.
+ */
+ export function getTypeScriptAlgebraicType(): AlgebraicType {
+ return AlgebraicType.createProductType([
+ ]);
+ }
+
+ export function serialize(writer: BinaryWriter, value: OnConnect): void {
+ OnConnect.getTypeScriptAlgebraicType().serialize(writer, value);
+ }
+
+ export function deserialize(reader: BinaryReader): OnConnect {
+ return OnConnect.getTypeScriptAlgebraicType().deserialize(reader);
+ }
+
+}
+
'''
"pk_multi_identity_table.ts" = '''
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs
index 17e36e59675..2822e9501ac 100644
--- a/crates/client-api-messages/src/websocket.rs
+++ b/crates/client-api-messages/src/websocket.rs
@@ -340,6 +340,9 @@ pub struct SubscriptionError {
/// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
/// [`None`] if this occurred as the result of a [`TransactionUpdate`].
pub request_id: Option,
+ /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
+ /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
+ pub query_id: Option,
/// The return table of the query in question.
/// The server is not required to set this field.
/// It has been added to avoid a breaking change post 1.0.
diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs
index 55412d7dada..da2177994ea 100644
--- a/crates/client-api/src/routes/database.rs
+++ b/crates/client-api/src/routes/database.rs
@@ -108,6 +108,10 @@ pub async fn call(
log::debug!("Attempt to call non-existent reducer {}", reducer);
StatusCode::NOT_FOUND
}
+ ReducerCallError::LifecycleReducer(lifecycle) => {
+ log::debug!("Attempt to call {lifecycle:?} lifeycle reducer {}", reducer);
+ StatusCode::BAD_REQUEST
+ }
};
log::debug!("Error while invoking reducer {:#}", e);
diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml
index ea3826ffca9..e0eb8ced353 100644
--- a/crates/commitlog/Cargo.toml
+++ b/crates/commitlog/Cargo.toml
@@ -10,7 +10,7 @@ description = "Implementation of the SpacetimeDB commitlog."
[features]
default = ["serde"]
# Enable types + impls useful for testing
-test = []
+test = ["dep:env_logger"]
[dependencies]
bitflags.workspace = true
@@ -25,6 +25,9 @@ spacetimedb-sats.workspace = true
tempfile.workspace = true
thiserror.workspace = true
+# For the 'test' feature
+env_logger = { workspace = true, optional = true }
+
[dev-dependencies]
env_logger.workspace = true
once_cell.workspace = true
diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs
index 72ca54d00eb..7474aa34577 100644
--- a/crates/commitlog/src/lib.rs
+++ b/crates/commitlog/src/lib.rs
@@ -24,8 +24,8 @@ pub use crate::{
pub mod error;
pub mod payload;
-#[cfg(test)]
-mod tests;
+#[cfg(any(test, feature = "test"))]
+pub mod tests;
/// [`Commitlog`] options.
#[derive(Clone, Copy, Debug)]
diff --git a/crates/commitlog/src/tests.rs b/crates/commitlog/src/tests.rs
index 4f63f75fc33..87733366ad4 100644
--- a/crates/commitlog/src/tests.rs
+++ b/crates/commitlog/src/tests.rs
@@ -1,4 +1,6 @@
+#[cfg(test)]
mod bitflip;
+#[cfg(test)]
mod partial;
pub mod helpers;
diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs
index 12b65fd4138..523af8f01c4 100644
--- a/crates/commitlog/src/tests/helpers.rs
+++ b/crates/commitlog/src/tests/helpers.rs
@@ -38,6 +38,20 @@ where
total_txs
}
+/// Put the `txes` into `log`.
+///
+/// Each TX from `txes` will be placed in its own commit within `log`.
+pub fn fill_log_with<R, T>(log: &mut commitlog::Generic<R, T>, txes: impl IntoIterator<Item = T>)
+where
+ R: Repo,
+ T: Debug + Encode,
+{
+ for tx in txes {
+ log.append(tx).unwrap();
+ log.commit().unwrap();
+ }
+}
+
pub fn enable_logging() {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Trace)
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 21976f25cdd..d07f637e704 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -119,6 +119,7 @@ test = []
[dev-dependencies]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
spacetimedb-sats = { path = "../sats", features = ["proptest"] }
+spacetimedb-commitlog = { workspace = true, features = ["test"] }
criterion.workspace = true
# Also as dev-dependencies for use in _this_ crate's tests.
diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs
index c2e4312d42f..396d419506c 100644
--- a/crates/core/src/client/message_handlers.rs
+++ b/crates/core/src/client/message_handlers.rs
@@ -74,7 +74,12 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
res.map(drop).map_err(|e| {
(
Some(reducer),
- client.module.info().reducers_map.lookup_id(reducer),
+ client
+ .module
+ .info()
+ .module_def
+ .reducer_full(&**reducer)
+ .map(|(id, _)| id),
e.into(),
)
})
diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs
index 305758d6059..31a7eaf6a6e 100644
--- a/crates/core/src/client/messages.rs
+++ b/crates/core/src/client/messages.rs
@@ -388,7 +388,8 @@ impl ToProtocol for SubscriptionMessage {
SubscriptionResult::Error(error) => {
let msg = ws::SubscriptionError {
total_host_execution_duration_micros,
- request_id: self.request_id, // Pass Option through
+ request_id: self.request_id, // Pass Option through
+ query_id: self.query_id.map(|x| x.id), // Pass Option through
table_id: error.table_id,
error: error.message,
};
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
index 765d158e06d..0a9c9808d8a 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
@@ -319,9 +319,8 @@ impl CommittedState {
.get_mut(&table_id)
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
let blob_store = &mut self.blob_store;
- let skip_index_update = true;
table
- .delete_equal_row(blob_store, rel, skip_index_update)
+ .delete_equal_row(blob_store, rel)
.map_err(TableError::Insert)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
Ok(())
@@ -334,7 +333,7 @@ impl CommittedState {
row: &ProductValue,
) -> Result<()> {
let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema);
- table.insert_for_replay(blob_store, row).map_err(TableError::Insert)?;
+ table.insert(blob_store, row).map_err(TableError::Insert)?;
Ok(())
}
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
index 901dee59ea7..c36c3f6fbe9 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
@@ -115,7 +115,10 @@ impl Locking {
// The database tables are now initialized with the correct data.
// Now we have to build our in memory structures.
commit_state.build_sequence_state(&mut datastore.sequence_state.lock())?;
- commit_state.build_indexes()?;
+ // We don't want to build indexes here; we'll build those later,
+ // in `rebuild_state_after_replay`.
+ // We actively do not want indexes to exist during replay,
+ // as they break replaying TX 0.
log::trace!("DATABASE:BOOTSTRAPPING SYSTEM TABLES DONE");
Ok(datastore)
diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs
index da465527ff8..d5e29fbc41f 100644
--- a/crates/core/src/db/datastore/system_tables.rs
+++ b/crates/core/src/db/datastore/system_tables.rs
@@ -905,7 +905,7 @@ impl From for ProductValue {
/// identity | address
/// -----------------------------------------------------------------------------------------+--------------------------------------------------------
/// (__identity_bytes = 0x7452047061ea2502003412941d85a42f89b0702588b823ab55fc4f12e9ea8363) | (__address_bytes = 0x6bdea3ab517f5857dc9b1b5fe99e1b14)
-#[derive(Clone, Debug, Eq, PartialEq, SpacetimeType)]
+#[derive(Clone, Copy, Debug, Eq, PartialEq, SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct StClientRow {
pub(crate) identity: IdentityViaU256,
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 914a6091af0..206b43c3232 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -1328,7 +1328,7 @@ pub mod tests_utils {
pub struct TempReplicaDir(ReplicaDir);
impl TempReplicaDir {
- fn new() -> io::Result {
+ pub fn new() -> io::Result {
let dir = TempDir::with_prefix("stdb_test")?;
Ok(Self(ReplicaDir::from_path_unchecked(dir.into_path())))
}
@@ -1388,6 +1388,28 @@ pub mod tests_utils {
})
}
+ /// Create a [`TestDB`] which stores data in a local commitlog,
+ /// initialized with pre-existing data from `history`.
+ ///
+ /// [`TestHistory::from_txes`] is an easy-ish way to construct a non-empty [`History`].
+ ///
+ /// `expected_num_clients` is the expected size of the `connected_clients` return
+ /// from [`RelationalDB::open`] after replaying `history`.
+ /// Opening with an empty history, or one that does not insert into `st_client`,
+ /// should result in this number being 0.
+ pub fn in_memory_with_history(
+ history: impl durability::History,
+ expected_num_clients: usize,
+ ) -> Result<Self> {
+ let dir = TempReplicaDir::new()?;
+ let db = Self::open_db(&dir, history, None, None, expected_num_clients)?;
+ Ok(Self {
+ db,
+ durable: None,
+ tmp_dir: dir,
+ })
+ }
+
/// Re-open the database, after ensuring that all data has been flushed
/// to disk (if the database was created via [`Self::durable`]).
pub fn reopen(self) -> Result<Self> {
@@ -1468,7 +1490,7 @@ pub mod tests_utils {
}
fn in_memory_internal(root: &ReplicaDir) -> Result {
- Self::open_db(root, EmptyHistory::new(), None, None)
+ Self::open_db(root, EmptyHistory::new(), None, None, 0)
}
fn durable_internal(
@@ -1479,7 +1501,7 @@ pub mod tests_utils {
let history = local.clone();
let durability = local.clone() as Arc>;
let snapshot_repo = open_snapshot_repo(root.snapshots(), Identity::ZERO, 0)?;
- let db = Self::open_db(root, history, Some((durability, disk_size_fn)), Some(snapshot_repo))?;
+ let db = Self::open_db(root, history, Some((durability, disk_size_fn)), Some(snapshot_repo), 0)?;
Ok((db, local))
}
@@ -1489,6 +1511,7 @@ pub mod tests_utils {
history: impl durability::History,
durability: Option<(Arc>, DiskSizeFn)>,
snapshot_repo: Option>,
+ expected_num_clients: usize,
) -> Result {
let (db, connected_clients) = RelationalDB::open(
root,
@@ -1498,7 +1521,7 @@ pub mod tests_utils {
durability,
snapshot_repo,
)?;
- debug_assert!(connected_clients.is_empty());
+ assert_eq!(connected_clients.len(), expected_num_clients);
let db = db.with_row_count(Self::row_count_fn());
db.with_auto_commit(Workload::Internal, |tx| {
db.set_initialized(tx, HostType::Wasm, Program::empty())
@@ -1530,6 +1553,43 @@ pub mod tests_utils {
let gen_cols = row_ref.project(&gen_cols).unwrap();
Ok((gen_cols, row_ref))
}
+
+ /// An in-memory commitlog used for tests that want to replay a known history.
+ pub struct TestHistory(commitlog::commitlog::Generic);
+
+ impl durability::History for TestHistory {
+ type TxData = Txdata;
+ fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
+ where
+ D: commitlog::Decoder,
+ D::Error: From,
+ {
+ self.0.fold_transactions_from(offset, decoder)
+ }
+ fn transactions_from<'a, D>(
+ &self,
+ offset: TxOffset,
+ decoder: &'a D,
+ ) -> impl Iterator<Item = Result<Transaction<Self::TxData>, D::Error>>
+ where
+ D: commitlog::Decoder,
+ D::Error: From,
+ Self::TxData: 'a,
+ {
+ self.0.transactions_from(offset, decoder)
+ }
+ fn max_tx_offset(&self) -> Option<TxOffset> {
+ self.0.max_committed_offset()
+ }
+ }
+
+ impl TestHistory {
+ pub fn from_txes(txes: impl IntoIterator<Item = Txdata>) -> Self {
+ let mut log = commitlog::tests::helpers::mem_log::<Txdata>(32);
+ commitlog::tests::helpers::fill_log_with(&mut log, txes);
+ Self(log)
+ }
+ }
}
#[cfg(test)]
@@ -1563,6 +1623,7 @@ mod tests {
use spacetimedb_schema::schema::RowLevelSecuritySchema;
use spacetimedb_table::read_column::ReadColumn;
use spacetimedb_table::table::RowRef;
+ use tests::tests_utils::TestHistory;
fn my_table(col_type: AlgebraicType) -> TableSchema {
table("MyTable", ProductType::from([("my_col", col_type)]), |builder| builder)
@@ -2429,4 +2490,81 @@ mod tests {
assert_eq!(reducer_timestamp, timestamp);
}
}
+
+ /// This tests that we are able to correctly replay mutations to system tables,
+ /// in this case specifically `st_client`.
+ ///
+ /// [SpacetimeDB PR #2161](https://github.com/clockworklabs/SpacetimeDB/pull/2161)
+ /// fixed a bug where replaying deletes to `st_client` would fail due to an unpopulated index.
+ #[test]
+ fn replay_delete_from_st_client() {
+ use crate::db::datastore::system_tables::{StClientRow, ST_CLIENT_ID};
+
+ let row_0 = StClientRow {
+ identity: Identity::ZERO.into(),
+ address: Address::ZERO.into(),
+ };
+ let row_1 = StClientRow {
+ identity: Identity::ZERO.into(),
+ address: Address::from_u128(1).into(),
+ };
+
+ let history = TestHistory::from_txes([
+ // TX 0: insert row 0
+ Txdata {
+ inputs: None,
+ outputs: None,
+ mutations: Some(txdata::Mutations {
+ inserts: Box::new([txdata::Ops {
+ table_id: ST_CLIENT_ID,
+ rowdata: Arc::new([row_0.into()]),
+ }]),
+ deletes: Box::new([]),
+ truncates: Box::new([]),
+ }),
+ },
+ // TX 1: delete row 0
+ Txdata {
+ inputs: None,
+ outputs: None,
+ mutations: Some(txdata::Mutations {
+ inserts: Box::new([]),
+ deletes: Box::new([txdata::Ops {
+ table_id: ST_CLIENT_ID,
+ rowdata: Arc::new([row_0.into()]),
+ }]),
+ truncates: Box::new([]),
+ }),
+ },
+ // TX 2: insert row 1
+ Txdata {
+ inputs: None,
+ outputs: None,
+ mutations: Some(txdata::Mutations {
+ inserts: Box::new([txdata::Ops {
+ table_id: ST_CLIENT_ID,
+ rowdata: Arc::new([row_1.into()]),
+ }]),
+ deletes: Box::new([]),
+ truncates: Box::new([]),
+ }),
+ },
+ ]);
+
+ // We expect 1 client, since we left `row_1` in there.
+ let stdb = TestDB::in_memory_with_history(history, /* expected_num_clients: */ 1).unwrap();
+
+ let read_tx = stdb.begin_tx(Workload::ForTests);
+
+ // Read all of st_client, assert that there's only one row, and that said row is `row_1`.
+ let present_rows: Vec<StClientRow> = stdb
+ .iter(&read_tx, ST_CLIENT_ID)
+ .unwrap()
+ .map(|row_ref| row_ref.try_into().unwrap())
+ .collect();
+ assert_eq!(present_rows.len(), 1);
+ assert_eq!(present_rows[0], row_1);
+
+ stdb.release_tx(read_tx);
+ }
}
diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs
index ba015cb37f8..91440be572c 100644
--- a/crates/core/src/host/mod.rs
+++ b/crates/core/src/host/mod.rs
@@ -101,28 +101,8 @@ impl Default for ArgsTuple {
}
}
-#[derive(Copy, Clone, Debug, Default)]
-pub struct ReducerId(u32);
-impl std::fmt::Display for ReducerId {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- self.0.fmt(f)
- }
-}
-impl From<usize> for ReducerId {
- fn from(id: usize) -> Self {
- Self(id as u32)
- }
-}
-impl From<u32> for ReducerId {
- fn from(id: u32) -> Self {
- Self(id)
- }
-}
-impl From<ReducerId> for u32 {
- fn from(id: ReducerId) -> Self {
- id.0
- }
-}
+// TODO(noa): replace imports from this module with imports straight from primitives.
+pub use spacetimedb_primitives::ReducerId;
#[derive(thiserror::Error, Debug)]
#[error("invalid arguments for reducer {reducer}: {err}")]
diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs
index 83758e86d5e..8c4fbd9e7a0 100644
--- a/crates/core/src/host/module_host.rs
+++ b/crates/core/src/host/module_host.rs
@@ -1,4 +1,3 @@
-use super::wasm_common::{CLIENT_CONNECTED_DUNDER, CLIENT_DISCONNECTED_DUNDER};
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId};
use crate::client::{ClientActorId, ClientConnectionSender};
use crate::database_logger::{LogLevel, Record};
@@ -27,6 +26,7 @@ use spacetimedb_client_api_messages::timestamp::Timestamp;
use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, WebsocketFormat};
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
+use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::Address;
use spacetimedb_primitives::{col_list, TableId};
@@ -177,9 +177,6 @@ pub struct ModuleInfo {
/// Loaded by loading the module's program from the system tables, extracting its definition,
/// and validating.
pub module_def: ModuleDef,
- /// Map between reducer IDs and reducer names.
- /// Reducer names are sorted alphabetically.
- pub reducers_map: ReducersMap,
/// The identity of the module.
pub owner_identity: Identity,
/// The identity of the database.
@@ -203,11 +200,8 @@ impl ModuleInfo {
log_tx: tokio::sync::broadcast::Sender,
subscriptions: ModuleSubscriptions,
) -> Arc {
- // Note: sorts alphabetically!
- let reducers_map = module_def.reducers().map(|r| &*r.name).collect();
Arc::new(ModuleInfo {
module_def,
- reducers_map,
owner_identity,
database_identity,
module_hash,
@@ -215,26 +209,6 @@ impl ModuleInfo {
subscriptions,
})
}
-
- /// Get the reducer seed and ID for a reducer name, if any.
- pub fn reducer_seed_and_id(&self, reducer_name: &str) -> Option<(ReducerArgsDeserializeSeed, ReducerId)> {
- let seed = self.module_def.reducer_arg_deserialize_seed(reducer_name)?;
- let reducer_id = self
- .reducers_map
- .lookup_id(reducer_name)
- .expect("seed was present, so ID should be present!");
- Some((seed, reducer_id))
- }
-
- /// Get a reducer by its ID.
- pub fn get_reducer_by_id(&self, reducer_id: ReducerId) -> Option<&ReducerDef> {
- let name = self.reducers_map.lookup_name(reducer_id)?;
- Some(
- self.module_def
- .reducer(name)
- .expect("id was present, so reducer should be present!"),
- )
- }
}
/// A bidirectional map between `Identifiers` (reducer names) and `ReducerId`s.
@@ -476,6 +450,8 @@ pub enum ReducerCallError {
NoSuchReducer,
#[error("no such scheduled reducer")]
ScheduleReducerNotFound,
+ #[error("can't directly call special {0:?} lifecycle reducer")]
+ LifecycleReducer(Lifecycle),
}
#[derive(thiserror::Error, Debug)]
@@ -558,12 +534,15 @@ impl ModuleHost {
caller_address: Address,
connected: bool,
) -> Result<(), ReducerCallError> {
- let reducer_name = if connected {
- CLIENT_CONNECTED_DUNDER
+ let (lifecycle, fake_name) = if connected {
+ (Lifecycle::OnConnect, "__identity_connected__")
} else {
- CLIENT_DISCONNECTED_DUNDER
+ (Lifecycle::OnDisconnect, "__identity_disconnected__")
};
+ let reducer_lookup = self.info.module_def.lifecycle_reducer(lifecycle);
+ let reducer_name = reducer_lookup.as_ref().map(|(_, def)| &*def.name).unwrap_or(fake_name);
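+ // `fake_name` only labels the workload and error messages below when the module does not define this lifecycle reducer.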
+
let db = &self.inner.replica_ctx().relational_db;
let workload = || {
Workload::Reducer(ReducerContext {
@@ -575,42 +554,41 @@ impl ModuleHost {
})
};
- let result = self
- .call_reducer_inner(
+ let result = if let Some((reducer_id, reducer_def)) = reducer_lookup {
+ self.call_reducer_inner(
caller_identity,
Some(caller_address),
None,
None,
None,
- reducer_name,
+ reducer_id,
+ reducer_def,
ReducerArgs::Nullary,
)
.await
.map(drop)
- .or_else(|e| match e {
- // If the module doesn't define connected or disconnected, commit
- // a transaction to update `st_clients` and to ensure we always have those events
- // paired in the commitlog.
- //
- // This is necessary to be able to disconnect clients after a server
- // crash.
- ReducerCallError::NoSuchReducer => db
- .with_auto_commit(workload(), |mut_tx| {
- if connected {
- self.update_st_clients(mut_tx, caller_identity, caller_address, connected)
- } else {
- Ok(())
- }
- })
- .map_err(|err| {
- InvalidReducerArguments {
- err: err.into(),
- reducer: reducer_name.into(),
- }
- .into()
- }),
- e => Err(e),
- });
+ } else {
+ // If the module doesn't define connected or disconnected, commit
+ // a transaction to update `st_clients` and to ensure we always have those events
+ // paired in the commitlog.
+ //
+ // This is necessary to be able to disconnect clients after a server
+ // crash.
+ db.with_auto_commit(workload(), |mut_tx| {
+ if connected {
+ self.update_st_clients(mut_tx, caller_identity, caller_address, connected)
+ } else {
+ Ok(())
+ }
+ })
+ .map_err(|err| {
+ InvalidReducerArguments {
+ err: err.into(),
+ reducer: reducer_name.into(),
+ }
+ .into()
+ })
+ };
// Deleting client from `st_clients`does not depend upon result of disconnect reducer hence done in a separate tx.
if !connected {
@@ -663,25 +641,15 @@ impl ModuleHost {
client: Option>,
request_id: Option,
timer: Option,
- reducer_name: &str,
+ reducer_id: ReducerId,
+ reducer_def: &ReducerDef,
args: ReducerArgs,
) -> Result {
- let reducer_seed = self
- .info
- .module_def
- .reducer_arg_deserialize_seed(reducer_name)
- .ok_or(ReducerCallError::NoSuchReducer)?;
-
- let reducer_id = self
- .info
- .reducers_map
- .lookup_id(reducer_name)
- .expect("if we found the seed, we should find the ID!");
-
+ let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
let args = args.into_tuple(reducer_seed)?;
let caller_address = caller_address.unwrap_or(Address::__DUMMY);
- self.call(reducer_name, move |inst| {
+ self.call(&reducer_def.name, move |inst| {
inst.call_reducer(
None,
CallReducerParams {
@@ -710,20 +678,28 @@ impl ModuleHost {
reducer_name: &str,
args: ReducerArgs,
) -> Result {
- if reducer_name.starts_with("__") && reducer_name.ends_with("__") {
- return Err(ReducerCallError::NoSuchReducer);
- }
- let res = self
- .call_reducer_inner(
+ let res = async {
+ let (reducer_id, reducer_def) = self
+ .info
+ .module_def
+ .reducer_full(reducer_name)
+ .ok_or(ReducerCallError::NoSuchReducer)?;
+ if let Some(lifecycle) = reducer_def.lifecycle {
+ return Err(ReducerCallError::LifecycleReducer(lifecycle));
+ }
+ self.call_reducer_inner(
caller_identity,
caller_address,
client,
request_id,
timer,
- reducer_name,
+ reducer_id,
+ reducer_def,
args,
)
- .await;
+ .await
+ }
+ .await;
let log_message = match &res {
Err(ReducerCallError::NoSuchReducer) => Some(format!(
@@ -761,10 +737,11 @@ impl ModuleHost {
match call_reducer_params(&mut tx) {
Ok(Some(params)) => {
// It is necessary to patch the context with the actual calling reducer
- let reducer = module
- .reducers_map
- .lookup_name(params.reducer_id)
+ let reducer_def = module
+ .module_def
+ .get_reducer_by_id(params.reducer_id)
.ok_or(ReducerCallError::ScheduleReducerNotFound)?;
+ let reducer = &*reducer_def.name;
tx.ctx = ExecutionContext::with_workload(
tx.ctx.database_identity(),
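// A minimal, self-contained sketch of the connect/disconnect flow introduced
// above: look up the lifecycle reducer by role instead of by dunder name, call
// it if the module defines one, and otherwise fall back to updating
// `st_clients` directly. The types below are simplified stand-ins, not the
// real SpacetimeDB definitions.

#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum Lifecycle {
    Init,
    OnConnect,
    OnDisconnect,
}

struct ReducerDef {
    name: String,
    lifecycle: Option<Lifecycle>,
}

struct ModuleDef {
    reducers: Vec<ReducerDef>,
}

impl ModuleDef {
    // Mirrors `ModuleDef::lifecycle_reducer`: the id and def of the reducer
    // playing the given role, if the module defines one.
    fn lifecycle_reducer(&self, kind: Lifecycle) -> Option<(usize, &ReducerDef)> {
        self.reducers.iter().enumerate().find(|(_, def)| def.lifecycle == Some(kind))
    }
}

fn on_client_connected_disconnected(module: &ModuleDef, connected: bool) {
    let lifecycle = if connected { Lifecycle::OnConnect } else { Lifecycle::OnDisconnect };
    match module.lifecycle_reducer(lifecycle) {
        // The module defines the lifecycle reducer: invoke it by id.
        Some((reducer_id, def)) => println!("calling reducer #{reducer_id} ({})", def.name),
        // No lifecycle reducer: only update `st_clients`, so that connect and
        // disconnect events stay paired in the commitlog.
        None => println!("no lifecycle reducer; updating st_clients only"),
    }
}

fn main() {
    let module = ModuleDef {
        reducers: vec![ReducerDef {
            name: "client_connected".into(),
            lifecycle: Some(Lifecycle::OnConnect),
        }],
    };
    on_client_connected_disconnected(&module, true);
    on_client_connected_disconnected(&module, false);
}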
diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs
index 27b21d659f0..58e1880f085 100644
--- a/crates/core/src/host/scheduler.rs
+++ b/crates/core/src/host/scheduler.rs
@@ -304,8 +304,9 @@ impl SchedulerActor {
let id = match item {
QueueItem::Id(id) => id,
QueueItem::VolatileNonatomicImmediate { reducer_name, args } => {
- let (reducer_seed, reducer_id) = module_info
- .reducer_seed_and_id(&reducer_name[..])
+ let (reducer_id, reducer_seed) = module_info
+ .module_def
+ .reducer_arg_deserialize_seed(&reducer_name[..])
.ok_or_else(|| anyhow!("Reducer not found: {}", reducer_name))?;
let reducer_args = args.into_tuple(reducer_seed)?;
@@ -334,8 +335,9 @@ impl SchedulerActor {
let ScheduledReducer { reducer, bsatn_args } = proccess_schedule(tx, &db, id.table_id, &schedule_row)?;
- let (reducer_seed, reducer_id) = module_info
- .reducer_seed_and_id(&reducer[..])
+ let (reducer_id, reducer_seed) = module_info
+ .module_def
+ .reducer_arg_deserialize_seed(&reducer[..])
.ok_or_else(|| anyhow!("Reducer not found: {}", reducer))?;
let reducer_args = ReducerArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?;
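// The scheduler now resolves a reducer name to its id and its argument
// deserializer in a single `ModuleDef` lookup. A minimal sketch of that
// "one lookup returns both" shape using `indexmap::IndexMap::get_full`
// (assumes `indexmap = "2"`); `ArgsSeed` is a simplified stand-in for the
// real `ReducerArgsDeserializeSeed`.
use indexmap::IndexMap;

struct ReducerDef {
    params: Vec<String>,
}

// Stand-in for the deserialize seed: remembers which params to expect.
struct ArgsSeed<'a> {
    params: &'a [String],
}

struct ModuleDef {
    reducers: IndexMap<String, ReducerDef>,
}

impl ModuleDef {
    // One lookup yields the reducer's positional id *and* the seed needed to
    // parse its arguments, mirroring `reducer_arg_deserialize_seed`.
    fn reducer_arg_deserialize_seed(&self, name: &str) -> Option<(usize, ArgsSeed<'_>)> {
        let (id, _name, def) = self.reducers.get_full(name)?;
        Some((id, ArgsSeed { params: def.params.as_slice() }))
    }
}

fn main() {
    let mut reducers = IndexMap::new();
    reducers.insert("send_message".to_string(), ReducerDef { params: vec!["text".into()] });
    let module = ModuleDef { reducers };

    if let Some((id, seed)) = module.reducer_arg_deserialize_seed("send_message") {
        println!("reducer id {id}, expects {} arg(s)", seed.params.len());
    }
}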
diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs
index 9baacf66563..c902c2a1e9f 100644
--- a/crates/core/src/host/wasm_common.rs
+++ b/crates/core/src/host/wasm_common.rs
@@ -20,12 +20,6 @@ pub const DESCRIBE_MODULE_DUNDER: &str = "__describe_module__";
pub const PREINIT_DUNDER: &str = "__preinit__";
/// initializes the user code in the module. fallible
pub const SETUP_DUNDER: &str = "__setup__";
-/// the reducer with this name initializes the database
-pub const INIT_DUNDER: &str = "__init__";
-/// The reducer with this name is invoked when a client connects.
-pub const CLIENT_CONNECTED_DUNDER: &str = "__identity_connected__";
-/// The reducer with this name is invoked when a client disconnects.
-pub const CLIENT_DISCONNECTED_DUNDER: &str = "__identity_disconnected__";
#[derive(Debug, Clone)]
#[allow(unused)]
diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs
index 77f4bd7bf98..a2e73e8f2bf 100644
--- a/crates/core/src/host/wasm_common/module_host_actor.rs
+++ b/crates/core/src/host/wasm_common/module_host_actor.rs
@@ -1,6 +1,7 @@
use anyhow::Context;
use bytes::Bytes;
use spacetimedb_client_api_messages::timestamp::Timestamp;
+use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_primitives::TableId;
use spacetimedb_schema::auto_migrate::ponder_migrate;
use spacetimedb_schema::def::ModuleDef;
@@ -297,13 +298,13 @@ impl ModuleInstance for WasmModuleInstance {
})
.inspect_err(|e| log::error!("{e:?}"))?;
- let rcr = match self.info.reducers_map.lookup_id(INIT_DUNDER) {
+ let rcr = match self.info.module_def.lifecycle_reducer(Lifecycle::Init) {
None => {
stdb.commit_tx(tx)?;
None
}
- Some(reducer_id) => {
+ Some((reducer_id, _)) => {
self.system_logger().info("Invoking `init` reducer");
let caller_identity = self.replica_context().database.owner_identity;
Some(self.call_reducer_with_tx(
@@ -405,11 +406,8 @@ impl WasmModuleInstance {
let replica_ctx = self.replica_context();
let stdb = &*replica_ctx.relational_db.clone();
let address = replica_ctx.database_identity;
- let reducer_name = self
- .info
- .reducers_map
- .lookup_name(reducer_id)
- .expect("reducer not found");
+ let reducer_def = self.info.module_def.reducer_by_id(reducer_id);
+ let reducer_name = &*reducer_def.name;
let _outer_span = tracing::trace_span!("call_reducer",
reducer_name,
@@ -512,7 +510,7 @@ impl WasmModuleInstance {
Ok(Ok(())) => {
// Detecting a new client and inserting it in `st_clients`.
// Disconnect logic is written in module_host.rs, due to different transactionality requirements.
- if reducer_name == CLIENT_CONNECTED_DUNDER {
+ if reducer_def.lifecycle == Some(Lifecycle::OnConnect) {
match self.insert_st_client(&mut tx, caller_identity, caller_address) {
Ok(_) => EventStatus::Committed(DatabaseUpdate::default()),
Err(err) => EventStatus::Failed(err.to_string()),
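// The host now decides whether a reducer is the connect hook by inspecting
// its declared lifecycle role instead of comparing its name against a dunder
// string. A tiny stand-alone sketch contrasting the two checks; `ReducerDef`
// and `Lifecycle` are simplified stand-ins.

#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum Lifecycle {
    OnConnect,
    OnDisconnect,
}

struct ReducerDef {
    name: &'static str,
    lifecycle: Option<Lifecycle>,
}

// Old shape: brittle comparison against a reserved dunder name.
fn is_connect_reducer_by_name(reducer_name: &str) -> bool {
    reducer_name == "__identity_connected__"
}

// New shape: the module definition states which role the reducer plays.
fn is_connect_reducer(def: &ReducerDef) -> bool {
    def.lifecycle == Some(Lifecycle::OnConnect)
}

fn main() {
    let def = ReducerDef {
        name: "client_connected",
        lifecycle: Some(Lifecycle::OnConnect),
    };
    // The reducer no longer needs a dunder name for the hook to fire.
    assert!(!is_connect_reducer_by_name(def.name));
    assert!(is_connect_reducer(&def));
    println!("`{}` is the connect lifecycle reducer", def.name);
}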
diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs
index ac7796d287d..b20955d1586 100644
--- a/crates/core/src/subscription/module_subscription_actor.rs
+++ b/crates/core/src/subscription/module_subscription_actor.rs
@@ -108,23 +108,37 @@ impl ModuleSubscriptions {
self.relational_db.release_tx(tx);
});
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
- let guard = self.subscriptions.read();
let query = super::query::WHITESPACE.replace_all(&request.query, " ");
let sql = query.trim();
let hash = QueryHash::from_string(sql);
- let query = if let Some(unit) = guard.query(&hash) {
- unit
- } else {
+ let existing_query = {
+ let guard = self.subscriptions.read();
+ guard.query(&hash)
+ };
+ let query: Result<Arc<ExecutionUnit>, DBError> = existing_query.map(Ok).unwrap_or_else(|| {
// NOTE: The following ensures compliance with the 1.0 sql api.
// Come 1.0, it will have replaced the current compilation stack.
compile_sql_sub(sql, &SchemaViewer::new(&self.relational_db, &*tx, &auth))?;
let compiled = compile_read_only_query(&self.relational_db, &auth, &tx, sql)?;
- Arc::new(ExecutionUnit::new(compiled, hash)?)
+ Ok(Arc::new(ExecutionUnit::new(compiled, hash)?))
+ });
+ let query = match query {
+ Ok(query) => query,
+ Err(e) => {
+ let _ = sender.send_message(SubscriptionMessage {
+ request_id: Some(request.request_id),
+ query_id: Some(request.query_id),
+ timer: Some(timer),
+ result: SubscriptionResult::Error(SubscriptionError {
+ table_id: None,
+ message: e.to_string().into(),
+ }),
+ });
+ return Ok(());
+ }
};
- drop(guard);
-
let table_rows = self.evaluate_initial_subscription(sender.clone(), query.clone(), auth, &tx)?;
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
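// The subscription path above narrows the read lock to the cache probe and
// reports compile failures back to the subscriber rather than returning them
// to the caller. A simplified sketch of that cache-or-compile-then-report
// flow; the types and the `send_error` callback are stand-ins, not the real
// SpacetimeDB API.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct ExecutionUnit {
    sql: String,
}

struct Subscriptions {
    cache: RwLock<HashMap<u64, Arc<ExecutionUnit>>>,
}

impl Subscriptions {
    fn handle_subscribe(
        &self,
        hash: u64,
        sql: &str,
        compile: impl Fn(&str) -> Result<ExecutionUnit, String>,
        send_error: impl Fn(String),
    ) -> Result<(), String> {
        // Hold the read guard only long enough to probe the cache.
        let cached = { self.cache.read().unwrap().get(&hash).cloned() };

        let query: Result<Arc<ExecutionUnit>, String> =
            cached.map(Ok).unwrap_or_else(|| Ok(Arc::new(compile(sql)?)));

        let query = match query {
            Ok(query) => query,
            Err(e) => {
                // A bad query is reported to the subscriber; the host call
                // itself still succeeds.
                send_error(e);
                return Ok(());
            }
        };

        println!("evaluating initial subscription for `{}`", query.sql);
        Ok(())
    }
}

fn main() {
    let subs = Subscriptions { cache: RwLock::new(HashMap::new()) };
    let compile = |sql: &str| {
        if sql.starts_with("SELECT") {
            Ok(ExecutionUnit { sql: sql.to_string() })
        } else {
            Err(format!("not a valid subscription query: {sql}"))
        }
    };
    let report = |e: String| eprintln!("error sent to client: {e}");
    subs.handle_subscribe(1, "SELECT * FROM messages", compile, report).unwrap();
    subs.handle_subscribe(2, "DELETE FROM messages", compile, report).unwrap();
}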
diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml
index 276e441ed60..8d1f1c1401b 100644
--- a/crates/lib/Cargo.toml
+++ b/crates/lib/Cargo.toml
@@ -22,6 +22,7 @@ proptest = ["dep:proptest", "dep:proptest-derive"]
# Allows using additional test methods.
test = ["proptest", "spacetimedb-sats/test"]
metrics_impls = ["dep:spacetimedb-metrics", "spacetimedb-sats/metrics_impls"]
+enum-map = ["dep:enum-map"]
[dependencies]
spacetimedb-bindings-macro.workspace = true
@@ -39,6 +40,7 @@ itertools.workspace = true
serde = { workspace = true, optional = true }
thiserror.workspace = true
blake3.workspace = true
+enum-map = { workspace = true, optional = true }
# For the 'proptest' feature.
proptest = { workspace = true, optional = true }
diff --git a/crates/lib/src/db/raw_def/v9.rs b/crates/lib/src/db/raw_def/v9.rs
index 8b5d041631e..f223f9b6e63 100644
--- a/crates/lib/src/db/raw_def/v9.rs
+++ b/crates/lib/src/db/raw_def/v9.rs
@@ -411,7 +411,8 @@ pub struct RawReducerDefV9 {
}
/// Special roles a reducer can play in the module lifecycle.
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, SpacetimeType)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, SpacetimeType)]
+#[cfg_attr(feature = "enum-map", derive(enum_map::Enum))]
#[sats(crate = crate)]
#[non_exhaustive]
pub enum Lifecycle {
diff --git a/crates/primitives/src/ids.rs b/crates/primitives/src/ids.rs
index 6451b760d84..e4152618862 100644
--- a/crates/primitives/src/ids.rs
+++ b/crates/primitives/src/ids.rs
@@ -107,3 +107,9 @@ system_id! {
pub struct ColId(pub u16);
}
// ColId works differently from other system IDs and is not auto-incremented.
+
+system_id! {
+ /// The index of a reducer as defined in a module's reducers list.
+ // This is never stored in a system table, but is useful to have defined here.
+ pub struct ReducerId(pub u32);
+}
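// `ReducerId` is produced by the crate's `system_id!` macro; later hunks rely
// on it converting from a usize position (`idx.into()`) and exposing `idx()`
// for indexing the reducers list. A hand-written stand-in illustrating that
// interface, not the macro's actual expansion.

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ReducerId(pub u32);

impl ReducerId {
    // The id as a usize, suitable for indexing the module's reducer list.
    pub fn idx(self) -> usize {
        self.0 as usize
    }
}

impl From<usize> for ReducerId {
    fn from(i: usize) -> Self {
        ReducerId(i as u32)
    }
}

fn main() {
    let id: ReducerId = 3usize.into();
    assert_eq!(id.idx(), 3);
    println!("{id:?} indexes position {}", id.idx());
}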
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index 8b6c184dc8f..c129f25d933 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -7,7 +7,7 @@ mod ids;
pub use attr::{AttributeKind, ColumnAttribute, ConstraintKind, Constraints};
pub use col_list::{ColList, ColSet};
-pub use ids::{ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId};
+pub use ids::{ColId, ConstraintId, IndexId, ReducerId, ScheduleId, SequenceId, TableId};
/// The minimum size of a chunk yielded by a wasm abi RowIter.
pub const ROW_ITER_CHUNK_SIZE: usize = 32 * 1024;
diff --git a/crates/schema/Cargo.toml b/crates/schema/Cargo.toml
index 82fda50506e..75758c15d74 100644
--- a/crates/schema/Cargo.toml
+++ b/crates/schema/Cargo.toml
@@ -10,7 +10,7 @@ rust-version.workspace = true
test = []
[dependencies]
-spacetimedb-lib.workspace = true
+spacetimedb-lib = { workspace = true, features = ["enum-map"] }
spacetimedb-primitives.workspace = true
spacetimedb-sats.workspace = true
spacetimedb-data-structures.workspace = true
@@ -28,6 +28,7 @@ serde_json.workspace = true
smallvec.workspace = true
hashbrown.workspace = true
enum-as-inner.workspace = true
+enum-map.workspace = true
[dev-dependencies]
spacetimedb-lib = { workspace = true, features = ["test"] }
diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs
index c43df56f361..72795446aaa 100644
--- a/crates/schema/src/def.rs
+++ b/crates/schema/src/def.rs
@@ -24,6 +24,7 @@ use crate::identifier::Identifier;
use crate::schema::{Schema, TableSchema};
use crate::type_for_generate::{AlgebraicTypeUse, ProductTypeDef, TypespaceForGenerate};
use deserialize::ReducerArgsDeserializeSeed;
+use enum_map::EnumMap;
use hashbrown::Equivalent;
use indexmap::IndexMap;
use itertools::Itertools;
@@ -36,7 +37,7 @@ use spacetimedb_lib::db::raw_def::v9::{
RawSql, RawTableDefV9, RawTypeDefV9, RawUniqueConstraintDataV9, TableAccess, TableType,
};
use spacetimedb_lib::{ProductType, RawModuleDef};
-use spacetimedb_primitives::{ColId, ColList, ColSet, TableId};
+use spacetimedb_primitives::{ColId, ColList, ColSet, ReducerId, TableId};
use spacetimedb_sats::AlgebraicType;
use spacetimedb_sats::{AlgebraicTypeRef, Typespace};
use validate::v9::generate_index_name;
@@ -101,6 +102,9 @@ pub struct ModuleDef {
/// and must be preserved for future calls to `__call_reducer__`.
reducers: IndexMap<Identifier, ReducerDef>,
+ /// A map from lifecycle reducer kind to reducer id.
+ lifecycle_reducers: EnumMap<Lifecycle, Option<ReducerId>>,
+
/// The type definitions of the module definition.
types: HashMap,
@@ -219,14 +223,38 @@ impl ModuleDef {
self.reducers.get(name)
}
+ /// Convenience method to look up a reducer, possibly by a string, returning its id as well.
+ pub fn reducer_full>(
+ &self,
+ name: &K,
+ ) -> Option<(ReducerId, &ReducerDef)> {
+ // If the string IS a valid identifier, we can just look it up.
+ self.reducers.get_full(name).map(|(idx, _, def)| (idx.into(), def))
+ }
+
+ /// Look up a reducer by its id, panicking if the id is out of range.
+ pub fn reducer_by_id(&self, id: ReducerId) -> &ReducerDef {
+ &self.reducers[id.idx()]
+ }
+
+ /// Look up a reducer by its id, returning `None` if the id is out of range.
+ pub fn get_reducer_by_id(&self, id: ReducerId) -> Option<&ReducerDef> {
+ self.reducers.get_index(id.idx()).map(|(_, def)| def)
+ }
+
+ /// Looks up a lifecycle reducer defined in the module.
+ pub fn lifecycle_reducer(&self, lifecycle: Lifecycle) -> Option<(ReducerId, &ReducerDef)> {
+ self.lifecycle_reducers[lifecycle].map(|i| (i, &self.reducers[i.idx()]))
+ }
+
/// Get a `DeserializeSeed` that can pull data from a `Deserializer` and format it into a `ProductType`
/// at the parameter type of the reducer named `name`.
pub fn reducer_arg_deserialize_seed>(
&self,
name: &K,
- ) -> Option {
- let reducer = self.reducer(name)?;
- Some(ReducerArgsDeserializeSeed(self.typespace.with_type(reducer)))
+ ) -> Option<(ReducerId, ReducerArgsDeserializeSeed)> {
+ let (id, reducer) = self.reducer_full(name)?;
+ Some((id, ReducerArgsDeserializeSeed(self.typespace.with_type(reducer))))
}
/// Look up the name corresponding to an `AlgebraicTypeRef`.
@@ -361,6 +389,7 @@ impl From for RawModuleDefV9 {
let ModuleDef {
tables,
reducers,
+ lifecycle_reducers: _,
types,
typespace,
stored_in_table_def: _,
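// `ModuleDef` now keeps an `EnumMap` from lifecycle role to an optional
// reducer id, so each role resolves in O(1) and every variant has a slot. A
// minimal sketch of that shape, assuming the `enum-map` crate (v2) as in the
// Cargo changes above; `ReducerId` here is a plain newtype stand-in.
use enum_map::{Enum, EnumMap};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Enum)]
enum Lifecycle {
    Init,
    OnConnect,
    OnDisconnect,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ReducerId(u32);

fn main() {
    // Every Lifecycle variant gets a slot; roles with no reducer stay None.
    let mut lifecycle_reducers: EnumMap<Lifecycle, Option<ReducerId>> = EnumMap::default();
    lifecycle_reducers[Lifecycle::Init] = Some(ReducerId(0));
    lifecycle_reducers[Lifecycle::OnConnect] = Some(ReducerId(2));

    // Mirrors `ModuleDef::lifecycle_reducer`: role -> optional reducer id.
    assert_eq!(lifecycle_reducers[Lifecycle::OnConnect], Some(ReducerId(2)));
    assert_eq!(lifecycle_reducers[Lifecycle::OnDisconnect], None);
    println!("init reducer: {:?}", lifecycle_reducers[Lifecycle::Init]);
}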
diff --git a/crates/schema/src/def/validate/v9.rs b/crates/schema/src/def/validate/v9.rs
index 6a8c671f85a..533199d5ca6 100644
--- a/crates/schema/src/def/validate/v9.rs
+++ b/crates/schema/src/def/validate/v9.rs
@@ -41,9 +41,10 @@ pub fn validate(def: RawModuleDefV9) -> Result {
let reducers = reducers
.into_iter()
- .map(|reducer| {
+ .enumerate()
+ .map(|(idx, reducer)| {
validator
- .validate_reducer_def(reducer)
+ .validate_reducer_def(reducer, ReducerId(idx as u32))
.map(|reducer_def| (reducer_def.name.clone(), reducer_def))
})
.collect_all_errors();
@@ -90,6 +91,7 @@ pub fn validate(def: RawModuleDefV9) -> Result {
let ModuleValidator {
stored_in_table_def,
typespace_for_generate,
+ lifecycle_reducers,
..
} = validator;
@@ -106,6 +108,7 @@ pub fn validate(def: RawModuleDefV9) -> Result {
stored_in_table_def,
refmap,
row_level_security_raw,
+ lifecycle_reducers,
};
result.generate_indexes();
@@ -134,7 +137,7 @@ struct ModuleValidator<'a> {
type_namespace: HashMap,
/// Reducers that play special lifecycle roles.
- lifecycle_reducers: HashSet<Lifecycle>,
+ lifecycle_reducers: EnumMap<Lifecycle, Option<ReducerId>>,
}
impl ModuleValidator<'_> {
@@ -241,7 +244,7 @@ impl ModuleValidator<'_> {
}
/// Validate a reducer definition.
- fn validate_reducer_def(&mut self, reducer_def: RawReducerDefV9) -> Result {
+ fn validate_reducer_def(&mut self, reducer_def: RawReducerDefV9, reducer_id: ReducerId) -> Result {
let RawReducerDefV9 {
name,
params,
@@ -279,9 +282,12 @@ impl ModuleValidator<'_> {
let name = identifier(name);
let lifecycle = lifecycle
- .map(|lifecycle| match self.lifecycle_reducers.insert(lifecycle.clone()) {
- true => Ok(lifecycle),
- false => Err(ValidationError::DuplicateLifecycle { lifecycle }.into()),
+ .map(|lifecycle| match &mut self.lifecycle_reducers[lifecycle] {
+ x @ None => {
+ *x = Some(reducer_id);
+ Ok(lifecycle)
+ }
+ Some(_) => Err(ValidationError::DuplicateLifecycle { lifecycle }.into()),
})
.transpose();
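// The duplicate-lifecycle check above claims the role's slot in place: bind
// the slot mutably, fill it if empty, error if already taken. A compact
// sketch of that pattern, using a plain array indexed by the enum as a
// stand-in for the EnumMap.

#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum Lifecycle {
    Init,
    OnConnect,
    OnDisconnect,
}

#[derive(Debug)]
enum ValidationError {
    DuplicateLifecycle(Lifecycle),
}

fn claim_lifecycle_slot(
    slots: &mut [Option<u32>; 3],
    lifecycle: Lifecycle,
    reducer_id: u32,
) -> Result<Lifecycle, ValidationError> {
    match &mut slots[lifecycle as usize] {
        // `x @ None` binds the empty slot so it can be filled in place.
        x @ None => {
            *x = Some(reducer_id);
            Ok(lifecycle)
        }
        Some(_) => Err(ValidationError::DuplicateLifecycle(lifecycle)),
    }
}

fn main() {
    let mut slots: [Option<u32>; 3] = [None; 3];
    assert!(claim_lifecycle_slot(&mut slots, Lifecycle::Init, 0).is_ok());
    // A second `Init` reducer is rejected with a duplicate-lifecycle error.
    println!("{:?}", claim_lifecycle_slot(&mut slots, Lifecycle::Init, 5));
    println!("slots: {slots:?}");
}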
diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs
index 1b10f202819..bb28db46a3f 100644
--- a/crates/table/src/table.rs
+++ b/crates/table/src/table.rs
@@ -280,26 +280,6 @@ impl Table {
Ok((hash, row_ref))
}
- /// Insert a `row` into this table during replay.
- ///
- /// NOTE: This method skips index updating. Use `insert` to insert a row with index updating.
- pub fn insert_for_replay(
- &mut self,
- blob_store: &mut dyn BlobStore,
- row: &ProductValue,
- ) -> Result<(Option, RowPointer), InsertError> {
- // Insert the `row`. There should be no errors
- let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?;
- let row_ptr = row_ref.pointer();
-
- // SAFETY: We just inserted the row, so `self.is_row_present(row_ptr)` holds.
- let row_hash = unsafe { self.insert_into_pointer_map(blob_store, row_ptr) }?;
-
- self.update_statistics_added_row(blob_bytes);
-
- Ok((row_hash, row_ptr))
- }
-
/// Physically inserts `row` into the page
/// without inserting it logically into the pointer map.
///
@@ -871,7 +851,6 @@ impl Table {
&mut self,
blob_store: &mut dyn BlobStore,
row: &ProductValue,
- skip_index_update: bool,
) -> Result