Skip to content
2 changes: 1 addition & 1 deletion _external/embassy
Submodule embassy updated 55 files
+1 −0 embassy-executor/CHANGELOG.md
+5 −4 embassy-stm32-wpan/Cargo.toml
+52 −0 embassy-stm32-wpan/src/bluetooth/gap/aci_gap.rs
+21 −11 embassy-stm32-wpan/src/bluetooth/gap/advertiser.rs
+5 −0 embassy-stm32-wpan/src/bluetooth/gap/types.rs
+28 −15 embassy-stm32-wpan/src/bluetooth/gap_init.rs
+13 −5 embassy-stm32-wpan/src/bluetooth/mod.rs
+384 −3 embassy-stm32-wpan/src/bluetooth/security/mod.rs
+4 −5 embassy-stm32-wpan/src/net/control.rs
+6 −11 embassy-stm32-wpan/src/net/iface.rs
+6 −7 embassy-stm32-wpan/src/net/mod.rs
+6 −21 embassy-stm32-wpan/src/net/runner.rs
+13 −90 embassy-stm32-wpan/src/net/util.rs
+64 −29 embassy-stm32-wpan/src/wb/cmd.rs
+1 −0 embassy-stm32-wpan/src/wb/consts.rs
+54 −37 embassy-stm32-wpan/src/wb/evt.rs
+58 −27 embassy-stm32-wpan/src/wb/mod.rs
+36 −28 embassy-stm32-wpan/src/wb/sub/ble.rs
+17 −21 embassy-stm32-wpan/src/wb/sub/mac.rs
+2 −0 embassy-stm32-wpan/src/wb/sub/mod.rs
+17 −2 embassy-stm32-wpan/src/wb/sub/sys.rs
+212 −0 embassy-stm32-wpan/src/wb/sub/thread.rs
+20 −0 embassy-stm32-wpan/src/wb/tables.rs
+107 −31 embassy-stm32-wpan/src/wba/linklayer_plat.rs
+22 −22 embassy-stm32-wpan/src/wba/ll_sys/ll_sys_dp_slp.rs
+27 −9 embassy-stm32-wpan/src/wba/ll_sys/ll_sys_startup.rs
+5 −5 embassy-stm32-wpan/src/wba/ll_sys_if.rs
+2 −2 embassy-stm32-wpan/src/wba/mac_sys_if.rs
+1 −1 embassy-stm32-wpan/src/wba/mod.rs
+5 −0 embassy-stm32-wpan/src/wba/platform.rs
+1 −3 embassy-stm32/src/adc/v4.rs
+2 −2 embassy-stm32/src/can/bxcan/mod.rs
+2 −2 embassy-stm32/src/can/fdcan.rs
+13 −0 embassy-sync/CHANGELOG.md
+322 −8 embassy-sync/src/waitqueue/atomic_waker.rs
+28 −0 embassy-sync/src/waitqueue/critical_section_waker.rs
+3 −0 embassy-sync/src/waitqueue/mod.rs
+8 −3 examples/stm32wb/Cargo.toml
+2 −2 examples/stm32wb/src/bin/fus_update.rs
+8 −16 examples/stm32wb/src/bin/mac_ffd.rs
+7 −14 examples/stm32wb/src/bin/mac_rfd.rs
+1 −1 examples/stm32wb/src/bin/tl_mbox.rs
+2 −2 examples/stm32wb/src/bin/tl_mbox_mac.rs
+1 −1 examples/stm32wba/Cargo.toml
+8 −11 examples/stm32wba/src/bin/ble_secure.rs
+9 −0 examples/stm32wba6-lp/Cargo.toml
+180 −0 examples/stm32wba6-lp/src/bin/ble_advertiser_lp.rs
+1 −1 examples/stm32wba6/Cargo.toml
+361 −0 examples/stm32wba6/src/bin/ble_bonding.rs
+8 −11 examples/stm32wba6/src/bin/ble_secure.rs
+74 −0 examples/stm32wba6/verify_rpa.py
+11 −0 fmtall.ps1
+1 −1 tests/stm32/Cargo.toml
+1 −1 tests/stm32/src/bin/wpan_ble.rs
+4 −8 tests/stm32/src/bin/wpan_mac.rs
15 changes: 14 additions & 1 deletion aimdb-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Internal refactors

