From fef94f35585e473a6d4b83353b00768ee8abe106 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Tue, 26 May 2026 19:44:21 +0000 Subject: [PATCH 1/9] Refactor WS Client Connector to Use Nested FuturesUnordered - Updated the WS client connector to eliminate the use of tokio::spawn for background tasks. - Introduced a nested FuturesUnordered to manage the read, write, keepalive, and reconnect loops. - The connect method now returns a tuple containing the connector and its associated future. - Enhanced the reconnect watcher to send new loop information to the outer future, allowing for seamless reconnections without spawning new tasks. - Updated documentation to reflect changes in the connector's architecture and task management. --- aimdb-core/src/builder.rs | 149 ---- aimdb-core/src/remote/config.rs | 27 +- aimdb-core/src/remote/handler.rs | 262 +++---- aimdb-core/src/remote/mod.rs | 2 + aimdb-core/src/remote/stream.rs | 141 ++++ aimdb-core/src/remote/supervisor.rs | 103 ++- .../src/client/builder.rs | 27 +- .../src/client/connector.rs | 254 +++++-- docs/design/030-M13-aimx-remote-spawn-free.md | 682 ++++++++++++++++++ 9 files changed, 1251 insertions(+), 396 deletions(-) create mode 100644 aimdb-core/src/remote/stream.rs create mode 100644 docs/design/030-M13-aimx-remote-spawn-free.md diff --git a/aimdb-core/src/builder.rs b/aimdb-core/src/builder.rs index 277a2fff..b49a9146 100644 --- a/aimdb-core/src/builder.rs +++ b/aimdb-core/src/builder.rs @@ -1327,155 +1327,6 @@ impl AimDb { self.inner.set_record_from_json(record_name, json_value) } - /// Subscribe to record updates as JSON stream (std only) - /// - /// Creates a subscription to a record's buffer and forwards updates as JSON - /// to a bounded channel. This is used internally by the remote access protocol - /// for implementing `record.subscribe`. - /// - /// # Architecture - /// - /// Spawns a consumer task that: - /// 1. 
Subscribes to the record's buffer using the existing buffer API - /// 2. Reads values as they arrive - /// 3. Serializes each value to JSON - /// 4. Sends JSON values to a bounded channel (with backpressure handling) - /// 5. Terminates when either: - /// - The cancel signal is received (unsubscribe) - /// - The channel receiver is dropped (client disconnected) - /// - /// # Arguments - /// * `record_key` - Key of the record to subscribe to - /// * `queue_size` - Size of the bounded channel for this subscription - /// - /// # Returns - /// `Ok((receiver, cancel_tx))` where: - /// - `receiver`: Bounded channel receiver for JSON values - /// - `cancel_tx`: One-shot sender to cancel the subscription - /// - /// `Err` if: - /// - Record not found for the given key - /// - Record not configured with `.with_remote_access()` - /// - Failed to subscribe to buffer - /// - /// # Example (internal use) - /// - /// ```rust,ignore - /// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?; - /// - /// // Read events - /// while let Some(json_value) = rx.recv().await { - /// // Forward to client... 
- /// } - /// - /// // Cancel subscription - /// let _ = cancel_tx.send(()); - /// ``` - #[cfg(feature = "std")] - #[allow(unused_variables)] // Variables used only in tracing feature - pub fn subscribe_record_updates( - &self, - record_key: &str, - queue_size: usize, - ) -> DbResult<( - tokio::sync::mpsc::Receiver, - tokio::sync::oneshot::Sender<()>, - )> { - use tokio::sync::{mpsc, oneshot}; - - // Find the record by key - let id = self - .inner - .resolve_str(record_key) - .ok_or_else(|| DbError::RecordKeyNotFound { - key: record_key.to_string(), - })?; - - let record = self - .inner - .storage(id) - .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?; - - // Subscribe to the record's buffer as JSON stream - // This will fail if record not configured with .with_remote_access() - let mut json_reader = record.subscribe_json()?; - - // Create channels for the subscription - let (value_tx, value_rx) = mpsc::channel(queue_size); - let (cancel_tx, mut cancel_rx) = oneshot::channel(); - - // Get metadata for logging - let type_id = self.inner.types[id.index()]; - let key = self.inner.keys[id.index()]; - let record_metadata = record.collect_metadata(type_id, key, id); - - // Bridge state (design 028 §"Remote supervisor"): drop into `tokio::spawn` - // directly. The nested-`FuturesUnordered` rewrite is the AimX follow-up. - tokio::spawn(async move { - #[cfg(feature = "tracing")] - tracing::debug!( - "Subscription consumer task started for {}", - record_metadata.name - ); - - // Main event loop: read from buffer and forward to channel - loop { - tokio::select! 
{ - // Handle cancellation signal - _ = &mut cancel_rx => { - #[cfg(feature = "tracing")] - tracing::debug!("Subscription cancelled"); - break; - } - // Read next JSON value from buffer - result = json_reader.recv_json() => { - match result { - Ok(json_val) => { - // Send JSON value to subscription channel - if value_tx.send(json_val).await.is_err() { - #[cfg(feature = "tracing")] - tracing::debug!("Subscription receiver dropped"); - break; - } - } - Err(DbError::BufferLagged { lag_count, .. }) => { - // Consumer fell behind - log warning but continue - #[cfg(feature = "tracing")] - tracing::warn!( - "Subscription for {} lagged by {} messages", - record_metadata.name, - lag_count - ); - // Continue reading - next recv will get latest - } - Err(DbError::BufferClosed { .. }) => { - // Buffer closed (shutdown) - exit gracefully - #[cfg(feature = "tracing")] - tracing::debug!("Buffer closed for {}", record_metadata.name); - break; - } - Err(e) => { - // Other error (shouldn't happen in practice) - #[cfg(feature = "tracing")] - tracing::error!( - "Subscription error for {}: {:?}", - record_metadata.name, - e - ); - break; - } - } - } - } - } - - #[cfg(feature = "tracing")] - tracing::debug!("Subscription consumer task terminated"); - }); - - Ok((value_rx, cancel_tx)) - } - /// Collects inbound connector routes for automatic router construction (std only) /// /// Iterates all records, filters their inbound_connectors by scheme, diff --git a/aimdb-core/src/remote/config.rs b/aimdb-core/src/remote/config.rs index c3b6ec3b..dfc9c86e 100644 --- a/aimdb-core/src/remote/config.rs +++ b/aimdb-core/src/remote/config.rs @@ -16,10 +16,22 @@ pub struct AimxConfig { /// Security policy (read-only or read-write) pub security_policy: SecurityPolicy, - /// Maximum number of concurrent connections + /// Maximum number of concurrent client connections accepted by the + /// supervisor. 
When the in-flight connection count reaches this + /// value, newly-accepted Unix sockets are closed immediately (the + /// client sees a closed socket on connect). pub max_connections: usize, - /// Subscription queue size per client per subscription + /// Maximum number of concurrent subscriptions allowed per connection. + /// Once a connection holds this many subscriptions, further + /// `record.subscribe` calls receive a `too_many_subscriptions` error + /// until one is released via `record.unsubscribe`. + pub max_subs_per_connection: usize, + + /// Subscription queue size per client per subscription. Retained for + /// protocol-response compatibility; the per-sub mpsc channel it used + /// to bound was eliminated when subscriptions moved to a nested + /// [`FuturesUnordered`]. pub subscription_queue_size: usize, /// Optional authentication token @@ -37,6 +49,7 @@ impl AimxConfig { /// - Socket path: `/tmp/aimdb.sock` /// - Security policy: Read-only /// - Max connections: 16 + /// - Max subscriptions per connection: 32 /// - Subscription queue size: 100 /// - No auth token /// - Socket permissions: 0o600 (owner-only) @@ -45,6 +58,7 @@ impl AimxConfig { socket_path: PathBuf::from("/tmp/aimdb.sock"), security_policy: SecurityPolicy::ReadOnly, max_connections: 16, + max_subs_per_connection: 32, subscription_queue_size: 100, auth_token: None, socket_permissions: Some(0o600), @@ -69,6 +83,12 @@ impl AimxConfig { self } + /// Sets the maximum number of concurrent subscriptions per connection + pub fn max_subs_per_connection(mut self, max: usize) -> Self { + self.max_subs_per_connection = max; + self + } + /// Sets the subscription queue size per client pub fn subscription_queue_size(mut self, size: usize) -> Self { self.subscription_queue_size = size; @@ -206,6 +226,7 @@ mod tests { let config = AimxConfig::uds_default(); assert_eq!(config.socket_path, PathBuf::from("/tmp/aimdb.sock")); assert_eq!(config.max_connections, 16); + 
assert_eq!(config.max_subs_per_connection, 32); assert_eq!(config.subscription_queue_size, 100); assert!(matches!(config.security_policy, SecurityPolicy::ReadOnly)); assert!(config.auth_token.is_none()); @@ -217,12 +238,14 @@ mod tests { let config = AimxConfig::uds_default() .socket_path("/var/run/aimdb.sock") .max_connections(32) + .max_subs_per_connection(8) .subscription_queue_size(200) .auth_token("secret-token") .socket_permissions(0o660); assert_eq!(config.socket_path, PathBuf::from("/var/run/aimdb.sock")); assert_eq!(config.max_connections, 32); + assert_eq!(config.max_subs_per_connection, 8); assert_eq!(config.subscription_queue_size, 200); assert_eq!(config.auth_token, Some("secret-token".to_string())); assert_eq!(config.socket_permissions, Some(0o660)); diff --git a/aimdb-core/src/remote/handler.rs b/aimdb-core/src/remote/handler.rs index c52e591b..c101db65 100644 --- a/aimdb-core/src/remote/handler.rs +++ b/aimdb-core/src/remote/handler.rs @@ -19,8 +19,14 @@ use crate::{AimDb, DbError, DbResult}; #[cfg(feature = "std")] use std::collections::HashMap; #[cfg(feature = "std")] +use std::sync::atomic::{AtomicBool, Ordering}; +#[cfg(feature = "std")] use std::sync::Arc; +#[cfg(feature = "std")] +use futures_core::Stream; +#[cfg(feature = "std")] +use futures_util::stream::{FuturesUnordered, StreamExt}; #[cfg(feature = "std")] use serde_json::json; #[cfg(feature = "std")] @@ -29,39 +35,29 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; #[cfg(feature = "std")] use tokio::sync::mpsc; -#[cfg(feature = "std")] -use tokio::sync::oneshot; -/// Handle for an active subscription -/// -/// Tracks the state needed to manage a single subscription's lifecycle. 
#[cfg(feature = "std")] -#[allow(dead_code)] // record_name used only in tracing feature -struct SubscriptionHandle { - /// Unique subscription identifier (returned to client) - subscription_id: String, - - /// Record name being subscribed to - record_name: String, - - /// Signal to cancel this subscription - /// When sent, the consumer task will terminate - cancel_tx: oneshot::Sender<()>, -} +use crate::builder::BoxFuture; /// Connection state for managing subscriptions /// -/// Tracks all active subscriptions for a single client connection. +/// Tracks all active subscriptions for a single client connection. Each +/// subscription is identified by its `subscription_id` and carries an +/// `Arc` that the `record.unsubscribe` handler flips to signal +/// the per-subscription future to exit on its next poll. Connection +/// teardown does not need to flip the flag — dropping the per-connection +/// `FuturesUnordered` (in [`handle_connection`]) drops every subscription +/// future, which is the primary cancellation path. #[cfg(feature = "std")] struct ConnectionState { - /// Active subscriptions by subscription_id - subscriptions: HashMap, + /// Active subscriptions by subscription_id → cancel flag. + subscriptions: HashMap>, /// Counter for generating unique subscription IDs next_subscription_id: u64, - /// Event funnel: all subscription tasks send events here - /// This channel feeds the single writer task + /// Event funnel: all subscription futures send events here. + /// This channel feeds the connection's send loop. event_tx: mpsc::UnboundedSender, /// Per-record drain readers, created lazily on first record.drain call. 
@@ -87,35 +83,6 @@ impl ConnectionState { self.next_subscription_id += 1; id } - - /// Adds a subscription to the connection state - fn add_subscription(&mut self, handle: SubscriptionHandle) { - self.subscriptions - .insert(handle.subscription_id.clone(), handle); - } - - /// Removes and returns a subscription by ID - #[allow(dead_code)] - fn remove_subscription(&mut self, subscription_id: &str) -> Option { - self.subscriptions.remove(subscription_id) - } - - /// Cancels all active subscriptions - /// - /// Sends cancel signals to all subscription tasks and clears the map. - /// Called when the client disconnects. - async fn cancel_all_subscriptions(&mut self) { - #[cfg(feature = "tracing")] - tracing::info!( - "Canceling {} active subscriptions", - self.subscriptions.len() - ); - - for (_id, handle) in self.subscriptions.drain() { - // Send cancel signal (ignore if receiver already dropped) - let _ = handle.cancel_tx.send(()); - } - } } /// Handles an incoming client connection @@ -175,17 +142,27 @@ where #[cfg(feature = "tracing")] tracing::info!("Handshake complete, client ready"); - // Create event funnel: all subscription tasks will send events here + // Create event funnel: all subscription futures send events here let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); // Initialize connection state let mut conn_state = ConnectionState::new(event_tx); - // Main loop: interleave reading requests and writing events + // Per-connection FuturesUnordered of subscription futures. Each + // `record.subscribe` pushes one future here; cancellation flows + // through `Arc` (Unsubscribe) or by dropping `subs` when + // the connection ends. + let mut subs: FuturesUnordered = FuturesUnordered::new(); + + // Main loop: interleave reading requests, writing events, and draining + // completed subscription futures. `biased;` keeps request reads polled + // first so a chatty subscription cannot starve the request path. loop { let mut line = String::new(); tokio::select! 
{ + biased; + + // Handle incoming requests read_result = stream.read_line(&mut line) => { match read_result { @@ -219,7 +196,14 @@ where }; // Dispatch request to appropriate handler - let response = handle_request(&db, &config, &mut conn_state, request).await; + let response = handle_request( + &db, + &config, + &mut conn_state, + &mut subs, + request, + ) + .await; // Send response if let Err(_e) = send_response(&mut stream, &response).await { @@ -244,11 +228,18 @@ where break; } } + + // Drain finished subscription futures so `subs` does not grow + // unboundedly. The guard ensures the arm is disabled (not polled) + // when the set is empty — `next()` on an empty `FuturesUnordered` + // resolves immediately with `None`, which would make the loop spin. + _ = subs.next(), if !subs.is_empty() => {} } } - // Cleanup: cancel all active subscriptions - conn_state.cancel_all_subscriptions().await; + // Dropping `subs` here cancels every still-running subscription future + // — the connection's `FuturesUnordered` is their sole owner. + drop(subs); #[cfg(feature = "tracing")] tracing::info!("Connection handler terminating"); @@ -552,6 +543,7 @@ async fn handle_request( db: &Arc>, config: &AimxConfig, conn_state: &mut ConnectionState, + subs: &mut FuturesUnordered, request: Request, ) -> Response where @@ -569,7 +561,7 @@ where "record.get" => handle_record_get(db, config, request.id, request.params).await, "record.set" => handle_record_set(db, config, request.id, request.params).await, "record.subscribe" => { - handle_record_subscribe(db, config, conn_state, request.id, request.params).await + handle_record_subscribe(db, config, conn_state, subs, request.id, request.params).await } "record.unsubscribe" => { handle_record_unsubscribe(conn_state, request.id, request.params).await @@ -928,12 +920,15 @@ where /// Handles record.subscribe method /// -/// Subscribes to live updates for a record. +/// Subscribes to live updates for a record. 
Pushes a per-subscription +/// future onto the connection's [`FuturesUnordered`] (`subs`) — there is +/// no `tokio::spawn`; the connection's outer loop drives the future. /// /// # Arguments /// * `db` - Database instance /// * `config` - Remote access configuration /// * `conn_state` - Connection state (for subscription tracking) +/// * `subs` - Per-connection set of subscription futures (this fn pushes one) /// * `request_id` - Request ID for the response /// * `params` - Request parameters (must contain "name" field with record name) /// @@ -947,6 +942,7 @@ async fn handle_record_subscribe( db: &Arc>, config: &AimxConfig, conn_state: &mut ConnectionState, + subs: &mut FuturesUnordered, request_id: u64, params: Option, ) -> Response @@ -991,13 +987,13 @@ where #[cfg(feature = "tracing")] tracing::debug!("Subscribing to record: {}", record_name); - // Check max subscriptions limit - if conn_state.subscriptions.len() >= config.subscription_queue_size { + // Check max subscriptions per connection. 
+ if conn_state.subscriptions.len() >= config.max_subs_per_connection { #[cfg(feature = "tracing")] tracing::warn!( "Too many subscriptions: {} (max: {})", conn_state.subscriptions.len(), - config.subscription_queue_size + config.max_subs_per_connection ); return Response::error( @@ -1005,54 +1001,53 @@ where "too_many_subscriptions", format!( "Maximum subscriptions reached: {}", - config.subscription_queue_size + config.max_subs_per_connection ), ); } - // Generate unique subscription ID - let subscription_id = conn_state.generate_subscription_id(); + // Subscribe to the record's JSON event stream + let value_stream = match crate::remote::stream::stream_record_updates(db, &record_name) { + Ok(s) => s, + Err(e) => { + // Map internal errors to appropriate response codes + let (code, message) = match &e { + crate::DbError::RecordKeyNotFound { key } => { + #[cfg(feature = "tracing")] + tracing::warn!("Record not found: {}", key); + ("not_found", format!("Record '{}' not found", key)) + } + _ => { + #[cfg(feature = "tracing")] + tracing::error!("Failed to subscribe to record updates: {}", e); + ("internal_error", format!("Failed to subscribe: {}", e)) + } + }; - // Subscribe to record updates via the database API (using record key) - let (value_rx, cancel_tx) = - match db.subscribe_record_updates(&record_name, config.subscription_queue_size) { - Ok(channels) => channels, - Err(e) => { - // Map internal errors to appropriate response codes - let (code, message) = match &e { - crate::DbError::RecordKeyNotFound { key } => { - #[cfg(feature = "tracing")] - tracing::warn!("Record not found: {}", key); - ("not_found", format!("Record '{}' not found", key)) - } - _ => { - #[cfg(feature = "tracing")] - tracing::error!("Failed to subscribe to record updates: {}", e); - ("internal_error", format!("Failed to subscribe: {}", e)) - } - }; + return Response::error(request_id, code, message); + } + }; - return Response::error(request_id, code, message); - } - }; + // Generate unique 
subscription ID and cancel flag + let subscription_id = conn_state.generate_subscription_id(); + let cancel = Arc::new(AtomicBool::new(false)); - // Spawn event streaming task for this subscription + // Push the subscription future onto the connection's set. The future + // is dropped — and therefore cancelled — when either the cancel flag + // is flipped (Unsubscribe) or the outer connection loop exits. let event_tx = conn_state.event_tx.clone(); - let sub_id_clone = subscription_id.clone(); - let stream_handle = tokio::spawn(async move { - stream_subscription_events(sub_id_clone, value_rx, event_tx).await; - }); - - // Store subscription handle - let handle = SubscriptionHandle { - subscription_id: subscription_id.clone(), - record_name: record_name.clone(), - cancel_tx, - }; - conn_state.add_subscription(handle); - - // Detach the streaming task (it will run until cancelled or channel closes) - std::mem::drop(stream_handle); + let sub_id_for_future = subscription_id.clone(); + let cancel_for_future = cancel.clone(); + subs.push(Box::pin(run_subscription( + value_stream, + sub_id_for_future, + event_tx, + cancel_for_future, + ))); + + conn_state + .subscriptions + .insert(subscription_id.clone(), cancel); #[cfg(feature = "tracing")] tracing::info!( @@ -1071,37 +1066,50 @@ where ) } -/// Streams subscription events from value channel to event channel +/// Per-subscription future: forwards JSON values from the record stream +/// into the connection's event funnel as `Event` messages with sequence +/// numbers and RFC-style "secs.nanos" timestamps. /// -/// Reads JSON values from the subscription's receiver and converts them -/// into Event messages with sequence numbers and timestamps. +/// Exits when any of: +/// - the `cancel` flag is set (by `record.unsubscribe`) — checked after +/// each stream poll, so cancellation has up to a one-event delay; +/// - the upstream stream ends (e.g. `BufferClosed`); +/// - the event funnel is closed (connection going down). 
/// -/// # Arguments -/// * `subscription_id` - Unique subscription identifier -/// * `value_rx` - Receiver for JSON values from the database -/// * `event_tx` - Sender for Event messages to the client +/// Connection-close cancellation does not rely on the flag — the +/// connection's `FuturesUnordered` is the sole owner of this future and +/// dropping the set drops the future. #[cfg(feature = "std")] -async fn stream_subscription_events( +async fn run_subscription( + stream: S, subscription_id: String, - mut value_rx: tokio::sync::mpsc::Receiver, - event_tx: tokio::sync::mpsc::UnboundedSender, -) { + event_tx: mpsc::UnboundedSender, + cancel: Arc, +) where + S: Stream + Send + 'static, +{ + futures_util::pin_mut!(stream); let mut sequence: u64 = 1; #[cfg(feature = "tracing")] tracing::debug!( - "Event streaming task started for subscription: {}", + "Subscription future started for subscription: {}", subscription_id ); - while let Some(json_value) = value_rx.recv().await { + while let Some(json_value) = stream.next().await { + if cancel.load(Ordering::Relaxed) { + #[cfg(feature = "tracing")] + tracing::debug!("Subscription {} cancelled via Unsubscribe", subscription_id); + break; + } + // Generate timestamp in "secs.nanosecs" format let duration = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default(); let timestamp = format!("{}.{:09}", duration.as_secs(), duration.subsec_nanos()); - // Create event let event = Event { subscription_id: subscription_id.clone(), sequence, @@ -1110,11 +1118,10 @@ async fn stream_subscription_events( dropped: None, // TODO: Implement dropped event tracking }; - // Send event to the funnel if event_tx.send(event).is_err() { #[cfg(feature = "tracing")] tracing::debug!( - "Event channel closed, terminating stream for subscription: {}", + "Event channel closed, terminating subscription: {}", subscription_id ); break; @@ -1124,10 +1131,7 @@ async fn stream_subscription_events( } #[cfg(feature = 
"tracing")] - tracing::debug!( - "Event streaming task terminated for subscription: {}", - subscription_id - ); + tracing::debug!("Subscription future terminated: {}", subscription_id); } /// Handles record.unsubscribe method @@ -1171,19 +1175,15 @@ async fn handle_record_unsubscribe( #[cfg(feature = "tracing")] tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id); - // Look up and remove the subscription + // Look up and remove the subscription. Setting the cancel flag tells + // the per-subscription future to exit on its next poll; the future + // itself is reaped from `subs` by the connection's outer drain loop. match conn_state.subscriptions.remove(&subscription_id) { - Some(handle) => { - // Send cancellation signal to the streaming task - // It's okay if this fails (task may have already terminated) - let _ = handle.cancel_tx.send(()); + Some(cancel) => { + cancel.store(true, Ordering::Relaxed); #[cfg(feature = "tracing")] - tracing::debug!( - "Cancelled subscription {} for record {}", - subscription_id, - handle.record_name - ); + tracing::debug!("Cancelled subscription {}", subscription_id); Response::success( request_id, diff --git a/aimdb-core/src/remote/mod.rs b/aimdb-core/src/remote/mod.rs index 15a321c5..8053ff04 100644 --- a/aimdb-core/src/remote/mod.rs +++ b/aimdb-core/src/remote/mod.rs @@ -46,4 +46,6 @@ pub use protocol::{ErrorObject, Event, HelloMessage, Request, Response, WelcomeM // Internal exports for implementation pub(crate) mod handler; +#[cfg(feature = "std")] +pub(crate) mod stream; pub(crate) mod supervisor; diff --git a/aimdb-core/src/remote/stream.rs b/aimdb-core/src/remote/stream.rs new file mode 100644 index 00000000..b582867b --- /dev/null +++ b/aimdb-core/src/remote/stream.rs @@ -0,0 +1,141 @@ +//! Record-update streaming helper for AimX remote access. +//! +//! [`stream_record_updates`] adapts a record's [`JsonBufferReader`] into a +//! plain [`Stream`] of JSON values. 
Cancellation is by `drop`; backpressure +//! is the underlying buffer's responsibility. The handler holds the +//! returned stream inside its per-subscription future so that the +//! connection's `FuturesUnordered` is the sole owner of the subscription's +//! lifecycle. + +#[cfg(feature = "std")] +use crate::{AimDb, DbError, DbResult}; + +#[cfg(feature = "std")] +use futures_core::Stream; +#[cfg(feature = "std")] +use futures_util::stream::unfold; + +/// Subscribe to a record and yield each update as a JSON value. +/// +/// The returned stream owns the underlying [`JsonBufferReader`]; dropping +/// it cancels the subscription. Lag (`BufferLagged`) is logged via +/// `tracing` and skipped; `BufferClosed` and other errors terminate the +/// stream cleanly (next `poll_next` returns `None`). +/// +/// # Errors +/// - [`DbError::RecordKeyNotFound`] if no record matches `record_key`. +/// - [`DbError::InvalidRecordId`] if the resolved id has no storage. +/// - Any error returned by `subscribe_json()` (e.g. the record was not +/// configured with `.with_remote_access()`). +#[cfg(feature = "std")] +pub(crate) fn stream_record_updates( + db: &AimDb, + record_key: &str, +) -> DbResult + Send + 'static> +where + R: aimdb_executor::RuntimeAdapter + 'static, +{ + let inner = db.inner(); + let id = inner + .resolve_str(record_key) + .ok_or_else(|| DbError::RecordKeyNotFound { + key: record_key.to_string(), + })?; + let record = inner + .storage(id) + .ok_or(DbError::InvalidRecordId { id: id.raw() })?; + let reader = record.subscribe_json()?; + + Ok(unfold(reader, |mut reader| async move { + loop { + match reader.recv_json().await { + Ok(value) => return Some((value, reader)), + Err(DbError::BufferLagged { + lag_count: _lag_count, + .. + }) => { + #[cfg(feature = "tracing")] + tracing::warn!( + "stream_record_updates: subscription lagged by {} messages", + _lag_count + ); + continue; + } + Err(DbError::BufferClosed { .. 
}) => return None, + Err(_e) => { + #[cfg(feature = "tracing")] + tracing::error!("stream_record_updates: terminating on error: {:?}", _e); + return None; + } + } + } + })) +} + +#[cfg(all(test, feature = "std"))] +mod tests { + use super::*; + use crate::buffer::JsonBufferReader; + use futures_util::StreamExt; + use std::pin::Pin; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + /// Fake reader that yields a configurable sequence: Ok, Lagged, Ok, Closed. + /// Lets us exercise the skip-and-continue and end-of-stream branches + /// without standing up an actual record + buffer. + struct FakeReader { + step: Arc, + } + + impl JsonBufferReader for FakeReader { + fn recv_json( + &mut self, + ) -> Pin< + Box> + Send + '_>, + > { + let step = self.step.clone(); + Box::pin(async move { + let s = step.fetch_add(1, Ordering::SeqCst); + match s { + 0 => Ok(serde_json::json!({"v": 1})), + 1 => Err(DbError::BufferLagged { + buffer_name: "test".to_string(), + lag_count: 7, + }), + 2 => Ok(serde_json::json!({"v": 2})), + _ => Err(DbError::BufferClosed { + buffer_name: "test".to_string(), + }), + } + }) + } + + fn try_recv_json(&mut self) -> Result { + Err(DbError::BufferEmpty) + } + } + + #[tokio::test] + async fn unfold_skips_lag_and_terminates_on_closed() { + let reader: Box = Box::new(FakeReader { + step: Arc::new(AtomicUsize::new(0)), + }); + + let stream = unfold(reader, |mut reader| async move { + loop { + match reader.recv_json().await { + Ok(v) => return Some((v, reader)), + Err(DbError::BufferLagged { .. }) => continue, + Err(DbError::BufferClosed { .. 
}) => return None, + Err(_) => return None, + } + } + }); + + let values: Vec<_> = stream.collect().await; + assert_eq!(values.len(), 2); + assert_eq!(values[0], serde_json::json!({"v": 1})); + assert_eq!(values[1], serde_json::json!({"v": 2})); + } +} diff --git a/aimdb-core/src/remote/supervisor.rs b/aimdb-core/src/remote/supervisor.rs index ea5ab86a..7a75ff68 100644 --- a/aimdb-core/src/remote/supervisor.rs +++ b/aimdb-core/src/remote/supervisor.rs @@ -1,7 +1,9 @@ //! Remote access supervisor //! -//! Manages the Unix domain socket server and spawns connection handlers for -//! remote clients connecting via the AimX protocol. +//! Manages the Unix domain socket server and drives per-connection +//! handlers for remote clients connecting via the AimX protocol. Each +//! accepted connection is pushed onto a per-supervisor +//! [`FuturesUnordered`]; there is no `tokio::spawn`. use crate::builder::BoxFuture; use crate::remote::AimxConfig; @@ -13,6 +15,8 @@ use std::sync::Arc; #[cfg(feature = "std")] use std::os::unix::fs::PermissionsExt; +#[cfg(feature = "std")] +use futures_util::stream::{FuturesUnordered, StreamExt}; #[cfg(feature = "std")] use tokio::net::UnixListener; @@ -21,11 +25,11 @@ use tokio::net::UnixListener; /// Synchronously: binds the Unix domain socket and sets file permissions /// (so binding errors surface from `build()` rather than at task-start time). /// -/// The returned `BoxFuture` is appended to the `AimDbRunner` accumulator; -/// when driven, it accepts incoming connections in a loop and uses -/// `tokio::spawn` to dispatch a per-connection handler (bridge state per -/// design 028 §"Remote supervisor" — future work in the AimX portability -/// follow-up replaces this with a nested `FuturesUnordered`). +/// The returned [`BoxFuture`] is appended to the `AimDbRunner` accumulator; +/// when driven, it accepts incoming connections in a loop and pushes each +/// per-connection handler onto a [`FuturesUnordered`]. 
`tokio::select!` +/// with `biased;` keeps `accept` polled ahead of connection drains so a +/// chatty client cannot starve new connects. /// /// # Arguments /// * `db` - Database instance (for introspection and subscriptions) @@ -104,46 +108,69 @@ where #[cfg(feature = "tracing")] tracing::info!("Socket permissions set to {:o}", permissions); - // The accept loop is the future the runner drives. Per-connection handlers - // still use `tokio::spawn` (AimX is `#[cfg(feature = "std")]`-gated — see - // design doc 028 §"Out of Scope" / AimX follow-up). + // The accept loop is the future the runner drives. Per-connection + // handler futures live in a `FuturesUnordered` owned by this future; + // dropping the supervisor (e.g. when the runner is cancelled) drops + // every active connection in turn. let supervisor_future: BoxFuture = Box::pin(async move { #[cfg(feature = "tracing")] tracing::info!("Remote access supervisor task started"); - loop { - match listener.accept().await { - Ok((stream, _addr)) => { - #[cfg(feature = "tracing")] - tracing::debug!("Accepted new client connection"); - - let db_clone = db.clone(); - let config_clone = config.clone(); + let mut connections: FuturesUnordered = FuturesUnordered::new(); - tokio::spawn(async move { - #[cfg(feature = "tracing")] - tracing::debug!("Connection handler spawned for client"); - - if let Err(_e) = crate::remote::handler::handle_connection( - db_clone, - config_clone, - stream, - ) - .await - { + loop { + tokio::select! { + biased; + + // Accept the next incoming connection + accept_res = listener.accept() => match accept_res { + Ok((stream, _addr)) => { + // Refuse if we are already at the connection cap. + // The accepted `UnixStream` is dropped, which closes + // the socket; the client sees a closed connection. 
+ if connections.len() >= config.max_connections { #[cfg(feature = "tracing")] - tracing::error!("Connection handler error: {}", _e); + tracing::warn!( + "max_connections={} reached, refusing new client", + config.max_connections + ); + drop(stream); + continue; } #[cfg(feature = "tracing")] - tracing::debug!("Connection handler terminated"); - }); - } - Err(_e) => { - #[cfg(feature = "tracing")] - tracing::error!("Failed to accept connection: {}", _e); - // Continue accepting other connections despite error - } + tracing::debug!("Accepted new client connection"); + + let db_clone = db.clone(); + let config_clone = config.clone(); + connections.push(Box::pin(async move { + if let Err(_e) = crate::remote::handler::handle_connection( + db_clone, + config_clone, + stream, + ) + .await + { + #[cfg(feature = "tracing")] + tracing::error!("Connection handler error: {}", _e); + } + + #[cfg(feature = "tracing")] + tracing::debug!("Connection handler terminated"); + })); + } + Err(_e) => { + #[cfg(feature = "tracing")] + tracing::error!("Failed to accept connection: {}", _e); + // Continue accepting other connections despite error + } + }, + + // Drain finished connection futures. The guard ensures the + // arm is disabled (not polled) when the set is empty — + // `FuturesUnordered::is_terminated()` returns `is_empty()`, + // so `select_next_some` would panic. + _ = connections.next(), if !connections.is_empty() => {} } } }); diff --git a/aimdb-websocket-connector/src/client/builder.rs b/aimdb-websocket-connector/src/client/builder.rs index be5c20ce..d8bccba5 100644 --- a/aimdb-websocket-connector/src/client/builder.rs +++ b/aimdb-websocket-connector/src/client/builder.rs @@ -10,11 +10,11 @@ //! AimDbBuilder::build() //! └─ WsClientConnectorBuilder::build(&db) //! ├─ db.collect_inbound_routes("ws-client") → Router -//! ├─ db.collect_outbound_routes("ws-client") → outbound tasks +//! ├─ db.collect_outbound_routes("ws-client") → outbound futures //! 
├─ connect to remote WebSocket server -//! ├─ spawn receive loop (router dispatch) -//! ├─ spawn outbound publisher tasks -//! └─ return Arc +//! ├─ build connector_future (read + write + keepalive + reconnect) +//! ├─ build outbound publisher futures +//! └─ return Vec (drained by AimDbRunner) //! ``` use std::{pin::Pin, sync::Arc, time::Duration}; @@ -190,21 +190,24 @@ where subscribe_topics: topics, }; - // ── Build the connector (internal tasks are spawned via tokio::spawn — Group 4) ── - let connector = WsClientConnectorImpl::connect(config, router, db) + // ── Build the connector and collect its infrastructure future ── + // The connector future owns a `FuturesUnordered` driving the + // write/read/keepalive loops and reconnect watcher; no + // `tokio::spawn` is involved. + let (connector, connector_future) = WsClientConnectorImpl::connect(config, router, db) .await .map_err(|e| aimdb_core::DbError::RuntimeError { message: format!("WS client connect failed: {}", e), })?; // ── Collect outbound publisher futures ─────────────────── - let futures = connector.collect_outbound_futures(db, outbound_routes); + let mut futures = connector.collect_outbound_futures(db, outbound_routes); - // The `WsClientConnectorImpl` itself is no longer returned (the - // `Arc` was discarded by `AimDbBuilder` anyway). - // Its background tasks live on inside the tokio spawns started in - // `connect()`; the only futures the runner drives from this connector - // are the per-route publishers. + // Prepend the connector's infrastructure future so it gets + // driven alongside the per-route publishers. Order does not + // matter to `FuturesUnordered`, but front-loading the long- + // running infra future keeps logs readable. 
+ futures.insert(0, connector_future); Ok(futures) }) } diff --git a/aimdb-websocket-connector/src/client/connector.rs b/aimdb-websocket-connector/src/client/connector.rs index b8446c99..6bb7afc1 100644 --- a/aimdb-websocket-connector/src/client/connector.rs +++ b/aimdb-websocket-connector/src/client/connector.rs @@ -17,9 +17,33 @@ use aimdb_core::{ OutboundRoute, }; use aimdb_ws_protocol::{ClientMessage, ServerMessage}; +use futures_util::stream::FuturesUnordered; use futures_util::{SinkExt, StreamExt}; use tokio::sync::{mpsc, Mutex}; +/// Boxed `()`-yielding future used for the connector's nested +/// `FuturesUnordered`. Identical in shape to `aimdb_core::builder::BoxFuture`. +type BoxFuture = Pin + Send + 'static>>; + +/// Aliases for the split halves of the underlying WebSocket stream. +/// Defined once so the reconnect-watcher's `NewLoops` payload type +/// stays readable. +type WsStream = + tokio_tungstenite::WebSocketStream>; +type WsWriteSink = + futures_util::stream::SplitSink; +type WsReadStream = futures_util::stream::SplitStream; + +/// Sent from the reconnect watcher to the outer connector future after a +/// successful reconnect. The outer loop pushes one write-loop future and +/// one read-loop future built from these halves into its +/// `FuturesUnordered`. +struct NewLoops { + write_sink: WsWriteSink, + read_stream: WsReadStream, + write_rx: mpsc::UnboundedReceiver, +} + // ════════════════════════════════════════════════════════════════════ // Configuration // ════════════════════════════════════════════════════════════════════ @@ -84,12 +108,20 @@ pub struct WsClientConnectorImpl { } impl WsClientConnectorImpl { - /// Connect to the remote WebSocket server and spawn background tasks. + /// Connect to the remote WebSocket server and return a handle plus + /// the infrastructure future that drives the read/write/keepalive + /// loops and the reconnect watcher. 
+ /// + /// The returned [`BoxFuture`] owns a [`FuturesUnordered`] holding all + /// background loops; dropping it (via the runner being cancelled) + /// terminates every loop in one step. On successful reconnect the + /// watcher sends a [`NewLoops`] message that the outer future + /// translates into two fresh futures pushed onto the set. pub(crate) async fn connect( config: WsClientConfig, router: Arc, db: &aimdb_core::builder::AimDb, - ) -> Result + ) -> Result<(Self, BoxFuture), String> where R: aimdb_executor::RuntimeAdapter + 'static, { @@ -114,6 +146,9 @@ impl WsClientConnectorImpl { })); // ── Send subscribe message ────────────────────────────────── + // The mpsc buffers this until the write loop is first polled by + // the runner; the message is delivered as soon as the outer + // future starts. if !config.subscribe_topics.is_empty() { let sub_msg = ClientMessage::Subscribe { topics: config.subscribe_topics.clone(), @@ -127,62 +162,127 @@ impl WsClientConnectorImpl { let reconnect_topics = config.subscribe_topics.clone(); let auto_reconnect = config.auto_reconnect; let max_reconnect_attempts = config.max_reconnect_attempts; - let router_for_reconnect = router.clone(); + let keepalive_interval = config.keepalive_interval; let runtime_ctx: Arc = db.runtime_any(); - // Write loop. Bridge state: tokio::spawn directly (see method doc). - tokio::spawn({ - let state = state.clone(); - async move { - Self::run_write_loop(ws_write, write_rx).await; + // Channel from the reconnect watcher to the outer future. The + // watcher sends a `NewLoops` on each successful reconnect; the + // outer future pushes a fresh write+read future onto its set. 
+ let (new_loops_tx, mut new_loops_rx) = mpsc::unbounded_channel::(); - #[cfg(feature = "tracing")] - tracing::warn!("WS client: write loop ended"); + let state_for_future = state.clone(); + let router_for_future = router.clone(); + let runtime_ctx_for_future = runtime_ctx.clone(); + + let connector_future: BoxFuture = Box::pin(async move { + let mut tasks: FuturesUnordered = FuturesUnordered::new(); - state.lock().await.status = ConnectionStatus::Disconnected; + // Initial write loop. On exit, mark the connection as + // disconnected so the reconnect watcher notices. + { + let state_for_write = state_for_future.clone(); + tasks.push(Box::pin(async move { + Self::run_write_loop(ws_write, write_rx).await; + + #[cfg(feature = "tracing")] + tracing::warn!("WS client: write loop ended"); + + state_for_write.lock().await.status = ConnectionStatus::Disconnected; + })); } - }); - // Read loop. - tokio::spawn({ - let router = router.clone(); - let runtime_ctx = runtime_ctx.clone(); - async move { - Self::run_read_loop(ws_read, &router, Some(&runtime_ctx)).await; + // Initial read loop. + { + let router_for_read = router_for_future.clone(); + let ctx_for_read = runtime_ctx_for_future.clone(); + tasks.push(Box::pin(async move { + Self::run_read_loop(ws_read, &router_for_read, Some(&ctx_for_read)).await; - #[cfg(feature = "tracing")] - tracing::warn!("WS client: read loop ended"); + #[cfg(feature = "tracing")] + tracing::warn!("WS client: read loop ended"); + })); } - }); - // Keepalive. - if let Some(interval) = config.keepalive_interval { - let ka_state = state.clone(); - tokio::spawn(async move { - Self::run_keepalive(ka_state, interval).await; - }); - } + // Keepalive. + if let Some(interval) = keepalive_interval { + let ka_state = state_for_future.clone(); + tasks.push(Box::pin(Self::run_keepalive(ka_state, interval))); + } + + // Reconnect watcher. 
+ if auto_reconnect { + let watcher_state = state_for_future.clone(); + let watcher_router = router_for_future.clone(); + let watcher_ctx = runtime_ctx_for_future.clone(); + let watcher_tx = new_loops_tx.clone(); + tasks.push(Box::pin(Self::run_reconnect_watcher( + watcher_state, + reconnect_url, + reconnect_topics, + watcher_router, + max_reconnect_attempts, + Some(watcher_ctx), + watcher_tx, + ))); + } + // Drop the watcher's sender clone we still hold so the + // `new_loops_rx.recv()` returns `None` once the watcher + // task ends, breaking the outer loop cleanly. + drop(new_loops_tx); + + // Drive the set. `biased;` keeps reconnect handling + // (which churns rarely) polled ahead of the drain arm. + loop { + tokio::select! { + biased; + + // Reconnect produced fresh halves — push new read + + // write futures into the set. + maybe_new = new_loops_rx.recv() => match maybe_new { + Some(NewLoops { write_sink, read_stream, write_rx }) => { + let router_for_read = router_for_future.clone(); + let ctx_for_read = runtime_ctx_for_future.clone(); + let state_for_write = state_for_future.clone(); + tasks.push(Box::pin(async move { + Self::run_write_loop(write_sink, write_rx).await; + + #[cfg(feature = "tracing")] + tracing::warn!("WS client: (reconnect) write loop ended"); + + state_for_write.lock().await.status = ConnectionStatus::Disconnected; + })); + tasks.push(Box::pin(async move { + Self::run_read_loop( + read_stream, + &router_for_read, + Some(&ctx_for_read), + ) + .await; + + #[cfg(feature = "tracing")] + tracing::warn!("WS client: (reconnect) read loop ended"); + })); + } + None => { + // Watcher gone (auto_reconnect disabled or + // it gave up after max_attempts). Stop + // listening for new loops; tasks continue + // draining until empty. + break; + } + }, - // Reconnect watcher (deferred to AimX follow-up — Group 4). 
- if auto_reconnect { - tokio::spawn({ - let state = state.clone(); - let runtime_ctx = runtime_ctx.clone(); - async move { - Self::run_reconnect_watcher( - state, - reconnect_url, - reconnect_topics, - router_for_reconnect, - max_reconnect_attempts, - Some(runtime_ctx), - ) - .await; + // Drain finished child futures. Dormant when empty. + _ = tasks.select_next_some() => {} } - }); - } + } + + // After the watcher exited: drain remaining children to + // completion so resources release cleanly. + while tasks.next().await.is_some() {} + }); - Ok(Self { state, router }) + Ok((Self { state, router }, connector_future)) } /// Collect one outbound publisher future per route. @@ -432,14 +532,24 @@ impl WsClientConnectorImpl { /// Reconnect watcher: monitors connection status and reconnects when needed. /// - /// Uses exponential backoff: 500ms, 1s, 2s, 4s, 8s (capped). + /// Uses exponential backoff: 500ms, 1s, 2s, 4s, 8s (capped). On a + /// successful reconnect it sends a [`NewLoops`] to the outer + /// connector future, which translates it into a fresh write- and + /// read-loop future pushed onto the connector's `FuturesUnordered`. + /// The watcher itself never calls `tokio::spawn`. + /// + /// `_router` and `_runtime_ctx` are retained in the signature for + /// symmetry with the outer connector future — they are not used by + /// the watcher itself in the new design (the outer future supplies + /// its own clones to the spawned read-loop future). 
async fn run_reconnect_watcher( state: Arc>, url: String, subscribe_topics: Vec, - router: Arc, + _router: Arc, max_attempts: usize, - runtime_ctx: Option>, + _runtime_ctx: Option>, + new_loops_tx: mpsc::UnboundedSender, ) { let backoff = [500u64, 1_000, 2_000, 4_000, 8_000]; let mut attempt = 0usize; @@ -489,20 +599,15 @@ impl WsClientConnectorImpl { let (ws_write, ws_read) = ws_stream.split(); - // Create new channel and swap the sender atomically + // Create new channel; sender will be swapped into + // shared state; receiver travels to the outer future + // inside `NewLoops`. let (new_write_tx, new_write_rx) = mpsc::unbounded_channel::(); - tokio::spawn(Self::run_write_loop(ws_write, new_write_rx)); - - // Spawn new read loop - let router_clone = router.clone(); - let runtime_ctx_clone = runtime_ctx.clone(); - tokio::spawn(async move { - Self::run_read_loop(ws_read, &router_clone, runtime_ctx_clone.as_ref()) - .await; - }); - - // Re-subscribe + // Re-subscribe before swapping — the new sender is + // still local and other producers cannot reach it + // yet, so this `Subscribe` is guaranteed first in + // the new write channel. if !subscribe_topics.is_empty() { let sub = ClientMessage::Subscribe { topics: subscribe_topics.clone(), @@ -512,9 +617,10 @@ impl WsClientConnectorImpl { } } - // Swap write_tx and flush pending writes in one critical section. - // All producers (outbound publishers, publish(), keepalive) will - // pick up the new sender on their next lock acquisition. + // Swap write_tx and flush pending writes in one + // critical section. All producers (outbound + // publishers, publish(), keepalive) pick up the new + // sender on their next lock acquisition. { let mut s = state.lock().await; s.write_tx = new_write_tx; @@ -524,6 +630,26 @@ impl WsClientConnectorImpl { s.status = ConnectionStatus::Connected; } + // Hand the new halves to the outer connector future, + // which will push fresh read+write loop futures onto + // its `FuturesUnordered`. 
+ if new_loops_tx + .send(NewLoops { + write_sink: ws_write, + read_stream: ws_read, + write_rx: new_write_rx, + }) + .is_err() + { + // Outer future has gone away — nothing left to + // drive the loops; give up. + #[cfg(feature = "tracing")] + tracing::warn!( + "WS client: outer future dropped, stopping reconnect watcher" + ); + break; + } + attempt = 0; } Err(_e) => { diff --git a/docs/design/030-M13-aimx-remote-spawn-free.md b/docs/design/030-M13-aimx-remote-spawn-free.md new file mode 100644 index 00000000..7e0ab58e --- /dev/null +++ b/docs/design/030-M13-aimx-remote-spawn-free.md @@ -0,0 +1,682 @@ +# AimX Remote-Access: Spawn-Free via Nested `FuturesUnordered` + +**Version:** 0.1 (proposed) +**Status:** 📝 Draft +**Issue:** [#114](https://github.com/aimdb-dev/aimdb/issues/114) +**Predecessor:** [Design 028 — Remove Spawn Trait](028-M13-remove-spawn-trait.md) +**Last Updated:** May 26, 2026 +**Milestone:** M13 — Architectural clean-up (follow-up) + +--- + +## Table of Contents + +- [Summary](#summary) +- [Motivation](#motivation) +- [Current State — Site Inventory](#current-state--site-inventory) + - [Site 1 — Supervisor](#site-1--supervisor) + - [Site 2 — Per-subscription event streamer](#site-2--per-subscription-event-streamer) + - [Site 3 — `subscribe_record_updates` consumer task](#site-3--subscribe_record_updates-consumer-task) + - [Site 4 — WS client connector](#site-4--ws-client-connector) +- [Proposed Design](#proposed-design) + - [Pattern](#pattern) + - [New helper: `stream_record_updates`](#new-helper-stream_record_updates) + - [Site 1 rewrite — supervisor](#site-1-rewrite--supervisor) + - [Sites 2 + 3 rewrite — handler](#sites-2--3-rewrite--handler) + - [Site 4 rewrite — WS client](#site-4-rewrite--ws-client) +- [Cancellation Semantics](#cancellation-semantics) +- [Bounds and Backpressure](#bounds-and-backpressure) +- [Type-System Impact](#type-system-impact) +- [Implementation Plan](#implementation-plan) +- [Testing](#testing) +- [Risks](#risks) +- 
[Decisions](#decisions) +- [Out of Scope](#out-of-scope) + +--- + +## Summary + +Replace every remaining `tokio::spawn` in the AimX remote-access path with a +nested `FuturesUnordered>` driven by +`futures_util::select_biased!`. Each dynamic fan-out point — the supervisor's +accept loop, the connection handler's subscription set, the WS client's +read/write/keepalive/reconnect loops — becomes a future that owns its own +unordered set. Cancellation collapses to a single mechanism: dropping the +future. + +This does **not** lift `#[cfg(feature = "std")]` on AimX. It removes the +first blocker so that the eventual gate-removal work does not have to +rewrite the concurrency model under time pressure. + +--- + +## Motivation + +Design 028 deliberately deferred this work +([028 §Out of Scope](028-M13-remove-spawn-trait.md#out-of-scope)). The +deferral is now redeemed by this design. + +### Problem 1 — Three exit paths per subscription + +A single AimX subscription today runs across **two** chained +`tokio::spawn` tasks (Sites 2 + 3) joined by an mpsc channel. The subscription +can terminate via three independent mechanisms: + +1. `oneshot::Sender<()>::send(())` from `ConnectionState::cancel_all_subscriptions` + or the `record.unsubscribe` handler — fires the inner select-arm in + the buffer-reader task at [builder.rs:1425](../../aimdb-core/src/builder.rs#L1425). +2. The buffer-reader task's `value_tx.send(json_val).await.is_err()` exit + path ([builder.rs:1435](../../aimdb-core/src/builder.rs#L1435)) — fires + when `value_rx` has been dropped. +3. The forwarder task's `event_tx.send(event).is_err()` exit path + ([handler.rs:1114](../../aimdb-core/src/remote/handler.rs#L1114)) — fires + when the connection's event funnel has been dropped. + +Race windows between these three paths produce orphaned task pairs (e.g. +the `oneshot` fires and the buffer-reader exits, but the forwarder keeps +draining `value_rx` until it drains empty and `value_tx` drops). 
The +resulting "task ghost" is brief but real, and is the kind of thing that +becomes a leak once we add features (e.g. metrics that hold per-sub +counters on a task-local). + +With nested `FuturesUnordered` there is a single per-subscription future, +and dropping it is the only exit path. + +### Problem 2 — Unbounded spawn is silent + +Every accepted connection allocates a Tokio task (`supervisor.rs:123`); every +`record.subscribe` allocates another (`handler.rs:1042`); every WS client +reconnect allocates two more (`connector.rs:495,500`). A misbehaving client +that opens N connections × M subscriptions consumes N×M independent Tokio +tasks. The first symptom is OOM. + +With nested sets, the cost is the same heap-wise but is **observable** in +the read loop's poll behaviour, so future backpressure work has a place +to attach (`AimxConfig::max_connections`, `max_subs_per_connection`). + +### Problem 3 — `subscribe_record_updates` is the wrong shape + +[`AimDb::subscribe_record_updates`](../../aimdb-core/src/builder.rs#L1376) +exists only to spawn an internal forwarder task that converts a +`BufferReader` into an mpsc of JSON values. With `Spawn` gone, this is the +last `runtime.spawn`-shaped method on the public-ish `AimDb` surface +(it is `pub fn` but used only by the handler). It hides the cancellation +mechanism inside the implementation and forces every caller to handle +two channels. A `Stream`-returning helper is the right shape. + +--- + +## Current State — Site Inventory + +### Site 1 — Supervisor + +[`supervisor.rs:110-149`](../../aimdb-core/src/remote/supervisor.rs#L110-L149): +the accept loop wraps `handle_connection(...)` in `tokio::spawn`. The +supervisor future is itself returned from `build_supervisor_future` and +driven by `AimDbRunner`; only the per-connection dispatch is still a spawn. 
+ +### Site 2 — Per-subscription event streamer + +[`handler.rs:1042`](../../aimdb-core/src/remote/handler.rs#L1042): inside +`handle_record_subscribe`, a `tokio::spawn` runs +`stream_subscription_events(sub_id, value_rx, event_tx)`. That task is +the **second** task in the subscription chain — it reads JSON values from +the mpsc that the buffer-reader task fills (Site 3), wraps each in an +`Event`, and pushes onto the connection's event funnel. + +The `JoinHandle` is `std::mem::drop`-ped immediately ([handler.rs:1055](../../aimdb-core/src/remote/handler.rs#L1055)); +cancellation flows in via the receiver-dropped exit on `event_tx`. + +### Site 3 — `subscribe_record_updates` consumer task + +[`builder.rs:1414`](../../aimdb-core/src/builder.rs#L1414): `tokio::spawn` +runs a `tokio::select!` between `cancel_rx` (oneshot) and +`json_reader.recv_json()`. Exits via: + +- `cancel_rx` fires (Unsubscribe / connection cleanup). +- `value_tx.send(...).await.is_err()` (downstream dropped). +- `BufferClosed` / generic error from the buffer. + +The method returns `(mpsc::Receiver, oneshot::Sender<()>)`. The +handler stores the `Sender<()>` in `SubscriptionHandle::cancel_tx` and the +`Receiver` flows into Site 2. + +### Site 4 — WS client connector + +[`client/connector.rs`](../../aimdb-websocket-connector/src/client/connector.rs) +has **six** `tokio::spawn` call sites, not the two named in #114: + +| Line | Task | Where | +|---|---|---| +| 134 | initial write loop | `connect()` | +| 147 | initial read loop | `connect()` | +| 161 | initial keepalive | `connect()` | +| 168 | initial reconnect watcher | `connect()` | +| 495 | reconnect write loop | `run_reconnect_watcher()` | +| 500 | reconnect read loop | `run_reconnect_watcher()` | + +The reconnect watcher is the dynamic-fan-out site: on each reconnect it +spawns two fresh tasks (lines 495, 500) and lets the previous ones die +when the underlying `WebSocketStream` halves close. 
The other four +(lines 134–168) are static one-per-connector spawns that could equally +be returned futures, but were left as `tokio::spawn` because the +`ConnectorBuilder::build()` API at the time of #88 collected only +*outbound publisher* futures, not infrastructure futures. + +--- + +## Proposed Design + +### Pattern + +Every dynamic fan-out point owns its own +`FuturesUnordered<BoxFuture<'static, ()>>` and drives it with +`futures_util::select_biased!`: + +```rust +let mut children: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new(); +loop { + futures_util::select_biased! { + // Outer signal — accept, request-read, reconnect-tick, etc. + outer = outer_source.fuse() => match outer { + NewChild(fut) => children.push(fut), + ... + }, + // Drain completed children so the set doesn't grow forever + _ = children.select_next_some() => {} + } +} +``` + +Inside `select_biased!`, `select_next_some()` from `futures-util` is effectively disabled while the set is +empty (it reports itself as terminated), avoiding the busy-loop that `next()` would produce. Outside the `futures-util` select macros (e.g. in `tokio::select!`) polling it on an empty set can panic, so guard that arm instead. The macro +requires `default-features = false, features = ["async-await"]` on +`futures-util`, which is already enabled by design 028. + +`select_biased!` is preferred over plain `select!` so that the outer +signal (accept / read-request / reconnect-need) is always polled first — +without it, a flood of child completions could starve the outer source.
+ +### New helper: `stream_record_updates` + +Replace `AimDb::subscribe_record_updates` with a `Stream`-returning helper +that does not own a task: + +```rust +// aimdb-core/src/remote/stream.rs (new file) +#[cfg(feature = "std")] +pub(crate) fn stream_record_updates( + db: &AimDb, + record_key: &str, +) -> DbResult + Send + 'static> +where + R: aimdb_executor::RuntimeAdapter + 'static, +{ + let id = db.inner.resolve_str(record_key) + .ok_or_else(|| DbError::RecordKeyNotFound { key: record_key.into() })?; + let record = db.inner.storage(id) + .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?; + let mut json_reader = record.subscribe_json()?; + + Ok(async_stream::stream! { + loop { + match json_reader.recv_json().await { + Ok(v) => yield v, + Err(DbError::BufferLagged { .. }) => continue, // log + skip + Err(_) => break, // BufferClosed or fatal + } + } + }) +} +``` + +**Dependency note.** `async_stream` is the cleanest expression; if we +prefer to avoid the extra dep, use `futures_util::stream::unfold` with a +`(reader, ())` state — same shape, no macro. + +The helper owns no task and no channel. Cancellation = drop the stream. +Lag handling stays where it is today (skip-and-continue). The +`BufferClosed` and generic-error branches collapse to `break`, since the +caller now sees a `None` from the stream and exits its own loop +naturally. + +Visibility: `pub(crate)`. The old `pub fn subscribe_record_updates` +disappears. + +### Site 1 rewrite — supervisor + +`supervisor.rs` accept loop becomes: + +```rust +let supervisor_future: BoxFuture = Box::pin(async move { + let mut connections: FuturesUnordered> = + FuturesUnordered::new(); + + loop { + futures_util::select_biased! 
{ + accept_res = listener.accept().fuse() => match accept_res { + Ok((stream, _addr)) => { + if let Some(max) = config.max_connections { + if connections.len() >= max { + // refuse: close stream with a one-line error + drop(stream); + continue; + } + } + let db_clone = db.clone(); + let config_clone = config.clone(); + connections.push(Box::pin(async move { + let _ = crate::remote::handler::handle_connection( + db_clone, config_clone, stream, + ).await; + })); + } + Err(_e) => { + // log; continue accepting + } + }, + _ = connections.select_next_some() => { + // a connection ended — implicit cleanup, nothing to do + } + } + } +}); +``` + +`select_biased!` polls `accept` first; busy connections never starve the +accept loop. The optional `max_connections` cap is the new bound surface +described in [Bounds and Backpressure](#bounds-and-backpressure). + +### Sites 2 + 3 rewrite — handler + +The two-task chain (Sites 2 + 3) collapses to a single per-subscription +future. The handler's outer loop today already uses `tokio::select!` to +interleave `stream.read_line` and `event_rx.recv` +([handler.rs:185-248](../../aimdb-core/src/remote/handler.rs#L185-L248)). +We add a third arm to drain a subscription `FuturesUnordered`: + +```rust +let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); +let mut conn_state = ConnectionState::new(event_tx.clone()); +let mut subs: FuturesUnordered> = FuturesUnordered::new(); + +loop { + let mut line = String::new(); + futures_util::select_biased! 
{ + // Incoming request line + read_result = stream.read_line(&mut line).fuse() => match read_result { + Ok(0) => break, // client closed + Ok(_) => { + let response = handle_request( + &db, &config, &mut conn_state, &mut subs, request, &event_tx + ).await; + if send_response(&mut stream, &response).await.is_err() { break; } + } + Err(_) => break, + }, + // Outgoing event to client + Some(event) = event_rx.recv().fuse() => { + if send_event(&mut stream, &event).await.is_err() { break; } + }, + // Drain finished subscriptions + _ = subs.select_next_some() => {} + } +} +// Drop subs: every subscription future is cancelled by being dropped. +drop(subs); +``` + +`handle_request` is threaded the `&mut subs` set and the `event_tx` +clone, so `handle_record_subscribe` can push a fresh subscription future +instead of spawning. The subscription future is: + +```rust +async fn run_subscription( + stream: impl Stream + Send + 'static, + subscription_id: String, + event_tx: mpsc::UnboundedSender, + cancel: Arc, +) { + futures_util::pin_mut!(stream); + let mut sequence: u64 = 1; + while let Some(json_value) = stream.next().await { + if cancel.load(Ordering::Relaxed) { break; } + + let duration = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + let event = Event { + subscription_id: subscription_id.clone(), + sequence, + data: json_value, + timestamp: format!("{}.{:09}", duration.as_secs(), duration.subsec_nanos()), + dropped: None, + }; + if event_tx.send(event).is_err() { break; } + sequence += 1; + } +} +``` + +`ConnectionState::subscriptions` changes from +`HashMap` to +`HashMap>`. `SubscriptionHandle` and the +`oneshot` cancel channel are deleted. `cancel_all_subscriptions` becomes +trivial: on connection exit the whole `FuturesUnordered` is dropped, and +the handler exits naturally. No explicit drain needed. + +### Site 4 rewrite — WS client + +The WS client connector currently spawns six tasks. 
The natural model is +one outer future that owns a `FuturesUnordered` and on reconnect drops +the old read/write children and pushes new ones. + +`WsClientConnectorImpl::connect()` becomes +`WsClientConnectorImpl::connector_future()` (or returns a tuple +`(impl Connector, BoxFuture<'static, ()>)`). The future: + +```rust +async fn connector_future(...) { + let mut tasks: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new(); + + // Initial read / write / keepalive — pushed once + tasks.push(Box::pin(Self::run_write_loop(ws_write, write_rx))); + tasks.push(Box::pin(Self::run_read_loop(ws_read, router, ctx))); + if let Some(interval) = keepalive_interval { + tasks.push(Box::pin(Self::run_keepalive(state.clone(), interval))); + } + + // Reconnect coordinator (an mpsc) — or a select! arm that + // periodically checks status and emits new read/write futures. + let (reconnect_tx, mut reconnect_rx) = mpsc::unbounded_channel::<NewLoops>(); + if auto_reconnect { + tasks.push(Box::pin(Self::run_reconnect_watcher( + state.clone(), url, subscribe_topics, router.clone(), + max_reconnect_attempts, Some(runtime_ctx), reconnect_tx, + ))); + } + + loop { + futures_util::select_biased! { + Some(new_loops) = reconnect_rx.recv().fuse() => { + tasks.push(Box::pin(Self::run_write_loop(new_loops.write_sink, new_loops.write_rx))); + tasks.push(Box::pin(Self::run_read_loop(new_loops.read_stream, &router_for_reconnect, Some(&ctx_for_reconnect)))); + }, + _ = tasks.select_next_some() => { + // a child ended — read/write halves close together on disconnect; + // reconnect watcher handles the rest + } + } + } +} +``` + +`run_reconnect_watcher` no longer calls `tokio::spawn`. Instead, when it +completes a fresh `connect_async`, it sends a `NewLoops { write_sink, +read_stream, write_rx }` over `reconnect_tx`. The outer loop pushes the +new futures into `tasks` and the old halves are already dead.
+ +`ConnectorBuilder::build()` returns this future plus the outbound +publisher futures already collected today, all appended to +`AimDbRunner`'s vec. + +> **Alternative shape.** Instead of an mpsc, the reconnect watcher can +> itself return a `Stream`; then the outer loop's +> `select_biased!` has one arm per stream and the loop is purely +> futures-based. Pick during implementation; the mpsc shape is shown +> because it matches today's code most directly. + +--- + +## Cancellation Semantics + +| Event | Today | After | +|---|---|---| +| Connection closed by client | RX drops → buffer-reader exits → forwarder drains and exits | Outer handler loop breaks; per-conn `FuturesUnordered` dropped; all subs cancelled in one step | +| `record.unsubscribe` | `oneshot::Sender<()>::send(())` fires inner select-arm | `Arc<AtomicBool>::store(true)`; sub future exits at next `stream.next()` poll | +| Supervisor drop (runner shutdown) | `tokio::spawn`-ed connections orphan; runtime waits for them | Outer `FuturesUnordered` dropped → all connection futures dropped → all sub futures dropped | +| Buffer producer closed | `BufferClosed` error → reader exits → forwarder mpsc-EOFs → exits | Stream yields `None` → sub future exits | + +**Unsubscribe delay.** The `Arc<AtomicBool>` approach delays cancellation until +the subscription is next woken: an idle subscription does not see the flag flip until +the next buffered value arrives. For AimX semantics this is acceptable — +`record.unsubscribe` has never been a synchronous contract. Document it +in the AimX protocol docs. + +**Why not `tokio::sync::Notify` or oneshot?** Both are tokio-specific +and would re-introduce the dual-path cancellation that this design is +trying to delete. The whole point is one mechanism: drop the future. +`AtomicBool` is the minimum primitive that lets Unsubscribe target a +specific subscription inside the set; if and when AimX un-gates from +`std`, the path is already runtime-agnostic.
+ +--- + +## Bounds and Backpressure + +[`AimxConfig`](../../aimdb-core/src/remote/config.rs) gains two optional +caps: + +```rust +pub struct AimxConfig { + // ... existing fields ... + + /// Maximum concurrent client connections. `None` = unbounded (current behaviour). + pub max_connections: Option<usize>, + + /// Maximum subscriptions per connection. Today: implicit via + /// `subscription_queue_size`; this becomes an explicit cap. + pub max_subs_per_connection: Option<usize>, +} +``` + +`subscription_queue_size` stays (it controls per-sub channel depth, not +sub count). When `max_connections` is `Some(n)` and reached, the +supervisor refuses new connects by dropping the accepted `UnixStream` +without handshake — that maps to "connection refused" on the client +side. When `max_subs_per_connection` is `Some(n)` and reached, the +handler returns the existing `too_many_subscriptions` response code. + +Defaults: both `None` to preserve current behaviour for in-flight +deployments. The acceptance criterion in #114 is "pick one and document"; +this design picks **soft caps as opt-in**, documented in CHANGELOG and +the AimX guide. + +--- + +## Type-System Impact + +Surface area is small — design 028 already dropped `R: Spawn` everywhere. 
+The signatures that move: + +| Item | Before | After | +|---|---|---| +| `AimDb::subscribe_record_updates` | `pub fn` returning `(mpsc::Receiver, oneshot::Sender)` | **deleted** | +| `stream_record_updates` (new) | — | `pub(crate) fn stream_record_updates(db, key) -> DbResult>` | +| `SubscriptionHandle` (handler.rs) | `{ subscription_id, record_name, cancel_tx }` | **deleted** | +| `ConnectionState::subscriptions` | `HashMap` | `HashMap>` | +| `ConnectionState::cancel_all_subscriptions` | iterates handles, sends to each oneshot | **deleted** — outer drop handles it | +| `stream_subscription_events` | `async fn`, spawned via `tokio::spawn` | **deleted** — folded into `run_subscription` | +| `handle_record_subscribe` | `(&db, &config, &mut conn_state, request_id, params)` | adds `&mut subs: &mut FuturesUnordered` and `&event_tx` | +| `handle_request` | dispatches all method calls | threads `&mut subs` + `&event_tx` into the `record.subscribe` arm | + +No public `aimdb-core` API changes beyond `subscribe_record_updates` +removal. That method has no in-tree callers besides `handler.rs` +([git grep verified](../../aimdb-core/CHANGELOG.md)), so the breakage +is internal-only. + +WS client: `WsClientConnectorImpl::connect` signature changes from +`Result` to `Result<(Self, BoxFuture<'static, ()>), String>` +(or equivalent). This is internal to the connector crate; the +`ConnectorBuilder::build()` surface already returns a Vec of +`BoxFuture`s, which absorbs the new infrastructure future. + +--- + +## Implementation Plan + +Each step should pass `make check` before the next begins. + +### Step 1 — `stream_record_updates` helper + +- Add `aimdb-core/src/remote/stream.rs` with `stream_record_updates`. +- Choose `async_stream` vs hand-rolled `stream::unfold` (see helper section). +- Wire it as `pub(crate)`; do **not** yet delete `subscribe_record_updates`. +- Add a unit test that drives the stream against a fake record and + asserts values + lag-skip behaviour. 
+ +### Step 2 — Handler: collapse subscriptions into one future + +- In `handle_connection`, introduce `subs: FuturesUnordered`. +- Replace `SubscriptionHandle` with `Arc`; update + `ConnectionState`. +- Convert `tokio::select!` in the connection loop to + `futures_util::select_biased!` with the third (`subs.select_next_some()`) + arm. +- Rewrite `handle_record_subscribe` to push `run_subscription(...)` into + `subs` instead of spawning. +- Delete `stream_subscription_events`. +- Update `handle_record_unsubscribe` to `cancel.store(true, Relaxed)`. +- Delete `cancel_all_subscriptions` (outer drop handles cleanup). + +### Step 3 — Supervisor: nested `FuturesUnordered` + +- Rewrite `build_supervisor_future` body: replace `tokio::spawn(handle_connection(...))` + with `connections.push(Box::pin(handle_connection(...)))`. +- Wrap accept loop in `futures_util::select_biased!` with `connections.select_next_some()` + arm. +- Add optional `max_connections` enforcement. + +### Step 4 — Delete `subscribe_record_updates` + +- Remove the method body and doc comment from `builder.rs`. +- `git grep subscribe_record_updates` must be empty. + +### Step 5 — `AimxConfig` bounds + +- Add `max_connections` and `max_subs_per_connection` (both `Option`, + default `None`). +- Update config docs and CHANGELOG. + +### Step 6 — WS client connector + +- Refactor `WsClientConnectorImpl::connect` to return + `(Self, BoxFuture<'static, ()>)`. +- Move write/read/keepalive/reconnect-watcher spawns into a single + `connector_future` that owns a `FuturesUnordered`. +- Reconnect watcher emits `NewLoops` over an mpsc; outer loop pushes + fresh read/write futures on reception. +- Update `WsClientConnectorBuilder` to return the infrastructure future + in its `Vec`. + +### Step 7 — Tests + +- Existing AimX integration tests must pass unchanged. They live in the + remote-access demo (`examples/remote-access-demo/`) and any in-tree + tests under `aimdb-core`. 
As of this design, the in-tree test + directory does not exist; coverage relies on the demo binary plus + unit tests inside each module. +- Add one cancellation-on-drop test: open a subscription, drop the + client connection, assert (via a fresh subscriber on the same record) + that the buffer's consumer-side reader is dropped — measurable by + asserting that consumer count goes back to its pre-subscribe value + within N polls. + +### Step 8 — Docs and CHANGELOG + +- Update [028 §Remote supervisor](028-M13-remove-spawn-trait.md#remote-supervisor) + status: "Bridge state" → "Target state achieved." +- Update [028 §Out of Scope](028-M13-remove-spawn-trait.md#out-of-scope): + remove the AimX follow-up section. +- Add CHANGELOG entry under "Internal refactors" for both + `aimdb-core` and `aimdb-websocket-connector`. No user-facing + semver bump. +- Update AimX protocol docs (if any) noting the one-poll-cycle + Unsubscribe semantic. + +--- + +## Testing + +Beyond the cancellation-on-drop test in Step 7: + +- **Fairness.** With M concurrent connections each holding K subscriptions + pushing values continuously, every connection should observe events + within a bounded delta. `select_biased!` polls accept-first on the + supervisor and read-first on the handler, which is the right default; + `FuturesUnordered` itself is fair within a level. Add a multi-connection + stress test that asserts the max-to-min event latency ratio stays + within (e.g.) 10×. +- **Bounds.** Test that `max_connections = Some(n)` refuses connection + n+1 and accepts again after one drops. +- **Reconnect.** Test that the WS client's reconnect watcher can survive + N reconnect cycles without leaking futures (assert `tasks.len()` stays + bounded after settling). + +--- + +## Risks + +| Risk | Likelihood | Mitigation | +|---|---|---| +| One busy connection starves siblings inside the supervisor's FuturesUnordered | Low | Set is fair within a level; `select_biased!` only biases accept-vs-drain. 
Fairness test in §Testing. | +| Unsubscribe delay (one-poll-cycle) surfaces as a perceived bug for idle subscriptions | Low | Documented in AimX protocol docs; no client today relies on synchronous Unsubscribe. | +| `select_biased!` adds compile-time / binary-size overhead vs hand-rolled `poll_fn` | Negligible | Already on dep tree via design 028 (`AimDbRunner::run`). | +| Embassy stack overflow when `std` gate eventually drops and supervisor runs in the shared task | Medium (later) | Out of scope here. When the gate-removal PR lands, place the supervisor in its own Embassy task with an explicit `stack_size`. Already noted in [028 §Embassy](028-M13-remove-spawn-trait.md#embassy-nostd--alloc). | +| `async_stream` dep adds proc-macro overhead | Low | Alternative: hand-rolled `stream::unfold` (no new dep, slightly more verbose). Decide during Step 1. | +| Race: `Unsubscribe` arrives between `stream.next()` poll and event delivery | Low | The flag is checked **after** `stream.next().await` returns and **before** `event_tx.send`. The dropped event matches the protocol's "events between Unsubscribe and ack may or may not be delivered" semantic. | + +--- + +## Decisions + +1. **Unsubscribe cancellation primitive** → **`Arc<AtomicBool>`.** + Honours #114's recommendation. Keeps the path runtime-agnostic for + the eventual `std`-gate lift. One-poll-cycle delay on idle + subscriptions is acceptable AimX semantics. + +2. **`stream_record_updates` visibility** → **`pub(crate)`.** + The only caller is `handler.rs`. Promoting to `pub` would re-create + a not-quite-public API; if external callers ever need it we can + widen later. + +3. **WS client (Site 4) in the same PR** → **Yes.** + Same pattern, same review surface. Splitting would force the + reviewer to context-switch on the second PR with no new insight. + +4. **Connection / subscription bounds** → **Opt-in caps in `AimxConfig`.** + `max_connections` and `max_subs_per_connection` both default + `None` (current unbounded behaviour). 
Operators who care + set them; #114's acceptance criterion is satisfied either way. + +5. **`async_stream` dependency** → **Decide in Step 1.** + The `unfold` shape is fine; the dep saves ~6 lines. Not load-bearing. + +6. **`SubscriptionHandle` retention** → **Delete.** + `record_name` was only used in tracing; `cancel_tx` is gone. + Replacing with `Arc<AtomicBool>` directly in the HashMap value + is one fewer indirection. + +7. **WS client reconnect coordination** → **mpsc from watcher to outer loop.** + Concrete and matches today's structure. A pure-Stream alternative + exists but is more invasive; either is acceptable during + implementation. + +--- + +## Out of Scope + +- **Lifting `#[cfg(feature = "std")]` on AimX.** Requires porting the + Unix-domain-socket transport, serialising security policy without + `std::path::PathBuf`, and replacing `tokio::io::BufReader`. Tracked + separately; this design only removes the concurrency-model blocker. +- **Changing the AimX wire protocol.** Unchanged. +- **WS *server* per-connection handlers.** Live in Axum, which spawns + internally. Same category as Tokio's listener internals — not an + AimDB-owned spawn site. +- **Removing `R` from `Producer` / `Consumer`.** Deferred in + 028 ([Alternative C](028-M13-remove-spawn-trait.md#alternatives-considered)), + still deferred here. +- **MCP / persistence / sync crates.** No spawn calls in their hot paths; + no work needed. 
From b5ef22e36b567a0a5abdc103f3507314bba999bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Wed, 27 May 2026 04:13:34 +0000 Subject: [PATCH 2/9] refactor: make AimX remote-access and WS client connector spawn-free --- aimdb-core/CHANGELOG.md | 11 +- aimdb-core/src/remote/handler.rs | 126 +++++++++++++++++- aimdb-core/src/remote/supervisor.rs | 13 +- aimdb-websocket-connector/CHANGELOG.md | 8 +- .../src/client/connector.rs | 11 +- docs/design/028-M13-remove-spawn-trait.md | 99 +++++++------- docs/design/030-M13-aimx-remote-spawn-free.md | 23 +++- 7 files changed, 225 insertions(+), 66 deletions(-) diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index eccd4913..5296b84d 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Internal refactors + +- **AimX remote-access path is now spawn-free (Issue #114, Design 030).** Every remaining `tokio::spawn` in `aimdb-core/src/remote/` was removed; the supervisor's accept loop and each connection handler now own their own `FuturesUnordered` driven by `tokio::select! { biased; }`. Cancellation collapsed to one mechanism — dropping the future. + - New `aimdb-core/src/remote/stream.rs` exports a `pub(crate) stream_record_updates` helper that adapts a record's `JsonBufferReader` into a `Stream` via `futures_util::stream::unfold`. No task, no channel — drop the stream to cancel. + - `AimDb::subscribe_record_updates` **deleted**. The method had no out-of-tree callers (the only caller was the AimX handler); replaced by `stream_record_updates` above. + - Per-subscription `oneshot::Sender<()>` cancel channels and the `SubscriptionHandle` struct **deleted**. `ConnectionState::subscriptions` is now `HashMap>`; `record.unsubscribe` flips the flag, which the per-sub future observes after its next stream poll. 
+ - The two-task chain per subscription (buffer-reader task + JSON-event forwarder task) **collapsed** into one `run_subscription` future per subscription, held in the connection's `FuturesUnordered`. + - `AimxConfig` gains `max_subs_per_connection: usize` (default 32) — the dedicated per-connection cap that the handler had previously been spelling as `subscription_queue_size`. The existing `max_connections: usize` (previously declared but unread) is now actually enforced by the supervisor; over-cap connections are refused by closing the accepted `UnixStream` pre-handshake. + ### Changed (breaking) - **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?` → `producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }` → `let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible. @@ -36,7 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Join transforms now hoist their per-input forwarder construction to build time — `JoinPipeline::into_descriptor()` returns a `CollectedTransform { task_future, fanin_futures }` and the lazy `runtime.spawn(forwarder)` inside `run_join_transform` is gone. - `ConnectorBuilder::build()` now returns `Vec>` instead of `Arc` (which `AimDbBuilder` already discarded). - Unsafe `impl Send/Sync` blocks on `Producer` / `Consumer` deleted — they auto-derive now. - - On the AimX remote-access path, three `runtime.spawn(...)` call sites bridge to `tokio::spawn` directly under `#[cfg(feature = "std")]`. 
These (per-connection handler, per-subscription event stream, `subscribe_record_updates`) are addressed in the AimX portability follow-up. + - On the AimX remote-access path, three `runtime.spawn(...)` call sites were temporarily bridged to bare `tokio::spawn` under `#[cfg(feature = "std")]`. These have since been removed by the AimX spawn-free follow-up — see the "AimX remote-access path is now spawn-free" entry above. - `on_start` no_std bifurcation collapsed: a single `StartFnType` alias replaces the byte-identical std/no_std pair. ## [1.1.0] - 2026-05-22 diff --git a/aimdb-core/src/remote/handler.rs b/aimdb-core/src/remote/handler.rs index c101db65..ca575788 100644 --- a/aimdb-core/src/remote/handler.rs +++ b/aimdb-core/src/remote/handler.rs @@ -230,10 +230,14 @@ where } // Drain finished subscription futures so `subs` does not grow - // unboundedly. The guard ensures the arm is disabled (not polled) - // when the set is empty — `FuturesUnordered::is_terminated()` - // returns `is_empty()`, so `select_next_some` would panic. - _ = subs.next(), if !subs.is_empty() => {} + // unboundedly. Using `Some(_) = next()` (rather than + // `select_next_some()`) is the safe form: an empty + // `FuturesUnordered` reports `is_terminated() == true`, and + // `select_next_some` panics in that state. With the pattern + // guard the arm is simply disabled when `next()` resolves to + // `None`, and the always-active `read_line` arm keeps the + // select alive. + Some(_) = subs.next() => {} } } @@ -1552,3 +1556,117 @@ where Response::success(request_id, json!(topo_order)) } + +#[cfg(all(test, feature = "std"))] +mod tests { + use super::*; + use futures_core::Stream; + use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::task::{Context, Poll}; + + /// A `Stream` that never yields a value but + /// flips a flag when dropped. 
Used to verify that dropping the + /// per-connection `FuturesUnordered` drops the per-subscription + /// future, which in turn drops its underlying record stream — the + /// invariant the AimX spawn-free refactor depends on for cancellation + /// on connection close. + struct DropTracker { + dropped: Arc, + } + + impl Drop for DropTracker { + fn drop(&mut self) { + self.dropped.store(true, Ordering::SeqCst); + } + } + + impl Stream for DropTracker { + type Item = serde_json::Value; + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // Park forever; we only care that the stream gets dropped. + Poll::Pending + } + } + + #[tokio::test] + async fn dropping_subs_set_drops_subscription_stream() { + let dropped = Arc::new(AtomicBool::new(false)); + let stream = DropTracker { + dropped: dropped.clone(), + }; + + let (event_tx, _event_rx) = mpsc::unbounded_channel::(); + let cancel = Arc::new(AtomicBool::new(false)); + + let mut subs: FuturesUnordered = FuturesUnordered::new(); + subs.push(Box::pin(run_subscription( + stream, + "sub-1".to_string(), + event_tx, + cancel, + ))); + + // Drive the set once so the future is actually pinned/installed. + tokio::task::yield_now().await; + let _ = futures_util::future::poll_fn(|cx| { + let _ = Pin::new(&mut subs).poll_next(cx); + Poll::Ready(()) + }) + .await; + + assert!( + !dropped.load(Ordering::SeqCst), + "drop must not have fired yet" + ); + + // Dropping the set drops every contained future, which in turn + // drops the stream owned by `run_subscription`. + drop(subs); + + assert!( + dropped.load(Ordering::SeqCst), + "dropping the FuturesUnordered must drop the subscription stream" + ); + } + + #[tokio::test] + async fn unsubscribe_flag_terminates_subscription_on_next_event() { + use futures_util::stream; + + // Stream that yields one value then pends forever — lets us + // observe the flag check that sits *after* `stream.next().await`. 
+ let values = stream::iter(vec![serde_json::json!({"v": 1})]) + .chain(stream::pending::()); + + let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); + let cancel = Arc::new(AtomicBool::new(false)); + + // Pre-flip the flag so the future exits after producing one event. + // (We could also flip after seeing the first event; either path + // demonstrates that the flag is observed.) + let cancel_for_future = cancel.clone(); + let handle = tokio::spawn(run_subscription( + values, + "sub-1".to_string(), + event_tx, + cancel_for_future, + )); + + // Consume the one event the stream produces. + let event = event_rx.recv().await.expect("expected one event"); + assert_eq!(event.subscription_id, "sub-1"); + + // Now flip the cancel flag; the next stream.next() will pend + // forever, so the future would never exit without the flag. + // To observe the flag, we'd need a stream that wakes — but the + // semantic guarantee is that *the next event after the flag flip* + // exits the loop, which is what we actually need. + cancel.store(true, Ordering::Relaxed); + + // Abort cleanly so the test doesn't hang on the pending stream. + handle.abort(); + let _ = handle.await; + } +} diff --git a/aimdb-core/src/remote/supervisor.rs b/aimdb-core/src/remote/supervisor.rs index 7a75ff68..bc615261 100644 --- a/aimdb-core/src/remote/supervisor.rs +++ b/aimdb-core/src/remote/supervisor.rs @@ -166,11 +166,14 @@ where } }, - // Drain finished connection futures. The guard ensures the - // arm is disabled (not polled) when the set is empty — - // `FuturesUnordered::is_terminated()` returns `is_empty()`, - // so `select_next_some` would panic. - _ = connections.next(), if !connections.is_empty() => {} + // Drain finished connection futures. Using `Some(_) = next()` + // (rather than `select_next_some()`) is the safe form: an + // empty `FuturesUnordered` reports `is_terminated() == true`, + // and `select_next_some` panics in that state. 
With the + // pattern guard, the arm is simply disabled when `next()` + // resolves to `None`, and the always-active `accept` + // arm keeps the select alive. + Some(_) = connections.next() => {} } } }); diff --git a/aimdb-websocket-connector/CHANGELOG.md b/aimdb-websocket-connector/CHANGELOG.md index f4b489f5..806a5c5b 100644 --- a/aimdb-websocket-connector/CHANGELOG.md +++ b/aimdb-websocket-connector/CHANGELOG.md @@ -7,11 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Internal refactors + +- **WS client connector is now spawn-free (Issue #114, Design 030).** All six `tokio::spawn` call sites in the client connector (initial write/read/keepalive/reconnect-watcher plus the watcher's per-reconnect read/write loops) collapsed into one infrastructure future that owns a `FuturesUnordered` driven by `tokio::select! { biased; }`. The reconnect watcher no longer spawns; on a successful reconnect it sends a `NewLoops { write_sink, read_stream, write_rx }` over an mpsc to the outer future, which pushes fresh read- and write-loop futures onto the set. + - `WsClientConnectorImpl::connect()` return type changed from `Result` to `Result<(Self, BoxFuture), String>` — the second element is the infrastructure future; the builder prepends it to the outbound publisher futures before returning to `AimDbBuilder`. + - Internal-only API change; no impact on the public `WsClientConnectorBuilder` or `ConnectorBuilder` surfaces. + ### Changed (breaking) - **`ConnectorBuilder::build()` now returns `Vec>` instead of `Arc` (Issue #88).** Server-side: `start_server()` → `build_server_future()` (the `axum::serve()` accept loop is collected, not spawned). Client-side: outbound publishers converted to `collect_outbound_futures()`. - `R: Spawn` bounds dropped throughout in favour of `R: RuntimeAdapter`. The no-op `transport::Connector` impl on `WebSocketConnectorImpl` was removed. 
-- WS *client* internal background tasks (write loop, read loop, keepalive, reconnect watcher) are temporarily bridged to `tokio::spawn` directly (per design 028 §"Out of Scope" / Group 4). They will move to nested `FuturesUnordered` in the AimX portability follow-up. +- ~~WS *client* internal background tasks (write loop, read loop, keepalive, reconnect watcher) are temporarily bridged to `tokio::spawn` directly (per design 028 §"Out of Scope" / Group 4). They will move to nested `FuturesUnordered` in the AimX portability follow-up.~~ Resolved by the spawn-free refactor above. ## [0.2.0] - 2026-05-22 diff --git a/aimdb-websocket-connector/src/client/connector.rs b/aimdb-websocket-connector/src/client/connector.rs index 6bb7afc1..6ecdd7a6 100644 --- a/aimdb-websocket-connector/src/client/connector.rs +++ b/aimdb-websocket-connector/src/client/connector.rs @@ -272,8 +272,15 @@ impl WsClientConnectorImpl { } }, - // Drain finished child futures. Dormant when empty. - _ = tasks.select_next_some() => {} + // Drain finished child futures. `Some(_) = next()` + // (rather than `select_next_some()`) is the safe form: + // an empty `FuturesUnordered` reports + // `is_terminated() == true`, and `select_next_some` + // panics in that state. With the pattern guard, the + // arm is simply disabled when `next()` resolves to + // `None`; the always-active reconnect arm keeps the + // select alive. 
+ Some(_) = tasks.next() => {} } } diff --git a/docs/design/028-M13-remove-spawn-trait.md b/docs/design/028-M13-remove-spawn-trait.md index 5eea1cb7..d612c6e0 100644 --- a/docs/design/028-M13-remove-spawn-trait.md +++ b/docs/design/028-M13-remove-spawn-trait.md @@ -1,10 +1,10 @@ # Remove `Spawn` Trait — `build()` Collects, `run()` Drives -**Version:** 0.3 (implemented — `unsafe impl` outcome on Embassy corrected) +**Version:** 0.4 (Group 4 / AimX bridge state removed by design 030) **Status:** ✅ Implemented **Issue:** [#88](https://github.com/aimdb-dev/aimdb/issues/88) -**Follow-up issue:** [AimX remote-access portability](../issues/aimx-remote-spawn-free.md) — TBD -**Last Updated:** May 23, 2026 +**Follow-up:** [Design 030 — AimX remote-access spawn-free](030-M13-aimx-remote-spawn-free.md) / [#114](https://github.com/aimdb-dev/aimdb/issues/114) — ✅ Implemented +**Last Updated:** May 26, 2026 **Milestone:** M13 — Architectural clean-up --- @@ -216,13 +216,19 @@ join transform future itself. | `TokioAdapter::spawn_connectors` | `aimdb-tokio-adapter/src/connector.rs:54` | Delete. Test-only helper, no production callers. | | `BufferOps::spawn_dispatcher` | `aimdb-tokio-adapter/src/buffer.rs:205` | Keep (test-only utility), but mark for removal in a follow-up tidy if no external user adopts it. | -**Group 4 — Runtime spawn deferred to follow-up issue (out of scope).** +**Group 4 — Runtime spawn deferred to follow-up issue (now resolved).** -| Call site | File | Reason for deferral | +All three sites were addressed by the AimX spawn-free follow-up +([design 030](030-M13-aimx-remote-spawn-free.md), issue +[#114](https://github.com/aimdb-dev/aimdb/issues/114)). Each was +converted to a nested `FuturesUnordered` driven by `tokio::select! { +biased; }`; cancellation collapsed to dropping the future. 
+ +| Call site | File | Resolution | |---|---|---| -| AimX per-connection handler | `aimdb-core/src/remote/supervisor.rs:122` (bare `tokio::spawn`) | Dynamic fan-out by client count; needs nested-`FuturesUnordered` refactor — see [Out of Scope](#out-of-scope) | -| AimX per-subscription stream | `aimdb-core/src/remote/handler.rs:1042` (bare `tokio::spawn`) + `builder.rs:1409` (`runtime.spawn` inside `subscribe_record_updates`) | Same as above; coupled to handler refactor | -| WebSocket **client** reconnect | `aimdb-websocket-connector/src/client/connector.rs:505,510` (bare `tokio::spawn`) | Dynamic fan-out per reconnect; same nested-`FuturesUnordered` pattern as AimX. Tracked alongside follow-up. | +| AimX per-connection handler | `aimdb-core/src/remote/supervisor.rs` | Supervisor owns a `FuturesUnordered`; accepted connections are pushed in. | +| AimX per-subscription stream | `aimdb-core/src/remote/handler.rs` + `builder.rs` | New `stream_record_updates` helper returns a `Stream`; per-conn `FuturesUnordered` holds one future per `record.subscribe`. `subscribe_record_updates` deleted. | +| WebSocket **client** reconnect | `aimdb-websocket-connector/src/client/connector.rs` | Six `tokio::spawn` sites collapsed into one connector future that owns a `FuturesUnordered`; reconnect watcher sends `NewLoops` over an mpsc rather than spawning. | **Group 5 — External / out-of-codebase (informational).** @@ -232,12 +238,12 @@ join transform future itself. Groups 1, 2, and 3 cover **every** `runtime.spawn(...)` call within `aimdb-core` and every connector — once they land, the `Spawn` trait has no -internal callers. Group 4 retains bare `tokio::spawn` calls inside -`aimdb-core/src/remote/`, but those calls do **not** depend on the trait -— they call Tokio directly through `#[cfg(feature = "std")]`. 
The trait -can therefore be deleted in this PR without breaking the remote-access -path; the follow-up issue removes the remaining `tokio::spawn` calls in a -separate, focused change. +internal callers. Group 4 retained bare `tokio::spawn` calls inside +`aimdb-core/src/remote/` as a deliberate bridge state in this PR; those +calls did **not** depend on the trait (they called Tokio directly through +`#[cfg(feature = "std")]`), so the trait could be deleted cleanly. The +follow-up ([design 030](030-M13-aimx-remote-spawn-free.md)) has since +removed every Group 4 spawn call. --- @@ -412,27 +418,20 @@ pub fn build_supervisor( ) -> DbResult> ``` -**Bridge state inside this PR.** The supervisor body keeps two existing -`tokio::spawn` call sites untouched: the per-connection handler at -`supervisor.rs:122` and the per-subscription event streamer at -`handler.rs:1042`. The internal `AimDb::subscribe_record_updates` -([builder.rs:1409](../../aimdb-core/src/builder.rs#L1409)) — which today -uses `runtime.spawn(...)` — is rewritten in this PR to call `tokio::spawn` -directly, matching the rest of the AimX path. None of these depend on the -`Spawn` trait after the rewrite, so the trait can be deleted cleanly. - -This is a deliberate bridge: AimX is already `#[cfg(feature = "std")]`-gated, -so the temporary use of bare `tokio::spawn` does not regress portability. -The handler dispatch and supervisor still drop their `R: Spawn` bound (it -becomes `R: RuntimeAdapter`), which is the change that lets the rest of -the type system loosen. - -**Target state (follow-up issue).** The bare `tokio::spawn` calls are -replaced with nested `FuturesUnordered` driven by `select_biased!`, making -the AimX path runtime-agnostic. This is the prerequisite for ever lifting -the `std` gate from AimX. See -[AimX remote-access portability](../issues/aimx-remote-spawn-free.md) for -the full plan; not part of this PR. 
+**Target state achieved.** The bridge state described in v0.3 of this +design has been removed by the AimX spawn-free follow-up +([design 030](030-M13-aimx-remote-spawn-free.md), issue +[#114](https://github.com/aimdb-dev/aimdb/issues/114)). The supervisor +now pushes per-connection handler futures onto its own +`FuturesUnordered`; the handler does the same with per-subscription +futures backed by a `Stream`-returning helper (`stream_record_updates`); +`AimDb::subscribe_record_updates` is deleted. No `tokio::spawn` remains +in `aimdb-core/src/remote/`. + +The AimX path is now runtime-agnostic in shape (still `#[cfg(feature = +"std")]`-gated for transport reasons). Lifting the `std` gate itself +remains a separate, larger effort; see design 030 §"Out of Scope" for +the remaining work. ### `AimDb::spawn_task` deletion @@ -1079,25 +1078,23 @@ All design questions raised during review have been resolved: The following are explicitly **not** part of this PR / issue #88: -### AimX remote-access spawn-free refactor - -**Tracked in:** [AimX remote-access portability](../issues/aimx-remote-spawn-free.md) (TBD on filing). - -The AimX path retains three bare `tokio::spawn` (or `runtime.spawn`-via-bridge) -calls that are not addressed here: - -- `aimdb-core/src/remote/supervisor.rs:122` — per-connection handler spawn -- `aimdb-core/src/remote/handler.rs:1042` — per-subscription event-stream spawn -- The temporary `tokio::spawn` inside the rewritten `AimDb::subscribe_record_updates` (this PR replaces `runtime.spawn` with `tokio::spawn`; the follow-up replaces the whole method with a `Stream`-returning helper) +### ~~AimX remote-access spawn-free refactor~~ (resolved by design 030) -The follow-up issue replaces all three with nested `FuturesUnordered` driven -by `futures::select_biased!`, eliminates the per-subscription `oneshot` -cancel channel (cancellation = drop the future), and is a prerequisite for -ever lifting the `#[cfg(feature = "std")]` gate on AimX. 
+Originally deferred to a follow-up; landed via +[design 030](030-M13-aimx-remote-spawn-free.md) / +[issue #114](https://github.com/aimdb-dev/aimdb/issues/114). All three +bridge-state `tokio::spawn` sites in `aimdb-core/src/remote/` were +replaced with nested `FuturesUnordered`; `subscribe_record_updates` was +deleted in favour of a `Stream`-returning helper; per-subscription +`oneshot` cancel channels were replaced with `Arc` flags. -### WebSocket client reconnect spawn +### ~~WebSocket client reconnect spawn~~ (resolved by design 030) -The WS *client* connector's reconnect loop ([client/connector.rs:505,510](../../aimdb-websocket-connector/src/client/connector.rs#L505)) calls bare `tokio::spawn` on every reconnect. Same nested-`FuturesUnordered` pattern applies; bundled with the AimX follow-up. +Originally deferred alongside the AimX follow-up; resolved in the same +PR. The six `tokio::spawn` sites in +[`aimdb-websocket-connector/src/client/connector.rs`](../../aimdb-websocket-connector/src/client/connector.rs) +collapsed into one connector future that owns a `FuturesUnordered`; the +reconnect watcher sends `NewLoops` over an mpsc rather than spawning. 
### Removing `R` from `Producer` and `Consumer` diff --git a/docs/design/030-M13-aimx-remote-spawn-free.md b/docs/design/030-M13-aimx-remote-spawn-free.md index 7e0ab58e..686b9116 100644 --- a/docs/design/030-M13-aimx-remote-spawn-free.md +++ b/docs/design/030-M13-aimx-remote-spawn-free.md @@ -1,12 +1,31 @@ # AimX Remote-Access: Spawn-Free via Nested `FuturesUnordered` -**Version:** 0.1 (proposed) -**Status:** 📝 Draft +**Version:** 0.2 (implemented; minor deviations from draft documented below) +**Status:** ✅ Implemented **Issue:** [#114](https://github.com/aimdb-dev/aimdb/issues/114) **Predecessor:** [Design 028 — Remove Spawn Trait](028-M13-remove-spawn-trait.md) **Last Updated:** May 26, 2026 **Milestone:** M13 — Architectural clean-up (follow-up) +## Implementation deviations from draft + +- **Select macro.** The draft specified `futures_util::select_biased!`. + Implementation uses `tokio::select! { biased; }` — same priority + semantics, no `pin_mut!` required for `!Unpin` futures (notably + `read_line`), and avoids adding the `async-await` feature flag on + `futures-util`. Inside `#[cfg(feature = "std")]`-gated code the + runtime-agnostic argument did not apply. +- **`AimxConfig` bounds shape.** The draft proposed + `max_connections: Option<usize>`. Implementation kept the existing + `max_connections: usize` (already in the public builder API; default + 16) and added a new `max_subs_per_connection: usize` (default 32). + Non-breaking; `max_connections` is now actually enforced by the + supervisor — previously the field existed but went unread. +- **`async_stream` vs `stream::unfold`.** Picked `futures_util::stream::unfold` + to avoid the new proc-macro dependency. Behaviour is identical. +- **WS client `WsClientConnectorImpl::connect` return shape.** Implemented + as `Result<(Self, BoxFuture), String>`, matching the draft.
+ --- ## Table of Contents From b3e624cfa6afa285b5bef2a4bdc3f6d1b6f518c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Wed, 27 May 2026 18:13:19 +0000 Subject: [PATCH 3/9] chore: update embassy subproject to latest commit --- _external/embassy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_external/embassy b/_external/embassy index 4bf217e8..8fe40948 160000 --- a/_external/embassy +++ b/_external/embassy @@ -1 +1 @@ -Subproject commit 4bf217e8b59df60f34820d62dc01094714c6dae2 +Subproject commit 8fe40948b9a474b78d461a35f656407528157c40 From ba32d4203c18490acd1a39d3d21421cd663a0755 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Wed, 27 May 2026 18:31:45 +0000 Subject: [PATCH 4/9] refactor: remove subscription_queue_size and update AimxConfig for spawn-free design --- aimdb-core/CHANGELOG.md | 6 +- aimdb-core/src/remote/config.rs | 19 +-- aimdb-core/src/remote/handler.rs | 134 +++++++++++++----- aimdb-core/src/remote/mod.rs | 2 +- aimdb-core/src/remote/stream.rs | 17 ++- aimdb-core/src/remote/supervisor.rs | 9 ++ .../src/client/connector.rs | 11 -- docs/design/030-M13-aimx-remote-spawn-free.md | 46 +++--- examples/remote-access-demo/src/client.rs | 5 +- examples/remote-access-demo/src/server.rs | 2 +- 10 files changed, 158 insertions(+), 93 deletions(-) diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index 5296b84d..7e9927fa 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -14,10 +14,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `AimDb::subscribe_record_updates` **deleted**. The method had no out-of-tree callers (the only caller was the AimX handler); replaced by `stream_record_updates` above. - Per-subscription `oneshot::Sender<()>` cancel channels and the `SubscriptionHandle` struct **deleted**. 
`ConnectionState::subscriptions` is now `HashMap<String, Arc<AtomicBool>>`; `record.unsubscribe` flips the flag, which the per-sub future observes after its next stream poll. - The two-task chain per subscription (buffer-reader task + JSON-event forwarder task) **collapsed** into one `run_subscription` future per subscription, held in the connection's `FuturesUnordered`. - - `AimxConfig` gains `max_subs_per_connection: usize` (default 32) — the dedicated per-connection cap that the handler had previously been spelling as `subscription_queue_size`. The existing `max_connections: usize` (previously declared but unread) is now actually enforced by the supervisor; over-cap connections are refused by closing the accepted `UnixStream` pre-handshake. ### Changed (breaking) +- **`AimxConfig` lost `subscription_queue_size` (Issue #114, Design 030).** The field bounded a per-subscription mpsc channel that no longer exists — subscriptions are now one future in a `FuturesUnordered`. The builder method `.subscription_queue_size(n)` is removed; replace it with `.max_subs_per_connection(n)` if you were using the value as a soft cap on subscription count, or just delete the call. +- **AimX `Welcome.max_subscriptions` now reports the real per-connection cap.** Previously it returned `subscription_queue_size` (default 100) while the actual cap was implicit; it now returns `max_subs_per_connection` (default 32). Clients that displayed this value will see the change. +- **AimX `record.subscribe` response no longer carries `queue_size`.** Result object is now `{ "subscription_id": "..." }` — the previous `"queue_size"` reported a number that no longer corresponded to anything in the implementation. +- **`AimxConfig` gains `max_subs_per_connection: usize` (default 32)** — the dedicated per-connection subscription cap.
The existing `max_connections: usize` (previously declared but unread) is now actually enforced by the supervisor; over-cap connections are refused by closing the accepted `UnixStream` pre-handshake. + - **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?` → `producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }` → `let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible. - `AimDb::produce(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail. - `Database::produce` likewise sync. diff --git a/aimdb-core/src/remote/config.rs b/aimdb-core/src/remote/config.rs index dfc9c86e..8249b9e3 100644 --- a/aimdb-core/src/remote/config.rs +++ b/aimdb-core/src/remote/config.rs @@ -7,7 +7,7 @@ use crate::record_id::StringKey; /// Configuration for AimX remote access /// /// Defines how the remote access layer behaves, including socket path, -/// security policy, connection limits, and subscription queue sizes. +/// security policy and connection / subscription limits. #[derive(Debug, Clone)] pub struct AimxConfig { /// Path to Unix domain socket @@ -28,12 +28,6 @@ pub struct AimxConfig { /// until one is released via `record.unsubscribe`. pub max_subs_per_connection: usize, - /// Subscription queue size per client per subscription. Retained for - /// protocol-response compatibility; the per-sub mpsc channel it used - /// to bound was eliminated when subscriptions moved to a nested - /// [`FuturesUnordered`]. 
- pub subscription_queue_size: usize, - /// Optional authentication token pub auth_token: Option, @@ -50,7 +44,6 @@ impl AimxConfig { /// - Security policy: Read-only /// - Max connections: 16 /// - Max subscriptions per connection: 32 - /// - Subscription queue size: 100 /// - No auth token /// - Socket permissions: 0o600 (owner-only) pub fn uds_default() -> Self { @@ -59,7 +52,6 @@ impl AimxConfig { security_policy: SecurityPolicy::ReadOnly, max_connections: 16, max_subs_per_connection: 32, - subscription_queue_size: 100, auth_token: None, socket_permissions: Some(0o600), } @@ -89,12 +81,6 @@ impl AimxConfig { self } - /// Sets the subscription queue size per client - pub fn subscription_queue_size(mut self, size: usize) -> Self { - self.subscription_queue_size = size; - self - } - /// Sets an authentication token pub fn auth_token(mut self, token: impl Into) -> Self { self.auth_token = Some(token.into()); @@ -227,7 +213,6 @@ mod tests { assert_eq!(config.socket_path, PathBuf::from("/tmp/aimdb.sock")); assert_eq!(config.max_connections, 16); assert_eq!(config.max_subs_per_connection, 32); - assert_eq!(config.subscription_queue_size, 100); assert!(matches!(config.security_policy, SecurityPolicy::ReadOnly)); assert!(config.auth_token.is_none()); } @@ -239,14 +224,12 @@ mod tests { .socket_path("/var/run/aimdb.sock") .max_connections(32) .max_subs_per_connection(8) - .subscription_queue_size(200) .auth_token("secret-token") .socket_permissions(0o660); assert_eq!(config.socket_path, PathBuf::from("/var/run/aimdb.sock")); assert_eq!(config.max_connections, 32); assert_eq!(config.max_subs_per_connection, 8); - assert_eq!(config.subscription_queue_size, 200); assert_eq!(config.auth_token, Some("secret-token".to_string())); assert_eq!(config.socket_permissions, Some(0o660)); } diff --git a/aimdb-core/src/remote/handler.rs b/aimdb-core/src/remote/handler.rs index ca575788..0ad1919b 100644 --- a/aimdb-core/src/remote/handler.rs +++ b/aimdb-core/src/remote/handler.rs @@ 
-6,10 +6,13 @@ //! # Architecture: Event Funnel Pattern //! //! Subscriptions use a funnel pattern for clean event delivery: -//! - Each subscription spawns a consumer task that reads from the record buffer -//! - Consumer tasks send events to a shared mpsc channel (the "funnel") -//! - A single writer task drains the funnel and writes events to the UnixStream -//! - This ensures NDJSON line integrity and prevents write interleaving +//! - Each `record.subscribe` pushes a future onto a per-connection +//! [`futures_util::stream::FuturesUnordered`] that the connection's +//! outer `select!` loop drives. +//! - Subscription futures send events to a shared mpsc channel (the "funnel"). +//! - The same outer loop drains the funnel and writes events to the +//! `UnixStream`, so NDJSON line integrity is preserved without a +//! dedicated writer task. use crate::remote::{ AimxConfig, Event, HelloMessage, RecordMetadata, Request, Response, WelcomeMessage, @@ -93,22 +96,26 @@ impl ConnectionState { /// # Architecture /// /// ```text -/// ┌─────────────────┐ -/// │ Subscription 1 │───┐ -/// │ Consumer Task │ │ -/// └─────────────────┘ │ -/// ├──► Event Funnel ───► select! loop ───► UnixStream -/// ┌─────────────────┐ │ (mpsc) (interleaved -/// │ Subscription 2 │───┘ writes) -/// │ Consumer Task │ -/// └─────────────────┘ +/// ┌──────────────────────┐ +/// │ Subscription future 1│───┐ +/// │ (in FuturesUnordered)│ │ +/// └──────────────────────┘ │ +/// ├──► Event Funnel ───► select! loop ───► UnixStream +/// ┌──────────────────────┐ │ (mpsc) (interleaved +/// │ Subscription future 2│───┘ writes) +/// │ (in FuturesUnordered)│ +/// └──────────────────────┘ /// ``` /// -/// The main loop uses `tokio::select!` to interleave: +/// The main loop uses `tokio::select! 
{ biased; }` to interleave: /// - Reading requests from the stream /// - Writing events from subscriptions +/// - Draining completed subscription futures /// -/// This ensures both responses and events are written without blocking. +/// `biased;` polls the request arm first so a chatty subscription +/// cannot starve the request path. Cancellation is by drop: when the +/// outer loop exits, the per-connection `FuturesUnordered` is dropped +/// and every subscription future with it. /// /// # Arguments /// * `db` - Database instance @@ -491,7 +498,7 @@ where server: "aimdb".to_string(), permissions, writable_records, - max_subscriptions: Some(config.subscription_queue_size), + max_subscriptions: Some(config.max_subs_per_connection), authenticated: Some(authenticated), }; @@ -937,7 +944,7 @@ where /// * `params` - Request parameters (must contain "name" field with record name) /// /// # Returns -/// Success response with subscription_id and queue_size, or error if: +/// Success response with subscription_id or error if: /// - Missing/invalid parameters /// - Record not found /// - Too many subscriptions @@ -1065,7 +1072,6 @@ where request_id, json!({ "subscription_id": subscription_id, - "queue_size": config.subscription_queue_size, }), ) } @@ -1633,19 +1639,21 @@ mod tests { #[tokio::test] async fn unsubscribe_flag_terminates_subscription_on_next_event() { - use futures_util::stream; - - // Stream that yields one value then pends forever — lets us - // observe the flag check that sits *after* `stream.next().await`. - let values = stream::iter(vec![serde_json::json!({"v": 1})]) - .chain(stream::pending::()); + use futures_util::stream::unfold; + + // Channel-backed stream so we control exactly when the future + // advances past each `stream.next().await`. Without this, a + // synchronous source like `stream::iter` would race past the + // flag-flip before the test can observe it. 
+ let (val_tx, val_rx) = mpsc::unbounded_channel::(); + let values = unfold( + val_rx, + |mut rx| async move { rx.recv().await.map(|v| (v, rx)) }, + ); let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); let cancel = Arc::new(AtomicBool::new(false)); - // Pre-flip the flag so the future exits after producing one event. - // (We could also flip after seeing the first event; either path - // demonstrates that the flag is observed.) let cancel_for_future = cancel.clone(); let handle = tokio::spawn(run_subscription( values, @@ -1654,19 +1662,73 @@ mod tests { cancel_for_future, )); - // Consume the one event the stream produces. + // Feed one value and confirm it propagates as an Event. + val_tx.send(serde_json::json!({"v": 1})).unwrap(); let event = event_rx.recv().await.expect("expected one event"); assert_eq!(event.subscription_id, "sub-1"); - // Now flip the cancel flag; the next stream.next() will pend - // forever, so the future would never exit without the flag. - // To observe the flag, we'd need a stream that wakes — but the - // semantic guarantee is that *the next event after the flag flip* - // exits the loop, which is what we actually need. + // Flip the cancel flag, then feed a second value to wake the + // stream. The future must observe the flag *after* the next + // `stream.next().await` returns and exit before sending the + // second Event. cancel.store(true, Ordering::Relaxed); + val_tx.send(serde_json::json!({"v": 2})).unwrap(); + + // The future must complete on its own — no abort. + handle + .await + .expect("subscription future should exit cleanly after cancel flag"); + + // And the second value must not have produced an Event. + assert!( + event_rx.try_recv().is_err(), + "no further events should be sent after cancel" + ); + } - // Abort cleanly so the test doesn't hang on the pending stream. 
- handle.abort(); - let _ = handle.await; + #[tokio::test] + async fn dropping_subs_set_drops_inner_stream_state() { + // Stronger integration-style check: a real channel-backed stream + // (the same shape `stream_record_updates` returns via `unfold`) + // is held inside a `run_subscription` future, which is held by a + // `FuturesUnordered`. Dropping the set must drop the channel's + // receiver, which we observe by `val_tx.send(...)` failing. + use futures_util::stream::unfold; + + let (val_tx, val_rx) = mpsc::unbounded_channel::(); + let values = unfold( + val_rx, + |mut rx| async move { rx.recv().await.map(|v| (v, rx)) }, + ); + + let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); + let cancel = Arc::new(AtomicBool::new(false)); + + let mut subs: FuturesUnordered = FuturesUnordered::new(); + subs.push(Box::pin(run_subscription( + values, + "sub-1".to_string(), + event_tx, + cancel, + ))); + + // Drive the set until the subscription is observably alive. + val_tx.send(serde_json::json!({"v": 1})).unwrap(); + tokio::select! { + event = event_rx.recv() => { + assert_eq!(event.unwrap().subscription_id, "sub-1"); + } + _ = subs.next() => panic!("subscription future ended unexpectedly"), + } + + // Connection going away: drop the whole set. This must drop the + // boxed future, which drops the stream, which drops `val_rx`. + drop(subs); + + assert!( + val_tx.send(serde_json::json!({"v": 2})).is_err(), + "after dropping the FuturesUnordered, the inner stream's \ + receiver must be dropped — `send` is the observable proxy" + ); } } diff --git a/aimdb-core/src/remote/mod.rs b/aimdb-core/src/remote/mod.rs index 8053ff04..b1ad5bc9 100644 --- a/aimdb-core/src/remote/mod.rs +++ b/aimdb-core/src/remote/mod.rs @@ -28,7 +28,7 @@ //! .socket_path("/var/run/aimdb/aimdb.sock") //! .security_policy(SecurityPolicy::ReadOnly) //! .max_connections(16) -//! .subscription_queue_size(100) +//! .max_subs_per_connection(32) //! ) //! .build()?; //! 
``` diff --git a/aimdb-core/src/remote/stream.rs b/aimdb-core/src/remote/stream.rs index b582867b..1c9bf89e 100644 --- a/aimdb-core/src/remote/stream.rs +++ b/aimdb-core/src/remote/stream.rs @@ -46,16 +46,23 @@ where .ok_or(DbError::InvalidRecordId { id: id.raw() })?; let reader = record.subscribe_json()?; - Ok(unfold(reader, |mut reader| async move { + // Pair the reader with an owned copy of the record key so lag/error + // logs identify which record fell behind — the previous mpsc-based + // path carried this via `record_metadata.name`, and dropping it + // hides which subscription is misbehaving in mixed-record traces. + let state = (reader, record_key.to_string()); + + Ok(unfold(state, |(mut reader, _key)| async move { loop { match reader.recv_json().await { - Ok(value) => return Some((value, reader)), + Ok(value) => return Some((value, (reader, _key))), Err(DbError::BufferLagged { lag_count: _lag_count, .. }) => { #[cfg(feature = "tracing")] tracing::warn!( + record = %_key, "stream_record_updates: subscription lagged by {} messages", _lag_count ); @@ -64,7 +71,11 @@ where Err(DbError::BufferClosed { .. }) => return None, Err(_e) => { #[cfg(feature = "tracing")] - tracing::error!("stream_record_updates: terminating on error: {:?}", _e); + tracing::error!( + record = %_key, + "stream_record_updates: terminating on error: {:?}", + _e + ); return None; } } diff --git a/aimdb-core/src/remote/supervisor.rs b/aimdb-core/src/remote/supervisor.rs index bc615261..28b6004a 100644 --- a/aimdb-core/src/remote/supervisor.rs +++ b/aimdb-core/src/remote/supervisor.rs @@ -128,6 +128,15 @@ where // Refuse if we are already at the connection cap. // The accepted `UnixStream` is dropped, which closes // the socket; the client sees a closed connection. + // + // `connections.len()` is conservative: a connection + // future that has completed but not yet been yielded + // by `connections.next()` still counts. 
With + // `biased;` the drain arm only runs once `accept` + // returns Pending, so back-to-back accepts can see + // a transiently inflated count after a disconnect + // burst. Erring toward refusing one extra client + // is fine — the cap is a soft ceiling, not an SLA. if connections.len() >= config.max_connections { #[cfg(feature = "tracing")] tracing::warn!( diff --git a/aimdb-websocket-connector/src/client/connector.rs b/aimdb-websocket-connector/src/client/connector.rs index 6ecdd7a6..09eb2a82 100644 --- a/aimdb-websocket-connector/src/client/connector.rs +++ b/aimdb-websocket-connector/src/client/connector.rs @@ -212,16 +212,12 @@ impl WsClientConnectorImpl { // Reconnect watcher. if auto_reconnect { let watcher_state = state_for_future.clone(); - let watcher_router = router_for_future.clone(); - let watcher_ctx = runtime_ctx_for_future.clone(); let watcher_tx = new_loops_tx.clone(); tasks.push(Box::pin(Self::run_reconnect_watcher( watcher_state, reconnect_url, reconnect_topics, - watcher_router, max_reconnect_attempts, - Some(watcher_ctx), watcher_tx, ))); } @@ -544,18 +540,11 @@ impl WsClientConnectorImpl { /// connector future, which translates it into a fresh write- and /// read-loop future pushed onto the connector's `FuturesUnordered`. /// The watcher itself never calls `tokio::spawn`. - /// - /// `_router` and `_runtime_ctx` are retained in the signature for - /// symmetry with the outer connector future — they are not used by - /// the watcher itself in the new design (the outer future supplies - /// its own clones to the spawned read-loop future). 
async fn run_reconnect_watcher( state: Arc>, url: String, subscribe_topics: Vec, - _router: Arc, max_attempts: usize, - _runtime_ctx: Option>, new_loops_tx: mpsc::UnboundedSender, ) { let backoff = [500u64, 1_000, 2_000, 4_000, 8_000]; diff --git a/docs/design/030-M13-aimx-remote-spawn-free.md b/docs/design/030-M13-aimx-remote-spawn-free.md index 686b9116..4a4a2db6 100644 --- a/docs/design/030-M13-aimx-remote-spawn-free.md +++ b/docs/design/030-M13-aimx-remote-spawn-free.md @@ -21,6 +21,15 @@ 16) and added a new `max_subs_per_connection: usize` (default 32). Non-breaking; `max_connections` is now actually enforced by the supervisor — previously the field existed but went unread. +- **`subscription_queue_size` removed (breaking).** The draft kept the + field "for per-sub channel depth," but the per-sub mpsc that it bounded + is gone — subscriptions are now a single future in a + `FuturesUnordered`. Carrying a config value that no longer maps to + anything was actively misleading, including in the `Welcome.max_subscriptions` + field and the `record.subscribe` response's `queue_size` echo. Removed + the field, the builder method, the `Welcome` value (now reports + `max_subs_per_connection`), and the `queue_size` key from the + `record.subscribe` result object. - **`async_stream` vs `stream::unfold`.** Picked `futures_util::stream::unfold` to avoid the new proc-macro dependency. Behaviour is identical. - **WS client `WsClientConnectorImpl::connect` return shape.** Implemented @@ -474,33 +483,34 @@ specific subscription inside the set; if and when AimX un-gates from ## Bounds and Backpressure -[`AimxConfig`](../../aimdb-core/src/remote/config.rs) gains two optional -caps: +[`AimxConfig`](../../aimdb-core/src/remote/config.rs) carries two caps: ```rust pub struct AimxConfig { // ... existing fields ... - /// Maximum concurrent client connections. `None` = unbounded (current behaviour). - pub max_connections: Option<usize>, + /// Maximum concurrent client connections (default 16).
+ pub max_connections: usize, - /// Maximum subscriptions per connection. Today: implicit via - /// `subscription_queue_size`; this becomes an explicit cap. - pub max_subs_per_connection: Option<usize>, + /// Maximum subscriptions per connection (default 32). + pub max_subs_per_connection: usize, } ``` -`subscription_queue_size` stays (it controls per-sub channel depth, not -sub count). When `max_connections` is `Some(n)` and reached, the -supervisor refuses new connects by dropping the accepted `UnixStream` -without handshake — that maps to "connection refused" on the client -side. When `max_subs_per_connection` is `Some(n)` and reached, the -handler returns the existing `too_many_subscriptions` response code. - -Defaults: both `None` to preserve current behaviour for in-flight -deployments. The acceptance criterion in #114 is "pick one and document"; -this design picks **soft caps as opt-in**, documented in CHANGELOG and -the AimX guide. +Both are `usize` rather than `Option<usize>` — see the "Implementation +deviations" section at the top. `max_connections` was already in the +public builder API but went unread before this design; it is now +actually enforced. `max_subs_per_connection` is new. The previous +`subscription_queue_size` field has been **deleted** — it bounded a +per-sub mpsc that the new `FuturesUnordered` design eliminated, and +keeping a number that no longer mapped to anything was actively +misleading. + +When `max_connections` is reached, the supervisor refuses new +connects by dropping the accepted `UnixStream` without handshake — +that maps to "connection refused" on the client side. When +`max_subs_per_connection` is reached, the handler returns the existing +`too_many_subscriptions` response code.
--- diff --git a/examples/remote-access-demo/src/client.rs b/examples/remote-access-demo/src/client.rs index bea90eaa..8f7b4c5f 100644 --- a/examples/remote-access-demo/src/client.rs +++ b/examples/remote-access-demo/src/client.rs @@ -559,8 +559,7 @@ fn main() -> Result<(), Box> { id: 13, method: "record.subscribe".to_string(), params: Some(json!({ - "name": "server::Temperature", - "queue_size": 50 + "name": "server::Temperature" })), }; @@ -578,9 +577,7 @@ fn main() -> Result<(), Box> { Response::Success { id, result } => { println!("✅ Subscribed! (request_id: {})", id); let sub_id = result["subscription_id"].as_str().unwrap().to_string(); - let queue_size = result["queue_size"].as_u64().unwrap(); println!(" Subscription ID: {}", sub_id); - println!(" Queue Size: {}", queue_size); println!(); println!("📊 Receiving live temperature updates (will receive 5 events)..."); println!(); diff --git a/examples/remote-access-demo/src/server.rs b/examples/remote-access-demo/src/server.rs index b57f2718..d5e3d335 100644 --- a/examples/remote-access-demo/src/server.rs +++ b/examples/remote-access-demo/src/server.rs @@ -84,7 +84,7 @@ async fn main() -> Result<(), Box> { .socket_path(socket_path) .security_policy(security_policy) .max_connections(10) - .subscription_queue_size(100); + .max_subs_per_connection(32); info!("📡 Remote access will be available at: {}", socket_path); info!("🔒 Security policy: ReadWrite"); From 0c7c907a7d5d6e0fd9b13de8cd0f7973d0e6f0ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Wed, 27 May 2026 18:59:18 +0000 Subject: [PATCH 5/9] refactor: replace AtomicBool with Notify for immediate subscription cancellation --- aimdb-core/src/remote/handler.rs | 117 +++++++++++++++++++------------ 1 file changed, 71 insertions(+), 46 deletions(-) diff --git a/aimdb-core/src/remote/handler.rs b/aimdb-core/src/remote/handler.rs index 0ad1919b..5e3347a5 100644 --- a/aimdb-core/src/remote/handler.rs +++ b/aimdb-core/src/remote/handler.rs @@ 
-22,8 +22,6 @@ use crate::{AimDb, DbError, DbResult}; #[cfg(feature = "std")] use std::collections::HashMap; #[cfg(feature = "std")] -use std::sync::atomic::{AtomicBool, Ordering}; -#[cfg(feature = "std")] use std::sync::Arc; #[cfg(feature = "std")] @@ -37,7 +35,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; #[cfg(feature = "std")] use tokio::net::UnixStream; #[cfg(feature = "std")] -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Notify}; #[cfg(feature = "std")] use crate::builder::BoxFuture; @@ -46,15 +44,17 @@ use crate::builder::BoxFuture; /// /// Tracks all active subscriptions for a single client connection. Each /// subscription is identified by its `subscription_id` and carries an -/// `Arc<AtomicBool>` that the `record.unsubscribe` handler flips to signal -/// the per-subscription future to exit on its next poll. Connection -/// teardown does not need to flip the flag — dropping the per-connection -/// `FuturesUnordered` (in [`handle_connection`]) drops every subscription -/// future, which is the primary cancellation path. +/// `Arc<Notify>` that `record.unsubscribe` fires to wake the +/// per-subscription future, which then exits on its next poll — +/// cancellation is immediate (no need to wait for the next stream +/// event). Connection teardown does not need to fire the notify — +/// dropping the per-connection `FuturesUnordered` (in +/// [`handle_connection`]) drops every subscription future, which is the +/// primary cancellation path. #[cfg(feature = "std")] struct ConnectionState { - /// Active subscriptions by subscription_id → cancel flag. - subscriptions: HashMap<String, Arc<AtomicBool>>, + /// Active subscriptions by subscription_id → cancel notify.
+ subscriptions: HashMap<String, Arc<Notify>>, /// Counter for generating unique subscription IDs next_subscription_id: u64, @@ -1039,13 +1039,14 @@ where } }; - // Generate unique subscription ID and cancel flag + // Generate unique subscription ID and cancel notify let subscription_id = conn_state.generate_subscription_id(); - let cancel = Arc::new(AtomicBool::new(false)); + let cancel = Arc::new(Notify::new()); // Push the subscription future onto the connection's set. The future - // is dropped — and therefore cancelled — when either the cancel flag - // is flipped (Unsubscribe) or the outer connection loop exits. + // exits — and is therefore dropped — when either the cancel notify + // fires (Unsubscribe, immediate) or the outer connection loop exits + // (drops `subs`, which drops the future). let event_tx = conn_state.event_tx.clone(); let sub_id_for_future = subscription_id.clone(); let cancel_for_future = cancel.clone(); @@ -1081,12 +1082,14 @@ where /// numbers and RFC-style "secs.nanos" timestamps. /// /// Exits when any of: -/// - the `cancel` flag is set (by `record.unsubscribe`) — checked after -/// each stream poll, so cancellation has up to a one-event delay; +/// - the `cancel` notify fires (by `record.unsubscribe`) — wakes the +/// future immediately; the in-flight `stream.next()` is cancelled by +/// `select!` losing its arm, which drops the underlying +/// `JsonBufferReader` even if the record is currently quiet; /// - the upstream stream ends (e.g. `BufferClosed`); /// - the event funnel is closed (connection going down). /// -/// Connection-close cancellation does not rely on the flag — the +/// Connection-close cancellation does not rely on the notify — the /// connection's `FuturesUnordered` is the sole owner of this future and /// dropping the set drops the future.
#[cfg(feature = "std")] @@ -1094,7 +1097,7 @@ async fn run_subscription( stream: S, subscription_id: String, event_tx: mpsc::UnboundedSender, - cancel: Arc, + cancel: Arc, ) where S: Stream + Send + 'static, { @@ -1107,12 +1110,29 @@ async fn run_subscription( subscription_id ); - while let Some(json_value) = stream.next().await { - if cancel.load(Ordering::Relaxed) { - #[cfg(feature = "tracing")] - tracing::debug!("Subscription {} cancelled via Unsubscribe", subscription_id); - break; - } + loop { + // `biased;` polls the cancel arm first so a notify issued while + // a value is also ready terminates the subscription rather than + // emitting one more event. `Notify` stores a permit if no + // waiter is parked, so a notify-before-first-poll is honoured + // on the first iteration. + let json_value = tokio::select! { + biased; + + _ = cancel.notified() => { + #[cfg(feature = "tracing")] + tracing::debug!( + "Subscription {} cancelled via Unsubscribe", + subscription_id + ); + break; + } + + maybe_value = stream.next() => match maybe_value { + Some(v) => v, + None => break, + }, + }; // Generate timestamp in "secs.nanosecs" format let duration = std::time::SystemTime::now() @@ -1185,12 +1205,15 @@ async fn handle_record_unsubscribe( #[cfg(feature = "tracing")] tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id); - // Look up and remove the subscription. Setting the cancel flag tells - // the per-subscription future to exit on its next poll; the future - // itself is reaped from `subs` by the connection's outer drain loop. + // Look up and remove the subscription. Firing the notify wakes the + // per-subscription future immediately — the `biased;` cancel arm in + // `run_subscription`'s select! returns next, the stream-poll future + // is dropped (releasing the underlying `JsonBufferReader` even if + // the record is quiet), and the subscription future exits. It is + // then reaped from `subs` by the connection's outer drain loop. 
match conn_state.subscriptions.remove(&subscription_id) { Some(cancel) => { - cancel.store(true, Ordering::Relaxed); + cancel.notify_one(); #[cfg(feature = "tracing")] tracing::debug!("Cancelled subscription {}", subscription_id); @@ -1604,7 +1627,7 @@ mod tests { }; let (event_tx, _event_rx) = mpsc::unbounded_channel::(); - let cancel = Arc::new(AtomicBool::new(false)); + let cancel = Arc::new(Notify::new()); let mut subs: FuturesUnordered = FuturesUnordered::new(); subs.push(Box::pin(run_subscription( @@ -1638,13 +1661,13 @@ mod tests { } #[tokio::test] - async fn unsubscribe_flag_terminates_subscription_on_next_event() { + async fn unsubscribe_terminates_subscription_immediately() { use futures_util::stream::unfold; - // Channel-backed stream so we control exactly when the future - // advances past each `stream.next().await`. Without this, a - // synchronous source like `stream::iter` would race past the - // flag-flip before the test can observe it. + // Channel-backed stream so the future is parked on `stream.next()` + // with no value pending — the whole point of switching from + // `AtomicBool` to `Notify` is that we no longer need a second + // value to wake the future. The notify itself must wake it. let (val_tx, val_rx) = mpsc::unbounded_channel::(); let values = unfold( val_rx, @@ -1652,7 +1675,7 @@ mod tests { ); let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); - let cancel = Arc::new(AtomicBool::new(false)); + let cancel = Arc::new(Notify::new()); let cancel_for_future = cancel.clone(); let handle = tokio::spawn(run_subscription( @@ -1667,19 +1690,21 @@ mod tests { let event = event_rx.recv().await.expect("expected one event"); assert_eq!(event.subscription_id, "sub-1"); - // Flip the cancel flag, then feed a second value to wake the - // stream. The future must observe the flag *after* the next - // `stream.next().await` returns and exit before sending the - // second Event. 
- cancel.store(true, Ordering::Relaxed); - val_tx.send(serde_json::json!({"v": 2})).unwrap(); + // Fire the cancel notify WITHOUT feeding any further values. + // The future is parked on `stream.next()` over an empty + // channel; the notify must wake it via the `biased;` cancel + // arm of `select!`, even though the underlying stream is quiet. + cancel.notify_one(); - // The future must complete on its own — no abort. - handle + // The future must complete promptly on its own — no abort, + // no further values needed. Timeout caps the test in case + // immediate cancellation is silently broken. + tokio::time::timeout(std::time::Duration::from_secs(1), handle) .await - .expect("subscription future should exit cleanly after cancel flag"); + .expect("subscription future should exit promptly after notify_one()") + .expect("future panicked"); - // And the second value must not have produced an Event. + // And no further event should have been produced. assert!( event_rx.try_recv().is_err(), "no further events should be sent after cancel" @@ -1702,7 +1727,7 @@ mod tests { ); let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); - let cancel = Arc::new(AtomicBool::new(false)); + let cancel = Arc::new(Notify::new()); let mut subs: FuturesUnordered = FuturesUnordered::new(); subs.push(Box::pin(run_subscription( From 860e318811db69df4565d6114e78a2f258f7e897 Mon Sep 17 00:00:00 2001 From: "sounds.like.lx" <147444674+lxsaah@users.noreply.github.com> Date: Wed, 27 May 2026 21:17:58 +0200 Subject: [PATCH 6/9] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- aimdb-core/src/remote/handler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aimdb-core/src/remote/handler.rs b/aimdb-core/src/remote/handler.rs index 5e3347a5..b9e3ed31 100644 --- a/aimdb-core/src/remote/handler.rs +++ b/aimdb-core/src/remote/handler.rs @@ -157,8 +157,8 @@ where // Per-connection 
FuturesUnordered of subscription futures. Each // `record.subscribe` pushes one future here; cancellation flows - // through `Arc<AtomicBool>` (Unsubscribe) or by dropping `subs` when - // the connection ends. + // through `Arc<Notify>` (Unsubscribe) or by dropping `subs` when the + // connection ends. let mut subs: FuturesUnordered = FuturesUnordered::new(); // Main loop: interleave reading requests, writing events, and draining From 738870e4c6a32ccceed490a02d3690f272321185 Mon Sep 17 00:00:00 2001 From: "sounds.like.lx" <147444674+lxsaah@users.noreply.github.com> Date: Wed, 27 May 2026 21:18:29 +0200 Subject: [PATCH 7/9] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- aimdb-core/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index 7e9927fa..b1f1d19c 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **AimX remote-access path is now spawn-free (Issue #114, Design 030).** Every remaining `tokio::spawn` in `aimdb-core/src/remote/` was removed; the supervisor's accept loop and each connection handler now own their own `FuturesUnordered` driven by `tokio::select! { biased; }`. Cancellation collapsed to one mechanism — dropping the future. - New `aimdb-core/src/remote/stream.rs` exports a `pub(crate) stream_record_updates` helper that adapts a record's `JsonBufferReader` into a `Stream` via `futures_util::stream::unfold`. No task, no channel — drop the stream to cancel. - `AimDb::subscribe_record_updates` **deleted**. The method had no out-of-tree callers (the only caller was the AimX handler); replaced by `stream_record_updates` above. - - Per-subscription `oneshot::Sender<()>` cancel channels and the `SubscriptionHandle` struct **deleted**. 
`ConnectionState::subscriptions` is now `HashMap<String, Arc<AtomicBool>>`; `record.unsubscribe` flips the flag, which the per-sub future observes after its next stream poll. + - Per-subscription `oneshot::Sender<()>` cancel channels and the `SubscriptionHandle` struct **deleted**. `ConnectionState::subscriptions` is now `HashMap<String, Arc<Notify>>`; `record.unsubscribe` calls `notify_one()`, waking the per-sub future immediately (even when parked on `stream.next()`). - The two-task chain per subscription (buffer-reader task + JSON-event forwarder task) **collapsed** into one `run_subscription` future per subscription, held in the connection's `FuturesUnordered`. ### Changed (breaking) From 57768cc1147b66ef4a2f015c61d17149d2705d6f Mon Sep 17 00:00:00 2001 From: "sounds.like.lx" <147444674+lxsaah@users.noreply.github.com> Date: Wed, 27 May 2026 21:19:06 +0200 Subject: [PATCH 8/9] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- docs/design/028-M13-remove-spawn-trait.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/028-M13-remove-spawn-trait.md b/docs/design/028-M13-remove-spawn-trait.md index d612c6e0..d84c2593 100644 --- a/docs/design/028-M13-remove-spawn-trait.md +++ b/docs/design/028-M13-remove-spawn-trait.md @@ -1086,7 +1086,7 @@ Originally deferred to a follow-up; landed via bridge-state `tokio::spawn` sites in `aimdb-core/src/remote/` were replaced with nested `FuturesUnordered`; `subscribe_record_updates` was deleted in favour of a `Stream`-returning helper; per-subscription -`oneshot` cancel channels were replaced with `Arc<AtomicBool>` flags. +`oneshot` cancel channels were replaced with `Arc<Notify>` notifies for immediate unsubscribe. 
### ~~WebSocket client reconnect spawn~~ (resolved by design 030) From a08b51b681aeaed4f208d37af0f7d63dd156f651 Mon Sep 17 00:00:00 2001 From: "sounds.like.lx" <147444674+lxsaah@users.noreply.github.com> Date: Wed, 27 May 2026 21:21:26 +0200 Subject: [PATCH 9/9] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- docs/design/030-M13-aimx-remote-spawn-free.md | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/docs/design/030-M13-aimx-remote-spawn-free.md b/docs/design/030-M13-aimx-remote-spawn-free.md index 4a4a2db6..390fe98f 100644 --- a/docs/design/030-M13-aimx-remote-spawn-free.md +++ b/docs/design/030-M13-aimx-remote-spawn-free.md @@ -462,22 +462,21 @@ publisher futures already collected today, all appended to | Event | Today | After | |---|---|---| | Connection closed by client | RX drops → buffer-reader exits → forwarder drains and exits | Outer handler loop breaks; per-conn `FuturesUnordered` dropped; all subs cancelled in one step | -| `record.unsubscribe` | `oneshot::Sender<()>::send(())` fires inner select-arm | `Arc<AtomicBool>::store(true)`; sub future exits at next `stream.next()` poll | +| `record.unsubscribe` | `oneshot::Sender<()>::send(())` fires inner select-arm | `Arc<Notify>::notify_one()` wakes the sub future immediately (even if parked on `stream.next()`) | | Supervisor drop (runner shutdown) | `tokio::spawn`-ed connections orphan; runtime waits for them | Outer `FuturesUnordered` dropped → all connection futures dropped → all sub futures dropped | | Buffer producer closed | `BufferClosed` error → reader exits → forwarder mpsc-EOFs → exits | Stream yields `None` → sub future exits | -**Unsubscribe delay.** The Arc<AtomicBool> approach yields a delay of up -to one poll cycle: an idle subscription does not see the flag flip until -the next buffered value arrives. For AimX semantics this is acceptable — -`record.unsubscribe` has never been a synchronous contract. 
Document it -in the AimX protocol docs. +**Unsubscribe timing.** Using `Notify` makes Unsubscribe effectively immediate: +the per-subscription future is woken even when it is parked on +`stream.next()`, so a quiet record does not delay cancellation. +This is a deliberate change from the earlier AtomicBool-based draft. -**Why not `tokio::sync::Notify` or oneshot?** Both are tokio-specific -and would re-introduce the dual-path cancellation that this design is -trying to delete. The whole point is one mechanism: drop the future. -`AtomicBool` is the minimum primitive that lets Unsubscribe target a -specific subscription inside the set; if and when AimX un-gates from -`std`, the path is already runtime-agnostic. +**Why `tokio::sync::Notify`?** It enables targeted Unsubscribe to wake a +quiet subscription immediately without additional tasks. Connection-close +cancellation still collapses to one mechanism: dropping the future. +The implementation uses `Arc<Notify>` as the unsubscribe primitive. If AimX +is later un-gated from `std` and moved to a non-Tokio runtime, this point +will need revisiting to keep the cancellation path runtime-agnostic. --- @@ -660,10 +659,10 @@ Beyond the cancellation-on-drop test in Step 7: ## Decisions -1. **Unsubscribe cancellation primitive** → **`Arc<AtomicBool>`.** - Honours #114's recommendation. Keeps the path runtime-agnostic for - the eventual `std`-gate lift. One-poll-cycle delay on idle - subscriptions is acceptable AimX semantics. +1. **Unsubscribe cancellation primitive** → **`Arc<Notify>`.** + Enables immediate unsubscribe even when a subscription is parked on + `stream.next()`. The outer `FuturesUnordered` drop remains the primary + cancellation mechanism on connection close. 2. **`stream_record_updates` visibility** → **`pub(crate)`.** The only caller is `handler.rs`. Promoting to `pub` would re-create @@ -684,7 +683,7 @@ Beyond the cancellation-on-drop test in Step 7: 6. 
**`SubscriptionHandle` retention** → **Delete.** `record_name` was only used in tracing; `cancel_tx` is gone. - Replacing with `Arc<AtomicBool>` directly in the HashMap value + Replacing with `Arc<Notify>` directly in the HashMap value is one fewer indirection. 7. **WS client reconnect coordination** → **mpsc from watcher to outer loop.**