Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 130 additions & 7 deletions crates/smoketests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use regex::Regex;
use spacetimedb_guard::{ensure_binaries_built, SpacetimeDbGuard};
use std::env;
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::process::{Command, Output, Stdio};
use std::sync::OnceLock;
Expand Down Expand Up @@ -291,6 +292,14 @@ impl ApiResponse {
}
}

#[derive(Clone, Debug, Default)]
pub struct PublishOptions {
pub clear: bool,
pub break_clients: bool,
pub num_replicas: Option<u32>,
pub organization: Option<String>,
}

/// Builder for creating `Smoketest` instances.
pub struct SmoketestBuilder {
module_code: Option<String>,
Expand All @@ -299,6 +308,7 @@ pub struct SmoketestBuilder {
extra_deps: String,
autopublish: bool,
pg_port: Option<u16>,
server_url_override: Option<String>,
}

impl Default for SmoketestBuilder {
Expand All @@ -317,9 +327,15 @@ impl SmoketestBuilder {
extra_deps: String::new(),
autopublish: true,
pg_port: None,
server_url_override: None,
}
}

pub fn server_url(mut self, url: &str) -> Self {
self.server_url_override = Some(url.to_string());
self
}

/// Enables the PostgreSQL wire protocol on the specified port.
pub fn pg_port(mut self, port: u16) -> Self {
self.pg_port = Some(port);
Expand Down Expand Up @@ -393,7 +409,10 @@ impl SmoketestBuilder {
let build_start = Instant::now();

// Check if we're running against a remote server
let (guard, server_url) = if let Some(remote_url) = remote_server_url() {
let (guard, server_url) = if let Some(url) = self.server_url_override {
eprintln!("[REMOTE] Using explicit server URL: {}", url);
(None, url)
} else if let Some(remote_url) = remote_server_url() {
eprintln!("[REMOTE] Using remote server: {}", remote_url);
(None, remote_url)
} else {
Expand Down Expand Up @@ -519,6 +538,16 @@ impl Smoketest {
.context("No spacetimedb_token found in config")
}

pub fn login_with_token(&self, token: &str) -> Result<()> {
let host = self.server_host();
let config_str = format!(
"default_server = \"localhost\"\n\nspacetimedb_token = \"{}\"\n\n[[server_configs]]\nnickname = \"localhost\"\nhost = \"{}\"\nprotocol = \"http\"\n",
token, host
);
fs::write(&self.config_path, config_str).context("Failed to write config.toml")?;
Ok(())
}

/// Runs psql command against the PostgreSQL wire protocol server.
///
/// Returns the output on success, or an error with stderr on failure.
Expand Down Expand Up @@ -708,6 +737,10 @@ log = "0.4"
self.publish_module_opts(Some(name), clear)
}

pub fn publish_module_named_ext(&mut self, name: &str, opts: PublishOptions) -> Result<String> {
self.publish_module_internal_ext(Some(name), opts)
}

/// Re-publishes the module to the existing database identity with optional clear.
///
/// This is useful for testing auto-migrations where you want to update
Expand All @@ -726,13 +759,28 @@ log = "0.4"
self.publish_module_internal(Some(name), clear, break_clients)
}

pub fn publish_module_with_options_ext(&mut self, name: &str, opts: PublishOptions) -> Result<String> {
self.publish_module_internal_ext(Some(name), opts)
}

/// Internal helper for publishing with options.
fn publish_module_opts(&mut self, name: Option<&str>, clear: bool) -> Result<String> {
self.publish_module_internal(name, clear, false)
}

/// Internal helper for publishing with all options.
fn publish_module_internal(&mut self, name: Option<&str>, clear: bool, break_clients: bool) -> Result<String> {
self.publish_module_internal_ext(
name,
PublishOptions {
clear,
break_clients,
..PublishOptions::default()
},
)
}

fn publish_module_internal_ext(&mut self, name: Option<&str>, opts: PublishOptions) -> Result<String> {
let start = Instant::now();

// Determine the WASM path - either precompiled or build it
Expand Down Expand Up @@ -782,14 +830,28 @@ log = "0.4"
"--yes",
];

if clear {
if opts.clear {
args.push("--clear-database");
}

if break_clients {
if opts.break_clients {
args.push("--break-clients");
}

let mut num_replicas_owned: Option<String> = None;
if let Some(n) = opts.num_replicas {
num_replicas_owned = Some(n.to_string());
args.push("--num-replicas");
args.push(num_replicas_owned.as_ref().unwrap());
}

let mut org_owned: Option<String> = None;
if let Some(org) = opts.organization.as_ref() {
org_owned = Some(org.clone());
args.push("--organization");
args.push(org_owned.as_ref().unwrap());
}

let name_owned;
if let Some(n) = name {
name_owned = n.to_string();
Expand Down Expand Up @@ -1074,15 +1136,45 @@ log = "0.4"
self.subscribe_opts(queries, n, false)
}

pub fn subscribe_on(&self, database: &str, queries: &[&str], n: usize) -> Result<Vec<serde_json::Value>> {
self.subscribe_on_opts(database, queries, n, false)
}

/// Starts a subscription with --confirmed flag and waits for N updates.
pub fn subscribe_confirmed(&self, queries: &[&str], n: usize) -> Result<Vec<serde_json::Value>> {
self.subscribe_opts(queries, n, true)
}

pub fn subscribe_on_confirmed(&self, database: &str, queries: &[&str], n: usize) -> Result<Vec<serde_json::Value>> {
self.subscribe_on_opts(database, queries, n, true)
}

/// Internal helper for subscribe with options.
fn subscribe_opts(&self, queries: &[&str], n: usize, confirmed: bool) -> Result<Vec<serde_json::Value>> {
let start = Instant::now();
let identity = self.database_identity.as_ref().context("No database published")?;
self.subscribe_on_impl(identity, queries, n, confirmed, start)
}

fn subscribe_on_opts(
&self,
database: &str,
queries: &[&str],
n: usize,
confirmed: bool,
) -> Result<Vec<serde_json::Value>> {
let start = Instant::now();
self.subscribe_on_impl(database, queries, n, confirmed, start)
}

fn subscribe_on_impl(
&self,
database: &str,
queries: &[&str],
n: usize,
confirmed: bool,
start: Instant,
) -> Result<Vec<serde_json::Value>> {
let config_path_str = self.config_path.to_str().unwrap();

let cli_path = ensure_binaries_built();
Expand All @@ -1093,7 +1185,7 @@ log = "0.4"
"subscribe",
"--server",
&self.server_url,
identity,
database,
"-t",
"30",
"-n",
Expand Down Expand Up @@ -1133,21 +1225,52 @@ log = "0.4"
self.subscribe_background_opts(queries, n, false)
}

pub fn subscribe_background_on(&self, database: &str, queries: &[&str], n: usize) -> Result<SubscriptionHandle> {
self.subscribe_background_on_opts(database, queries, n, false)
}

/// Starts a subscription in the background with --confirmed flag.
pub fn subscribe_background_confirmed(&self, queries: &[&str], n: usize) -> Result<SubscriptionHandle> {
self.subscribe_background_opts(queries, n, true)
}

pub fn subscribe_background_on_confirmed(
&self,
database: &str,
queries: &[&str],
n: usize,
) -> Result<SubscriptionHandle> {
self.subscribe_background_on_opts(database, queries, n, true)
}

/// Internal helper for background subscribe with options.
fn subscribe_background_opts(&self, queries: &[&str], n: usize, confirmed: bool) -> Result<SubscriptionHandle> {
use std::io::{BufRead, BufReader};

let identity = self
.database_identity
.as_ref()
.context("No database published")?
.clone();

self.subscribe_background_on_impl(&identity, queries, n, confirmed)
}

fn subscribe_background_on_opts(
&self,
database: &str,
queries: &[&str],
n: usize,
confirmed: bool,
) -> Result<SubscriptionHandle> {
self.subscribe_background_on_impl(database, queries, n, confirmed)
}

fn subscribe_background_on_impl(
&self,
database: &str,
queries: &[&str],
n: usize,
confirmed: bool,
) -> Result<SubscriptionHandle> {
let cli_path = ensure_binaries_built();
let mut cmd = Command::new(&cli_path);
// Use --print-initial-update so we know when subscription is established
Expand All @@ -1158,7 +1281,7 @@ log = "0.4"
"subscribe".to_string(),
"--server".to_string(),
self.server_url.clone(),
identity,
database.to_string(),
"-t".to_string(),
"30".to_string(),
"-n".to_string(),
Expand Down
Loading