- **AimX remote-access path is now spawn-free (Issue #114, Design 030).** Every remaining `tokio::spawn` in `aimdb-core/src/remote/` was removed; the supervisor's accept loop and each connection handler now own their own `FuturesUnordered<BoxFuture>` driven by `tokio::select! { biased; }`. Cancellation collapsed to one mechanism — dropping the future.
- New `aimdb-core/src/remote/stream.rs` exports a `pub(crate) stream_record_updates` helper that adapts a record's `JsonBufferReader` into a `Stream<Item = serde_json::Value>` via `futures_util::stream::unfold`. No task, no channel — drop the stream to cancel.
- `AimDb::subscribe_record_updates` **deleted**. The method had no out-of-tree callers (the only caller was the AimX handler); replaced by `stream_record_updates` above.
- Per-subscription `oneshot::Sender<()>` cancel channels and the `SubscriptionHandle` struct **deleted**. `ConnectionState::subscriptions` is now `HashMap<String, Arc<tokio::sync::Notify>>`; `record.unsubscribe` calls `notify_one()`, waking the per-sub future immediately (even when parked on `stream.next()`).
- The two-task chain per subscription (buffer-reader task + JSON-event forwarder task) **collapsed** into one `run_subscription` future per subscription, held in the connection's `FuturesUnordered`.

### Changed (breaking)

- **`AimxConfig` lost `subscription_queue_size` (Issue #114, Design 030).** The field bounded a per-subscription mpsc channel that no longer exists — subscriptions are now one future in a `FuturesUnordered`. The builder method `.subscription_queue_size(n)` is removed; replace it with `.max_subs_per_connection(n)` if you were using the value as a soft cap on subscription count, or just delete the call.
- **AimX `Welcome.max_subscriptions` now reports the real per-connection cap.** Previously it returned `subscription_queue_size` (default 100) while the actual cap was implicit; it now returns `max_subs_per_connection` (default 32). Clients that displayed this value will see the change.
- **AimX `record.subscribe` response no longer carries `queue_size`.** Result object is now `{ "subscription_id": "..." }` — the previous `"queue_size"` reported a number that no longer corresponded to anything in the implementation.
- **`AimxConfig` gains `max_subs_per_connection: usize` (default 32)** — the dedicated per-connection subscription cap. The existing `max_connections: usize` (previously declared but unread) is now actually enforced by the supervisor; over-cap connections are refused by closing the accepted `UnixStream` pre-handshake.

- **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?` → `producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }` → `let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible.
- `AimDb::produce<T>(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail.
- `Database::produce` likewise sync.
Expand Down Expand Up @@ -36,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Join transforms now hoist their per-input forwarder construction to build time — `JoinPipeline::into_descriptor()` returns a `CollectedTransform { task_future, fanin_futures }` and the lazy `runtime.spawn(forwarder)` inside `run_join_transform` is gone.
- `ConnectorBuilder::build()` now returns `Vec<BoxFuture<'static, ()>>` instead of `Arc<dyn Connector>` (which `AimDbBuilder` already discarded).
- Unsafe `impl Send/Sync` blocks on `Producer<T, R>` / `Consumer<T, R>` deleted — they auto-derive now.
- On the AimX remote-access path, three `runtime.spawn(...)` call sites bridge to `tokio::spawn` directly under `#[cfg(feature = "std")]`. These (per-connection handler, per-subscription event stream, `subscribe_record_updates`) are addressed in the AimX portability follow-up.
- On the AimX remote-access path, three `runtime.spawn(...)` call sites were temporarily bridged to bare `tokio::spawn` under `#[cfg(feature = "std")]`. These have since been removed by the AimX spawn-free follow-up — see the "AimX remote-access path is now spawn-free" entry above.
- `on_start` no_std bifurcation collapsed: a single `StartFnType<R>` alias replaces the byte-identical std/no_std pair.

## [1.1.0] - 2026-05-22
Expand Down
149 changes: 0 additions & 149 deletions aimdb-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,155 +1327,6 @@ impl<R: aimdb_executor::RuntimeAdapter + 'static> AimDb<R> {
self.inner.set_record_from_json(record_name, json_value)
}

