[FEAT] Non-blocking fallible try_produce for bounded / non-overwriting buffers #116

@lxsaah

Background

After M14 (Design 029), Producer::produce is synchronous and infallible:

impl<T> Producer<T> {
    pub fn produce(&self, value: T);
}

That's correct for today's buffers (SpmcRing, SingleLatest, Mailbox):
they all overwrite on overflow, so there is no "full" condition for the caller
to observe, and an overwritten value is not something the caller could have
recovered anyway. Either the write lands, or a newer write from the same
producer overwrites it before a lagging consumer reads it. Both outcomes are
acceptable for the fire-and-forget telemetry workloads we target today.
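To make the "no observable full condition" point concrete, here is a minimal sketch of an overwrite-on-overflow ring. OverwriteRing and its try_recv are hypothetical stand-ins written for illustration, not AimDB's SpmcRing. The only point is that push has nothing to report, because overflow evicts the oldest unread value instead of rejecting the new one.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical overwrite-on-overflow ring, for illustration only
/// (not AimDB's `SpmcRing`).
pub struct OverwriteRing<T> {
    capacity: usize,
    slots: Mutex<VecDeque<T>>,
}

impl<T> OverwriteRing<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self {
            capacity,
            slots: Mutex::new(VecDeque::with_capacity(capacity)),
        }
    }

    /// Infallible push: when the ring is full, the oldest unread value is
    /// evicted to make room, so there is never a "full" result to return.
    pub fn push(&self, value: T) {
        let mut slots = self.slots.lock().unwrap();
        if slots.len() == self.capacity {
            slots.pop_front(); // a lagging consumer silently loses this value
        }
        slots.push_back(value);
    }

    /// Hypothetical non-blocking read of the oldest retained value.
    pub fn try_recv(&self) -> Option<T> {
        self.slots.lock().unwrap().pop_front()
    }
}
```

With capacity 2, pushing 1, 2 and 3 leaves only 2 and 3 in the ring: the consumer never sees 1, and the producer was never told.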

Problem

For some upcoming buffer types, overwrite is the wrong default:

  • Bounded, non-overwriting — e.g. a command queue where dropping the
    oldest command is unsafe. The producer needs to know push failed so it can
    retry, escalate, or back off.
  • Hot loops with explicit drop policy — telemetry producers that want
    "if the consumer is behind, drop this sample and log a counter" rather
    than the implicit silent-overwrite of SpmcRing.
  • Caller-side rate limiting — applications that want to react when
    pressure builds up (e.g. switch to a slower mode, raise an alert) instead
    of being shielded from overflow.

Producer::produce(value) swallows the value into the buffer unconditionally;
the caller has no way to recover the value or even know it didn't land cleanly.

Note on AimDB semantics: Full here reflects consumer lag on a bounded
buffer, not multi-producer contention. AimDB's single-writer-per-key invariant
is unchanged: only one Producer<T> writes a given record.
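Rust's standard library already draws the same line for bounded channels, which may help when reviewing the proposal: std::sync::mpsc::SyncSender::try_send returns TrySendError::Full(T) when the buffer is at capacity and TrySendError::Disconnected(T) when the receiver is gone, handing the value back in both cases. The sketch below uses it with a single producer and a consumer that has not drained yet, applying a "drop this sample and count it" policy. push_or_drop, Outcome and lagging_consumer_demo are illustrative names, not AimDB API; Disconnected plays the role the proposal gives to Closed.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// What happened to one sample under a hypothetical drop-and-count policy.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Sent,
    Dropped(u32), // consumer lagging: the channel handed the sample back
    Closed(u32),  // receiver gone: the sample is handed back as well
}

/// Non-blocking push; on `Full`, count the drop instead of waiting.
pub fn push_or_drop(tx: &SyncSender<u32>, sample: u32, dropped: &mut u64) -> Outcome {
    match tx.try_send(sample) {
        Ok(()) => Outcome::Sent,
        Err(TrySendError::Full(v)) => {
            *dropped += 1;
            Outcome::Dropped(v)
        }
        Err(TrySendError::Disconnected(v)) => Outcome::Closed(v),
    }
}

/// One producer, capacity 2, and a receiver that is alive but never reads.
pub fn lagging_consumer_demo() -> (Vec<Outcome>, u64) {
    let (tx, _rx) = sync_channel::<u32>(2); // `_rx` stays alive until return
    let mut dropped = 0;
    let outcomes: Vec<Outcome> = (0..4)
        .map(|sample| push_or_drop(&tx, sample, &mut dropped))
        .collect();
    (outcomes, dropped)
}
```

The first two samples fit; samples 2 and 3 come back as Dropped(2) and Dropped(3), and the counter ends at 2. The producer is the only writer throughout, so Full is purely a signal that the consumer is behind.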

Proposed API

Add a non-blocking, fallible variant on Producer<T> alongside today's
produce. The error must carry the value back so the caller doesn't lose it:

impl<T> Producer<T> {
    /// Sync, infallible. Today's API. Best for overwriting / fire-and-forget
    /// buffers (`SpmcRing`, `SingleLatest`, `Mailbox`).
    pub fn produce(&self, value: T);

    /// Non-blocking. Returns the value back if the buffer is full or closed,
    /// so the caller can decide what to do (retry, drop, escalate, log).
    /// Maps to `tokio::sync::mpsc::Sender::try_send` semantics.
    pub fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>>;
}

/// Non-blocking push error — carries the value back to the caller.
pub enum TryProduceError<T> {
    /// Buffer is at capacity and configured not to overwrite.
    Full(T),
    /// Buffer / record has been torn down (e.g. shutdown).
    Closed(T),
}

Example usage (illustrative)

// Bounded command queue: caller decides what to do on backpressure.
match producer.try_produce(cmd) {
    Ok(()) => {}
    Err(TryProduceError::Full(cmd))   => retry_queue.push(cmd),
    Err(TryProduceError::Closed(_))   => return Err(Shutdown),
}

The consumer side already covers this asymmetry via BufferReader::try_recv()
— no consumer-side change needed.
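The illustrative command-queue match can be fleshed out into something runnable. AimDB has no bounded, non-overwriting buffer yet (that is out of scope here), so this sketch models one directly: BoundedQueue, Shutdown and submit are hypothetical names, and try_produce lives on the queue itself rather than on Producer<T>. Only the TryProduceError<T> shape follows the proposal.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Same shape as the proposed error: both arms carry the value back.
#[derive(Debug, PartialEq, Eq)]
pub enum TryProduceError<T> {
    Full(T),
    Closed(T),
}

/// Hypothetical bounded, non-overwriting queue (no such AimDB buffer exists yet).
pub struct BoundedQueue<T> {
    capacity: usize,
    state: Mutex<State<T>>,
}

struct State<T> {
    items: VecDeque<T>,
    closed: bool,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            state: Mutex::new(State { items: VecDeque::new(), closed: false }),
        }
    }

    /// Non-blocking: rejects instead of evicting when at capacity.
    pub fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>> {
        let mut state = self.state.lock().unwrap();
        if state.closed {
            return Err(TryProduceError::Closed(value));
        }
        if state.items.len() >= self.capacity {
            return Err(TryProduceError::Full(value));
        }
        state.items.push_back(value);
        Ok(())
    }

    /// Terminal: every later `try_produce` returns `Closed`.
    pub fn close(&self) {
        self.state.lock().unwrap().closed = true;
    }
}

/// Hypothetical caller-side error for the shutdown path.
#[derive(Debug, PartialEq, Eq)]
pub struct Shutdown;

/// The illustrative `match` from the issue, made concrete.
pub fn submit<T>(
    queue: &BoundedQueue<T>,
    cmd: T,
    retry_queue: &mut VecDeque<T>,
) -> Result<(), Shutdown> {
    match queue.try_produce(cmd) {
        Ok(()) => Ok(()),
        Err(TryProduceError::Full(cmd)) => {
            retry_queue.push_back(cmd); // value came back intact; retry later
            Ok(())
        }
        Err(TryProduceError::Closed(_)) => Err(Shutdown),
    }
}
```

With capacity 1, a second submit is rejected as Full and the command lands in the retry queue intact; after close(), submit reports Shutdown. Keeping the closed flag and the items under one lock means no push can slip in after close() returns.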

WriteHandle trait extension

Extend the crate-private trait in aimdb-core/src/buffer/traits.rs with a
default-implemented non-blocking variant so existing buffer impls compile
unchanged:

pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    /// Default: delegate to `push` and return `Ok(())`. Overwriting buffers
    /// cannot fail, so the value is always accepted. Bounded / non-overwriting
    /// buffers override this to return `Full(value)` or `Closed(value)`.
    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.push(value);
        Ok(())
    }
}

This means SpmcRing / SingleLatest / Mailbox need no changes —
they get try_produce for free, always succeeding (which is the correct
semantics for overwrite-on-overflow).
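A compilable sketch of the default-method approach, assuming Producer<T> simply holds an Arc<dyn WriteHandle<T>> (the real field layout in typed_api.rs may differ). LatestOnly is a hypothetical stand-in for SingleLatest that implements push only, yet Producer::try_produce works on it and always returns Ok(()).

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq, Eq)]
pub enum TryProduceError<T> {
    Full(T),
    Closed(T),
}

/// The proposed trait: `try_push` has a default body, so impls that only
/// define `push` keep compiling unchanged.
pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.push(value); // overwriting buffers cannot reject
        Ok(())
    }
}

/// Hypothetical stand-in for `SingleLatest`: implements `push` only.
pub(crate) struct LatestOnly<T> {
    slot: Mutex<Option<T>>,
}

impl<T: Clone> LatestOnly<T> {
    pub(crate) fn new() -> Self {
        Self { slot: Mutex::new(None) }
    }

    pub(crate) fn latest(&self) -> Option<T> {
        self.slot.lock().unwrap().clone()
    }
}

impl<T: Clone + Send + 'static> WriteHandle<T> for LatestOnly<T> {
    fn push(&self, value: T) {
        *self.slot.lock().unwrap() = Some(value); // overwrite any unread value
    }
}

/// Assumed layout: the typed handle forwards to the erased write handle.
pub(crate) struct Producer<T: Clone + Send + 'static> {
    handle: Arc<dyn WriteHandle<T>>,
}

impl<T: Clone + Send + 'static> Producer<T> {
    pub(crate) fn new(handle: Arc<dyn WriteHandle<T>>) -> Self {
        Self { handle }
    }

    pub(crate) fn produce(&self, value: T) {
        self.handle.push(value);
    }

    pub(crate) fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.handle.try_push(value)
    }
}
```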

Decisions (final — please don't relitigate in the PR)

  1. Naming: try_produce (mirrors mpsc::Sender::try_send).
  2. Error type: a dedicated TryProduceError<T> enum, not (DbError, T). The
     Full vs Closed distinction is the whole point.
  3. Location: TryProduceError<T> lives in aimdb-core::buffer (next to the
     traits that produce it) and is re-exported from the crate root alongside
     the existing Producer re-export.
  4. No changes to ProducerTrait::produce_any or any connector-side code in
     this PR.
  5. No metrics wiring in this PR. Auto-incrementing dropped_count on Full is
     a clean follow-up; land the API first.

Doc-comment guidance

The PR should include short rustdoc on both produce and try_produce so
callers can pick the right one without reading the impl. Suggested phrasing:

  • produce: "Push a value. Infallible — overwrite-on-overflow buffers
    cannot reject. Use this for fire-and-forget telemetry."
  • try_produce: "Non-blocking push. Returns the value back via
    TryProduceError::Full if a bounded buffer is at capacity, or
    TryProduceError::Closed if the record is shutting down. Use when the
    caller has a meaningful response to backpressure."

Acceptance criteria

  • Producer::produce unchanged — no call-site breakage anywhere in the workspace.
  • Producer::try_produce exists with signature fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>>.
  • TryProduceError<T> exists in aimdb-core::buffer, distinguishes Full(T) (transient) from Closed(T) (terminal), and carries the value back in both arms.
  • WriteHandle::try_push is added with a default impl that delegates to push and returns Ok(()).
  • Existing buffers (SpmcRing, SingleLatest, Mailbox) compile and pass their existing tests with no changes to their files.
  • A new test WriteHandle impl in the existing #[cfg(test)] module in aimdb-core/src/buffer/traits.rs overrides try_push to return Full(value), and a test verifies the value round-trips intact through Producer::try_produce.
  • Both produce and try_produce carry rustdoc explaining when to pick which (see "Doc-comment guidance" above).
  • Verification commands all pass:
    • cargo fmt --check
    • cargo clippy --all-targets --all-features -- -D warnings
    • cargo test -p aimdb-core
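One possible shape for the new test WriteHandle impl and its round-trip check, assuming Producer<T> wraps an Arc<dyn WriteHandle<T>> (an assumption about typed_api.rs internals). AlwaysFull is a hypothetical name. The trait and a trimmed Producer<T> are repeated so the block compiles on its own; in the real crate only AlwaysFull and the test would be new, added to the existing #[cfg(test)] module in traits.rs.

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq, Eq)]
pub enum TryProduceError<T> {
    Full(T),
    Closed(T),
}

pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.push(value);
        Ok(())
    }
}

/// Assumed wiring: `Producer<T>` forwards to an erased write handle.
pub(crate) struct Producer<T: Clone + Send + 'static> {
    handle: Arc<dyn WriteHandle<T>>,
}

impl<T: Clone + Send + 'static> Producer<T> {
    pub(crate) fn new(handle: Arc<dyn WriteHandle<T>>) -> Self {
        Self { handle }
    }

    pub(crate) fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.handle.try_push(value)
    }
}

/// Hypothetical test double: a bounded buffer that is permanently at capacity.
pub(crate) struct AlwaysFull;

impl<T: Clone + Send + 'static> WriteHandle<T> for AlwaysFull {
    fn push(&self, _value: T) {
        // `produce` is not exercised in this test, so `push` can be a no-op.
    }

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        Err(TryProduceError::Full(value)) // hand the value straight back
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn try_produce_round_trips_value_on_full() {
        let producer: Producer<String> = Producer::new(Arc::new(AlwaysFull));
        let payload = String::from("cmd-42");
        match producer.try_produce(payload.clone()) {
            Err(TryProduceError::Full(returned)) => assert_eq!(returned, payload),
            other => panic!("expected Full, got {other:?}"),
        }
    }
}
```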

Out of scope

  • Implementing an actual non-overwriting buffer (separate milestone, driven
    by a concrete need: command queue / bounded fan-in / explicit drop policy).
  • produce_async — covered by the sibling issue (backpressure-aware variant).
  • Consumer<T>::try_subscribe: BufferReader::try_recv already covers
    non-blocking consumption.
  • Any change to ProducerTrait::produce_any or connector ingress paths.
  • Wiring Full returns into BufferMetricsSnapshot::dropped_count.

Depends on / blocks

  • Depends on: Design 029 — Remove R from Producer<T> / Consumer<T>
    (M14, ✅ Implemented). The WriteHandle<T> indirection is what makes this
    cheap to add.
  • Blocks: any new buffer impl with bounded non-overwriting semantics,
    and any caller that wants to react to backpressure explicitly without
    going async.

References

  • docs/design/029-M14-remove-r-from-typed-handles.md: Decisions item 2
    set up this issue.
  • aimdb-core/src/buffer/traits.rs: WriteHandle<T> trait to extend
    (line 108). Test WriteHandle impls already live in the #[cfg(test)]
    module at the bottom of the file; add the new one there.
  • aimdb-core/src/typed_api.rs: Producer<T>::produce at line 139; extend here.
  • aimdb-core/src/buffer/traits.rs: BufferReader::try_recv is the symmetric
    consumer-side API; mirror its doc-comment style.