/// Subscribe to record updates as JSON stream (std only)
///
/// Creates a subscription to a record's buffer and forwards updates as JSON
/// to a bounded channel. This is used internally by the remote access protocol
/// for implementing `record.subscribe`.
///
/// # Architecture
///
/// Spawns a consumer task that:
/// 1. Subscribes to the record's buffer using the existing buffer API
/// 2. Reads values as they arrive
/// 3. Serializes each value to JSON
/// 4. Sends JSON values to a bounded channel (with backpressure handling)
/// 5. Terminates when either:
/// - The cancel signal is received (unsubscribe)
/// - The channel receiver is dropped (client disconnected)
///
/// # Arguments
/// * `record_key` - Key of the record to subscribe to
/// * `queue_size` - Size of the bounded channel for this subscription
///
/// # Returns
/// `Ok((receiver, cancel_tx))` where:
/// - `receiver`: Bounded channel receiver for JSON values
/// - `cancel_tx`: One-shot sender to cancel the subscription
///
/// `Err` if:
/// - Record not found for the given key
/// - Record not configured with `.with_remote_access()`
/// - Failed to subscribe to buffer
///
/// # Example (internal use)
///
/// ```rust,ignore
/// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
///
/// // Read events
/// while let Some(json_value) = rx.recv().await {
/// // Forward to client...
/// }
///
/// // Cancel subscription
/// let _ = cancel_tx.send(());
/// ```
#[cfg(feature = "std")]
#[allow(unused_variables)] // Variables used only in tracing feature
pub fn subscribe_record_updates(
&self,
record_key: &str,
queue_size: usize,
) -> DbResult<(
tokio::sync::mpsc::Receiver<serde_json::Value>,
tokio::sync::oneshot::Sender<()>,
)> {
use tokio::sync::{mpsc, oneshot};

// Find the record by key
let id = self
.inner
.resolve_str(record_key)
.ok_or_else(|| DbError::RecordKeyNotFound {
key: record_key.to_string(),
})?;

let record = self
.inner
.storage(id)
.ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;

// Subscribe to the record's buffer as JSON stream
// This will fail if record not configured with .with_remote_access()
let mut json_reader = record.subscribe_json()?;

// Create channels for the subscription
let (value_tx, value_rx) = mpsc::channel(queue_size);
let (cancel_tx, mut cancel_rx) = oneshot::channel();

// Get metadata for logging
let type_id = self.inner.types[id.index()];
let key = self.inner.keys[id.index()];
let record_metadata = record.collect_metadata(type_id, key, id);

// Bridge state (design 028 §"Remote supervisor"): drop into `tokio::spawn`
// directly. The nested-`FuturesUnordered` rewrite is the AimX follow-up.
tokio::spawn(async move {
#[cfg(feature = "tracing")]
tracing::debug!(
"Subscription consumer task started for {}",
record_metadata.name
);

// Main event loop: read from buffer and forward to channel
loop {
tokio::select! {
// Handle cancellation signal
_ = &mut cancel_rx => {
#[cfg(feature = "tracing")]
tracing::debug!("Subscription cancelled");
break;
}
// Read next JSON value from buffer
result = json_reader.recv_json() => {
match result {
Ok(json_val) => {
// Send JSON value to subscription channel
if value_tx.send(json_val).await.is_err() {
#[cfg(feature = "tracing")]
tracing::debug!("Subscription receiver dropped");
break;
}
}
Err(DbError::BufferLagged { lag_count, .. }) => {
// Consumer fell behind - log warning but continue
#[cfg(feature = "tracing")]
tracing::warn!(
"Subscription for {} lagged by {} messages",
record_metadata.name,
lag_count
);
// Continue reading - next recv will get latest
}
Err(DbError::BufferClosed { .. }) => {
// Buffer closed (shutdown) - exit gracefully
#[cfg(feature = "tracing")]
tracing::debug!("Buffer closed for {}", record_metadata.name);
break;
}
Err(e) => {
// Other error (shouldn't happen in practice)
#[cfg(feature = "tracing")]
tracing::error!(
"Subscription error for {}: {:?}",
record_metadata.name,
e
);
break;
}
}
}
}
}

#[cfg(feature = "tracing")]
tracing::debug!("Subscription consumer task terminated");
});

Ok((value_rx, cancel_tx))
}

/// Collects inbound connector routes for automatic router construction (std only)
///
/// Iterates all records, filters their inbound_connectors by scheme,
Expand Down
30 changes: 18 additions & 12 deletions aimdb-core/src/remote/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::record_id::StringKey;
/// Configuration for AimX remote access
///
/// Defines how the remote access layer behaves, including socket path,
/// security policy, connection limits, and subscription queue sizes.
/// security policy and connection / subscription limits.
#[derive(Debug, Clone)]
pub struct AimxConfig {
/// Path to Unix domain socket
Expand All @@ -16,11 +16,17 @@ pub struct AimxConfig {
/// Security policy (read-only or read-write)
pub security_policy: SecurityPolicy,

/// Maximum number of concurrent connections
/// Maximum number of concurrent client connections accepted by the
/// supervisor. When the in-flight connection count reaches this
/// value, newly-accepted Unix sockets are closed immediately (the
/// client sees a closed socket on connect).
pub max_connections: usize,

/// Subscription queue size per client per subscription
pub subscription_queue_size: usize,
/// Maximum number of concurrent subscriptions allowed per connection.
/// Once a connection holds this many subscriptions, further
/// `record.subscribe` calls receive a `too_many_subscriptions` error
/// until one is released via `record.unsubscribe`.
pub max_subs_per_connection: usize,

/// Optional authentication token
pub auth_token: Option<String>,
Expand All @@ -37,15 +43,15 @@ impl AimxConfig {
/// - Socket path: `/tmp/aimdb.sock`
/// - Security policy: Read-only
/// - Max connections: 16
/// - Subscription queue size: 100
/// - Max subscriptions per connection: 32
/// - No auth token
/// - Socket permissions: 0o600 (owner-only)
pub fn uds_default() -> Self {
Self {
socket_path: PathBuf::from("/tmp/aimdb.sock"),
security_policy: SecurityPolicy::ReadOnly,
max_connections: 16,
subscription_queue_size: 100,
max_subs_per_connection: 32,
auth_token: None,
socket_permissions: Some(0o600),
}
Expand All @@ -69,9 +75,9 @@ impl AimxConfig {
self
}

/// Sets the subscription queue size per client
pub fn subscription_queue_size(mut self, size: usize) -> Self {
self.subscription_queue_size = size;
/// Sets the maximum number of concurrent subscriptions per connection
pub fn max_subs_per_connection(mut self, max: usize) -> Self {
self.max_subs_per_connection = max;
self
}

Expand Down Expand Up @@ -206,7 +212,7 @@ mod tests {
let config = AimxConfig::uds_default();
assert_eq!(config.socket_path, PathBuf::from("/tmp/aimdb.sock"));
assert_eq!(config.max_connections, 16);
assert_eq!(config.subscription_queue_size, 100);
assert_eq!(config.max_subs_per_connection, 32);
assert!(matches!(config.security_policy, SecurityPolicy::ReadOnly));
assert!(config.auth_token.is_none());
}
Expand All @@ -217,13 +223,13 @@ mod tests {
let config = AimxConfig::uds_default()
.socket_path("/var/run/aimdb.sock")
.max_connections(32)
.subscription_queue_size(200)
.max_subs_per_connection(8)
.auth_token("secret-token")
.socket_permissions(0o660);

assert_eq!(config.socket_path, PathBuf::from("/var/run/aimdb.sock"));
assert_eq!(config.max_connections, 32);
assert_eq!(config.subscription_queue_size, 200);
assert_eq!(config.max_subs_per_connection, 8);
assert_eq!(config.auth_token, Some("secret-token".to_string()));
assert_eq!(config.socket_permissions, Some(0o660));
}
Expand Down
Loading