Skip to content

Comments

[Feature] Fallbacks for requests and events handling#64

Merged
vadikko2 merged 3 commits intomasterfrom
feature/add-fallbacks
Feb 21, 2026
Merged

[Feature] Fallbacks for requests and events handling#64
vadikko2 merged 3 commits intomasterfrom
feature/add-fallbacks

Conversation

@vadikko2
Copy link
Collaborator

@vadikko2 vadikko2 commented Feb 21, 2026

✨ Fallbacks for requests and events handling

TL;DR — Request and event handler fallbacks with optional circuit breaker; unified ICircuitBreaker protocol; new examples and tests.

This release adds fallback support for request handlers and event handlers, with optional circuit breaker integration. You can now define primary and fallback handlers for commands, queries, streaming handlers, and domain events, and optionally protect them with a shared circuit breaker protocol.


🆕 What's new

📨 Request handler fallbacks

  • RequestHandlerFallback — wrap a primary and fallback request handler so that when the primary fails (or the circuit is open), the fallback is used.
  • Works with both sync RequestHandler[Req, Res] and streaming StreamingRequestHandler[Req, Res].
  • Optional failure_exceptions to trigger fallback only for specific exception types (e.g. ConnectionError, TimeoutError).
  • Optional circuit breaker so that after N failures the primary is skipped and the fallback runs directly.

Example:

from cqrs import RequestHandlerFallback
from cqrs.adapters.circuit_breaker import AioBreakerAdapter

request_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60)
request_map.bind(
    GetOrderCommand,
    RequestHandlerFallback(
        GetOrderHandler,
        GetOrderFallbackHandler,
        failure_exceptions=(ConnectionError, TimeoutError),
        circuit_breaker=request_cb,
    ),
)

📢 Event handler fallbacks

  • EventHandlerFallback — wrap a primary and fallback event handler so that when the primary fails (or the circuit is open), the fallback is used.
  • Same options: failure_exceptions and optional circuit breaker.
  • Supported in both event dispatcher and event emitter (handlers registered in EventMap).

Example:

from cqrs import EventHandlerFallback

event_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60)
event_map.bind(
    OrderCreatedEvent,
    EventHandlerFallback(
        SendEmailHandler,
        SendEmailFallbackHandler,
        circuit_breaker=event_cb,
    ),
)

🔌 Unified circuit breaker protocol

  • ICircuitBreaker – a single protocol used by:
    • Saga step fallbacks (existing)
    • Request handler fallbacks (new)
    • Event handler fallbacks (new)
  • AioBreakerAdapter now implements ICircuitBreaker (and still ISagaStepCircuitBreaker for backward compatibility).
  • One adapter instance per “domain” (e.g. one for requests, one for events) is enough; each handler/step is namespaced by type.

📡 Streaming handlers

  • RequestHandlerFallback is supported for streaming handlers: when the primary streaming handler fails, the fallback stream is used from the start (no resume from the same stream).

🛠️ Helpers

  • get_generic_args_for_origin() in cqrs.generic_utils – used to validate that primary and fallback handlers handle the same request/event and (for requests) the same response type.
  • should_use_fallback() in cqrs.circuit_breaker – central logic to decide whether to run the fallback after a primary failure (circuit open or exception in failure_exceptions, or any exception if failure_exceptions is empty).

📚 Documentation and examples

  • Examples added:
    • examples/request_fallback.py – request handler fallback with optional circuit breaker
    • examples/cor_request_fallback.py – CoR (chain of responsibility) with fallback
    • examples/event_fallback.py – event handler fallback
    • examples/streaming_handler_fallback.py – streaming request handler fallback
  • Unit tests: test_request_fallback.py, test_event_fallback.py; integration tests for the circuit breaker adapter updated.

📋 API summary

Item Description
cqrs.RequestHandlerFallback Fallback wrapper for request/streaming handlers (primary, fallback, optional failure_exceptions, optional circuit_breaker).
cqrs.EventHandlerFallback Fallback wrapper for event handlers (same options).
cqrs.ICircuitBreaker Protocol: call(identifier, func, *args, **kwargs) and is_circuit_breaker_error(exc).
cqrs.circuit_breaker.should_use_fallback Helper to decide if fallback should run after primary error.
cqrs.generic_utils.get_generic_args_for_origin Extract generic type args from handler classes (for validation).

✅ Compatibility

  • Backward compatible: Existing saga fallbacks and AioBreakerAdapter usage continue to work. AioBreakerAdapter now implements ICircuitBreaker in addition to ISagaStepCircuitBreaker.
  • Python: 3.10+
  • Optional: aiobreaker for circuit breaker support (pip install python-cqrs[aiobreaker]).

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 21, 2026

📝 Walkthrough

Walkthrough

Adds a unified ICircuitBreaker protocol and should_use_fallback helper; introduces RequestHandlerFallback and EventHandlerFallback types; updates dispatchers, emitter, and streaming to run primary+fallback (with optional circuit breakers and failure_exceptions); adds examples and tests; bumps version to 4.10.0.

Changes

Cohort / File(s) Summary
Circuit breaker core
src/cqrs/circuit_breaker.py
New ICircuitBreaker Protocol and should_use_fallback helper to standardize circuit-breaker calls and fallback decision.
Circuit breaker adapter
src/cqrs/adapters/circuit_breaker.py, tests/integration/test_pybreaker_adapter.py
AioBreakerAdapter now implements ICircuitBreaker and uses identifier-based namespaces; tests updated call sites to use identifier.
Request fallback types & maps
src/cqrs/requests/fallback.py, src/cqrs/requests/map.py, src/cqrs/requests/request.py, src/cqrs/requests/request_handler.py, src/cqrs/requests/cor_request_handler.py
New RequestHandlerFallback dataclass; handler type alias expanded to accept fallbacks and streaming parity enforced; ReqT/ResT TypeVars moved to requests.request and imports adjusted.
Event fallback types & maps
src/cqrs/events/fallback.py, src/cqrs/events/map.py, src/cqrs/events/__init__.py
New EventHandlerFallback dataclass with validation; EventMap updated to accept fallback entries; public re-export added.
Dispatcher & emitter
src/cqrs/dispatcher/request.py, src/cqrs/dispatcher/streaming.py, src/cqrs/dispatcher/event.py, src/cqrs/events/event_emitter.py
Dispatching and event emission now detect fallback configs, run primary (optionally via circuit breaker), decide via should_use_fallback, and invoke fallback (including streaming switch-over); new helper methods and streaming conversion added.
Saga & saga fallback adjustments
src/cqrs/saga/fallback.py, src/cqrs/saga/execution.py
Saga fallback dataclass now frozen and uses ICircuitBreaker; execution uses positional call style compatible with new adapter API.
Generic utilities
src/cqrs/generic_utils.py
Added helper get_generic_args_for_origin for extracting concrete generic arguments from handler subclasses (used by fallback validation).
Public API & typing refactors
src/cqrs/__init__.py, src/cqrs/types.py, src/cqrs/middlewares/base.py
Re-exports updated to include ICircuitBreaker, EventHandlerFallback, RequestHandlerFallback; ReqT/ResT re-exported from requests.request; minor typing import path adjustments.
Examples
examples/request_fallback.py, examples/event_fallback.py, examples/cor_request_fallback.py, examples/streaming_handler_fallback.py
New runnable examples demonstrating request, event, COR, and streaming fallback patterns, each showing optional circuit-breaker integration and handler tracking.
Tests
tests/unit/test_request_fallback.py, tests/unit/test_event_fallback.py
New unit tests validating fallback triggering, failure_exceptions filtering, and correct invocation of primary vs fallback handlers.
Metadata
pyproject.toml
Project version bumped from 4.9.0 to 4.10.0.

Sequence Diagrams

sequenceDiagram
    participant Client
    participant RequestDispatcher
    participant CircuitBreaker
    participant PrimaryHandler
    participant FallbackHandler

    Client->>RequestDispatcher: dispatch(request)
    RequestDispatcher->>CircuitBreaker: optional call(identifier, primary_func)
    CircuitBreaker->>PrimaryHandler: execute primary
    alt primary succeeds
        PrimaryHandler-->>RequestDispatcher: result
        RequestDispatcher-->>Client: return result
    else primary fails
        PrimaryHandler-->>RequestDispatcher: raises
        RequestDispatcher->>CircuitBreaker: should_use_fallback?
        alt fallback applicable
            RequestDispatcher->>FallbackHandler: resolve & execute fallback
            FallbackHandler-->>RequestDispatcher: fallback result
            RequestDispatcher-->>Client: return fallback result
        else not applicable
            RequestDispatcher-->>Client: propagate original error
        end
    end
Loading
sequenceDiagram
    participant EmitterClient
    participant EventEmitter
    participant CircuitBreaker
    participant PrimaryEventHandler
    participant FallbackEventHandler

    EmitterClient->>EventEmitter: emit(event)
    EventEmitter->>CircuitBreaker: optional call(identifier, primary_func)
    CircuitBreaker->>PrimaryEventHandler: execute primary
    alt primary succeeds
        PrimaryEventHandler-->>EventEmitter: follow-up events
        EventEmitter-->>EmitterClient: complete
    else primary fails / breaker open
        PrimaryEventHandler-->>EventEmitter: raises
        EventEmitter->>CircuitBreaker: should_use_fallback?
        alt fallback applicable
            EventEmitter->>FallbackEventHandler: execute fallback
            FallbackEventHandler-->>EventEmitter: fallback follow-ups
            EventEmitter-->>EmitterClient: complete with fallback follow-ups
        else not applicable
            EventEmitter-->>EmitterClient: propagate original error
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐰 I hopped through types and breaker lanes,
I nudged the primary when it strained.
If circuits blink or sources fall,
I leap in soft — a fallback call.
🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.53% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title '[Feature] Fallbacks for requests and events handling' directly and clearly describes the main change: adding fallback mechanisms for both request and event handlers, which aligns with the comprehensive feature additions across the codebase.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/add-fallbacks

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov
Copy link

codecov bot commented Feb 21, 2026

Codecov Report

❌ Patch coverage is 73.10924% with 64 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.28%. Comparing base (bf46947) to head (62115e6).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
src/cqrs/dispatcher/event.py 14.81% 23 Missing ⚠️
src/cqrs/dispatcher/streaming.py 48.48% 17 Missing ⚠️
src/cqrs/generic_utils.py 56.25% 7 Missing ⚠️
src/cqrs/events/fallback.py 87.09% 4 Missing ⚠️
src/cqrs/dispatcher/request.py 88.00% 3 Missing ⚠️
src/cqrs/events/event_emitter.py 88.00% 3 Missing ⚠️
src/cqrs/requests/fallback.py 90.90% 3 Missing ⚠️
src/cqrs/types.py 0.00% 2 Missing ⚠️
src/cqrs/adapters/circuit_breaker.py 92.85% 1 Missing ⚠️
src/cqrs/circuit_breaker.py 91.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #64      +/-   ##
==========================================
- Coverage   87.67%   86.28%   -1.39%     
==========================================
  Files          70       74       +4     
  Lines        2636     2836     +200     
==========================================
+ Hits         2311     2447     +136     
- Misses        325      389      +64     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@codspeed-hq
Copy link
Contributor

codspeed-hq bot commented Feb 21, 2026

Merging this PR will not alter performance

✅ 70 untouched benchmarks
⏩ 10 skipped benchmarks1


Comparing feature/add-fallbacks (62115e6) with master (bf46947)

Open in CodSpeed

Footnotes

  1. 10 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (8)
tests/unit/test_request_fallback.py (2)

55-75: Container implementation is clean for test purposes.

Minor note: _TestRequestContainer satisfies the Container protocol structurally without an explicit inheritance declaration. This is fine for test code but could be fragile if the Container protocol evolves — a type checker won't flag the mismatch at definition time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/test_request_fallback.py` around lines 55 - 75, The test-only
class _TestRequestContainer currently implements the Container protocol
structurally which can be fragile for type checkers; explicitly declare it to
implement the protocol by inheriting from the Container protocol (e.g., change
the class definition to inherit from Container) and add the necessary import,
and ensure the external_container property and attach_external_container method
signatures match the Container protocol types; update _TestRequestContainer,
external_container, attach_external_container, and resolve to use the
Container/typing imports so a static type checker will validate the
implementation.

78-92: Good coverage of the core fallback path.

Consider adding a complementary test where the primary handler succeeds to verify the happy path returns the primary result and the fallback is never invoked. This would round out the three key scenarios (success, fallback-triggered failure, non-fallback failure).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/test_request_fallback.py` around lines 78 - 92, Add a
complementary unit test that verifies the happy path where the primary handler
succeeds and the fallback is not invoked: create a test similar to
test_request_fallback_no_cb_primary_fails_uses_fallback but dispatch a
SimpleCommand that the PrimaryHandler handles successfully, bind the command
with RequestHandlerFallback(PrimaryHandler, FallbackHandler) on a RequestMap,
use _TestRequestContainer and RequestDispatcher, then assert the returned result
comes from the primary (e.g., primary response value) and that
container._primary.called is True while container._fallback.called is False;
this ensures RequestDispatcher, RequestHandlerFallback, PrimaryHandler,
FallbackHandler, RequestMap, and _TestRequestContainer cover the success
scenario.
tests/unit/test_event_fallback.py (1)

74-109: Good coverage for the two primary paths; consider adding a "matching filter triggers fallback" case.

The existing tests verify "no filter → fallback used" and "non-matching filter → re-raise." A third test where failure_exceptions=(RuntimeError,) would confirm that a matching filter correctly triggers the fallback, completing the three-way matrix.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/test_event_fallback.py` around lines 74 - 109, Add a new unit test
to cover the case where the failure_exceptions filter matches and therefore the
fallback is invoked: create a test (e.g.,
test_event_fallback_matching_filter_triggers_fallback) that binds SampleEvent to
EventHandlerFallback(PrimaryEventHandler, FallbackEventHandler,
failure_exceptions=(RuntimeError,)), instantiate _TestEventContainer and
EventEmitter, call await emitter.emit(SampleEvent(id="e1")), then assert
container._primary.called and container._fallback.called and that follow_ups ==
[] (or the expected follow-ups) to verify the fallback path is exercised when
the exception type matches.
src/cqrs/dispatcher/streaming.py (2)

84-117: Partial primary results are not rolled back on fallback switch.

When the primary handler raises mid-stream, some RequestDispatchResults have already been yielded to the consumer. The fallback then streams all its results from the beginning. The consumer receives partial primary output followed by the full fallback output, which may include duplicates of already-yielded items.

This is consistent with the example in examples/streaming_handler_fallback.py, so it appears intentional. If so, consider adding a brief doc note on _dispatch_impl (or the class docstring) clarifying this "resume from scratch" semantic so consumers know they must handle potential duplicates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/dispatcher/streaming.py` around lines 84 - 117, Add a short doc note
to the dispatcher explaining that when a primary streaming handler fails
mid-stream, already-yielded RequestDispatchResult items are not rolled back and
the fallback handler will stream from its start (i.e., results may be
duplicated), by updating the _dispatch_impl function docstring or the class
docstring in the same module; reference the streaming path via
_stream_from_handler, StreamingRequestHandler, and handler_type
(primary/fallback) so callers understand the "resume from scratch" semantic and
can de-duplicate if needed.

90-116: Fallback decision logic is duplicated in three dispatchers.

The same should_fallback decision tree (circuit-breaker check → failure_exceptions match → catch-all) is repeated verbatim in streaming.py, request.py (lines 92-107), and event_emitter.py (lines 131-146). Consider extracting a shared helper, e.g.:

♻️ Suggested helper
# e.g. in cqrs/requests/fallback.py or a shared utils module
def should_use_fallback(
    error: Exception,
    failure_exceptions: tuple[type[Exception], ...],
    circuit_breaker: "ICircuitBreaker | None",
) -> bool:
    if circuit_breaker is not None and circuit_breaker.is_circuit_breaker_error(error):
        return True
    if failure_exceptions:
        return isinstance(error, failure_exceptions)
    return True
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/dispatcher/streaming.py` around lines 90 - 116, Extract the
duplicated fallback decision logic into a shared helper (e.g.,
should_use_fallback) and replace the repeated block in streaming.py, request.py
and event_emitter.py: move the logic that checks
handler_type.circuit_breaker.is_circuit_breaker_error(primary_error),
handler_type.failure_exceptions and isinstance(primary_error,
handler_type.failure_exceptions), and the default true case into the helper,
then call it where should_fallback is computed (replace the current
should_fallback assignment in the except blocks that reference handler_type,
primary_error, failure_exceptions and circuit_breaker) so the code yields
fallback via _stream_from_handler or raises the error based on the helper
result.
src/cqrs/requests/fallback.py (1)

8-42: No type-level enforcement that primary and fallback are the same handler kind.

RequestHandlerT allows mixing RequestHandler and StreamingRequestHandler for primary / fallback. A user could accidentally pair a RequestHandler primary with a StreamingRequestHandler fallback. The streaming dispatcher guards against this at runtime (lines 77-83 of streaming.py), but the request dispatcher (request.py) does not perform a similar check — it would fail with a confusing error at call time.

Consider either:

  • Adding a __post_init__ validation that both are subclasses of the same base, or
  • Narrowing with separate Generic-based types.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/requests/fallback.py` around lines 8 - 42, The
RequestHandlerFallback dataclass allows mixing RequestHandler and
StreamingRequestHandler via RequestHandlerT, so add a validation in
RequestHandlerFallback.__post_init__ that ensures primary and fallback are both
subclasses of the same base type (e.g., both subclass RequestHandler or both
subclass StreamingRequestHandler) and raise a clear TypeError if they differ;
implement this check using issubclass on the primary and fallback class
attributes and ensure it handles None/invalid inputs safely.
src/cqrs/events/event_emitter.py (1)

114-158: Double warning log on circuit-breaker-triggered fallback.

When is_circuit_breaker_error is True, should_fallback is set at line 141, then the generic "Primary event handler failed" warning fires again at line 149. The consumer sees two warnings for the same incident. The same pattern exists in request.py and streaming.py.

♻️ Suggested fix – skip the generic log when the CB-specific log was already emitted
             if should_fallback:
-                logger.warning(
-                    "Primary event handler %s failed: %s. Switching to fallback %s.",
-                    fallback_config.primary.__name__,
-                    primary_error,
-                    fallback_config.fallback.__name__,
-                )
+                if not (fallback_config.circuit_breaker is not None and fallback_config.circuit_breaker.is_circuit_breaker_error(primary_error)):
+                    logger.warning(
+                        "Primary event handler %s failed: %s. Switching to fallback %s.",
+                        fallback_config.primary.__name__,
+                        primary_error,
+                        fallback_config.fallback.__name__,
+                    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/events/event_emitter.py` around lines 114 - 158, In
_handle_with_fallback, avoid emitting the generic "Primary event handler failed"
warning when the circuit-breaker-specific warning was already logged: add a
small flag (e.g., cb_logged or already_logged) or branch so that when
fallback_config.circuit_breaker.is_circuit_breaker_error(primary_error) sets
should_fallback True, you do not also run the generic logger for the same error;
only log the generic warning for non-circuit-breaker failures before resolving
and invoking the fallback handler (apply the same change pattern to the
corresponding functions in request.py and streaming.py).
examples/event_fallback.py (1)

166-172: Same misleading try/except ImportError pattern as in request_fallback.py.

from cqrs.adapters.circuit_breaker import AioBreakerAdapter will always succeed since AioBreakerAdapter is part of the cqrs package. The ImportError is raised during instantiation (line 174), not import. The guard in main() prevents this from being a runtime issue, but the try/except is still misleading. See the comment on examples/request_fallback.py line 157–162 for the suggested fix pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/event_fallback.py` around lines 166 - 172, The try/except around
importing AioBreakerAdapter in events_mapper_with_circuit_breaker is misleading
because AioBreakerAdapter is part of the package and the ImportError can occur
during instantiation; remove the guarded import and import AioBreakerAdapter
normally, then wrap only the instantiation of AioBreakerAdapter in a try/except
ImportError (or the specific error raised during instantiation) so that on
failure you fall back to calling events_mapper(mapper) and return; update
references inside events_mapper_with_circuit_breaker to use the unguarded import
and handle fallback when creating the AioBreakerAdapter instance.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@examples/request_fallback.py`:
- Around line 157-162: The try/except around the import of AioBreakerAdapter in
commands_mapper_with_circuit_breaker is ineffective because the ImportError is
raised during AioBreakerAdapter.__init__, not at import time; update the code to
import AioBreakerAdapter normally and move the ImportError guard to the
instantiation site (i.e., wrap the creation call of AioBreakerAdapter in a
try/except ImportError and fall back to commands_mapper(mapper) on exception),
and apply the same change to the analogous function in event_fallback.py to
avoid misleading dead-code guards.

In `@src/cqrs/circuit_breaker.py`:
- Around line 14-16: Remove the misleading "Attributes:" block from the
class-level docstring (the Protocol has no attributes) and ensure the parameter
"identifier" is documented only on the call() method's docstring; update the
Protocol class docstring to describe purpose only, and move or keep the detailed
description of the "identifier" parameter inside the call(self, ..., identifier:
str, ...) method's docstring (referencing the Protocol class name and the call()
method to locate the change).

In `@src/cqrs/types.py`:
- Around line 1-7: The module docstring claims it re-exports ReqT and ResT for
backward compatibility but the file does not actually import or export them; add
the missing re-exports by importing ReqT and ResT from cqrs.requests.request
(and any interfaces from cqrs.response you intend to expose) and include them in
the module's exports (e.g., via __all__) so that "from cqrs.types import ReqT,
ResT" works as advertised; alternatively, if you prefer a breaking change,
update the docstring to remove the promise of re-exports and document the new
import locations instead.

---

Nitpick comments:
In `@examples/event_fallback.py`:
- Around line 166-172: The try/except around importing AioBreakerAdapter in
events_mapper_with_circuit_breaker is misleading because AioBreakerAdapter is
part of the package and the ImportError can occur during instantiation; remove
the guarded import and import AioBreakerAdapter normally, then wrap only the
instantiation of AioBreakerAdapter in a try/except ImportError (or the specific
error raised during instantiation) so that on failure you fall back to calling
events_mapper(mapper) and return; update references inside
events_mapper_with_circuit_breaker to use the unguarded import and handle
fallback when creating the AioBreakerAdapter instance.

In `@src/cqrs/dispatcher/streaming.py`:
- Around line 84-117: Add a short doc note to the dispatcher explaining that
when a primary streaming handler fails mid-stream, already-yielded
RequestDispatchResult items are not rolled back and the fallback handler will
stream from its start (i.e., results may be duplicated), by updating the
_dispatch_impl function docstring or the class docstring in the same module;
reference the streaming path via _stream_from_handler, StreamingRequestHandler,
and handler_type (primary/fallback) so callers understand the "resume from
scratch" semantic and can de-duplicate if needed.
- Around line 90-116: Extract the duplicated fallback decision logic into a
shared helper (e.g., should_use_fallback) and replace the repeated block in
streaming.py, request.py and event_emitter.py: move the logic that checks
handler_type.circuit_breaker.is_circuit_breaker_error(primary_error),
handler_type.failure_exceptions and isinstance(primary_error,
handler_type.failure_exceptions), and the default true case into the helper,
then call it where should_fallback is computed (replace the current
should_fallback assignment in the except blocks that reference handler_type,
primary_error, failure_exceptions and circuit_breaker) so the code yields
fallback via _stream_from_handler or raises the error based on the helper
result.

In `@src/cqrs/events/event_emitter.py`:
- Around line 114-158: In _handle_with_fallback, avoid emitting the generic
"Primary event handler failed" warning when the circuit-breaker-specific warning
was already logged: add a small flag (e.g., cb_logged or already_logged) or
branch so that when
fallback_config.circuit_breaker.is_circuit_breaker_error(primary_error) sets
should_fallback True, you do not also run the generic logger for the same error;
only log the generic warning for non-circuit-breaker failures before resolving
and invoking the fallback handler (apply the same change pattern to the
corresponding functions in request.py and streaming.py).

In `@src/cqrs/requests/fallback.py`:
- Around line 8-42: The RequestHandlerFallback dataclass allows mixing
RequestHandler and StreamingRequestHandler via RequestHandlerT, so add a
validation in RequestHandlerFallback.__post_init__ that ensures primary and
fallback are both subclasses of the same base type (e.g., both subclass
RequestHandler or both subclass StreamingRequestHandler) and raise a clear
TypeError if they differ; implement this check using issubclass on the primary
and fallback class attributes and ensure it handles None/invalid inputs safely.

In `@tests/unit/test_event_fallback.py`:
- Around line 74-109: Add a new unit test to cover the case where the
failure_exceptions filter matches and therefore the fallback is invoked: create
a test (e.g., test_event_fallback_matching_filter_triggers_fallback) that binds
SampleEvent to EventHandlerFallback(PrimaryEventHandler, FallbackEventHandler,
failure_exceptions=(RuntimeError,)), instantiate _TestEventContainer and
EventEmitter, call await emitter.emit(SampleEvent(id="e1")), then assert
container._primary.called and container._fallback.called and that follow_ups ==
[] (or the expected follow-ups) to verify the fallback path is exercised when
the exception type matches.

In `@tests/unit/test_request_fallback.py`:
- Around line 55-75: The test-only class _TestRequestContainer currently
implements the Container protocol structurally which can be fragile for type
checkers; explicitly declare it to implement the protocol by inheriting from the
Container protocol (e.g., change the class definition to inherit from Container)
and add the necessary import, and ensure the external_container property and
attach_external_container method signatures match the Container protocol types;
update _TestRequestContainer, external_container, attach_external_container, and
resolve to use the Container/typing imports so a static type checker will
validate the implementation.
- Around line 78-92: Add a complementary unit test that verifies the happy path
where the primary handler succeeds and the fallback is not invoked: create a
test similar to test_request_fallback_no_cb_primary_fails_uses_fallback but
dispatch a SimpleCommand that the PrimaryHandler handles successfully, bind the
command with RequestHandlerFallback(PrimaryHandler, FallbackHandler) on a
RequestMap, use _TestRequestContainer and RequestDispatcher, then assert the
returned result comes from the primary (e.g., primary response value) and that
container._primary.called is True while container._fallback.called is False;
this ensures RequestDispatcher, RequestHandlerFallback, PrimaryHandler,
FallbackHandler, RequestMap, and _TestRequestContainer cover the success
scenario.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/cqrs/dispatcher/streaming.py (1)

56-64: Return type annotation is imprecise — AsyncGenerator is more accurate than AsyncIterator.

_stream_from_handler contains a yield, making it an async generator function whose return type is AsyncGenerator[RequestDispatchResult, None]. AsyncIterator is a valid supertype so it compiles, but the more precise annotation helps callers and type checkers. The same applies to _dispatch_impl at line 69.

✏️ Proposed fix
 `@staticmethod`
 async def _stream_from_handler(
     request: IRequest,
     handler: StreamingRequestHandler,
-) -> typing.AsyncIterator[RequestDispatchResult]:
+) -> typing.AsyncGenerator[RequestDispatchResult, None]:
 async def _dispatch_impl(
     self,
     request: IRequest,
-) -> typing.AsyncIterator[RequestDispatchResult]:
+) -> typing.AsyncGenerator[RequestDispatchResult, None]:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/dispatcher/streaming.py` around lines 56 - 64, Update the async
generator return annotations to be precise: change the return type of
_stream_from_handler and _dispatch_impl from typing.AsyncIterator[...] to
typing.AsyncGenerator[RequestDispatchResult, None] (or the equivalent generic
using the concrete RequestDispatchResult) and add/import typing.AsyncGenerator
if not already present; ensure the annotations reference RequestDispatchResult
so type checkers treat these as async generators rather than plain async
iterators.
src/cqrs/dispatcher/request.py (1)

100-117: Consider extracting the fallback-log-selection block into a shared helper.

The block that picks between two logger.warning messages (CB-open vs. primary-failed) is copy-pasted identically in request.py (lines 100–117), streaming.py (lines 109–126), and event_emitter.py (lines 139–156). A small helper in cqrs/circuit_breaker.py (or a shared _log_fallback_switch in cqrs/dispatcher/) would eliminate the duplication and centralise the message format.

♻️ Suggested helper (example)
# e.g. in cqrs/circuit_breaker.py or a shared module
def log_fallback_switch(
    logger: logging.Logger,
    primary_cls: type,
    fallback_cls: type,
    primary_error: Exception,
    circuit_breaker: "ICircuitBreaker | None",
    handler_kind: str = "handler",
) -> None:
    if circuit_breaker is not None and circuit_breaker.is_circuit_breaker_error(primary_error):
        logger.warning(
            "Circuit breaker open for %s %s, switching to fallback %s",
            handler_kind, primary_cls.__name__, fallback_cls.__name__,
        )
    else:
        logger.warning(
            "Primary %s %s failed: %s. Switching to fallback %s.",
            handler_kind, primary_cls.__name__, primary_error, fallback_cls.__name__,
        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/dispatcher/request.py` around lines 100 - 117, Extract the
duplicated fallback logging into a shared helper (e.g., log_fallback_switch) and
replace the copy-pasted blocks in request.py, streaming.py, and event_emitter.py
with calls to that helper; the helper should accept the Logger instance, primary
class (fallback_config.primary), fallback class (fallback_config.fallback), the
primary_error, the circuit breaker instance (fallback_config.circuit_breaker)
and an optional handler_kind string, call
circuit_breaker.is_circuit_breaker_error(primary_error) to decide which warning
message to emit, and format messages exactly as the existing logger.warning
calls to preserve behavior and context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 86-133: The streaming dispatcher currently calls primary/fallback
streaming handlers via _stream_from_handler and never invokes
handler_type.circuit_breaker.call, so streaming failures don't update or trip
the circuit breaker; to fix, add an explicit guard in
StreamingRequestDispatcher._dispatch_impl (when isinstance(handler_type,
RequestHandlerFallback) and handlers are async generators) to detect a non-None
handler_type.circuit_breaker and either raise a ValueError or log/warn that
circuit breakers are unsupported for streaming fallback handlers (and recommend
removing the circuit_breaker or using non-streaming handlers); reference
RequestHandlerFallback, StreamingRequestHandler, _stream_from_handler, and
_dispatch_impl to locate and implement this validation.

---

Duplicate comments:
In `@src/cqrs/events/event_emitter.py`:
- Around line 115-160: Duplicate fallback-log-selection logic appears in
_handle_with_fallback and two other dispatchers; extract it into a shared helper
(e.g., log_fallback_selection or choose_and_log_fallback) and call that helper
from each site. The helper should accept the primary handler identifier
(fallback_config.primary), fallback identifier (fallback_config.fallback), the
caught exception (primary_error) and the circuit_breaker object
(fallback_config.circuit_breaker), determine whether the error is a
circuit-breaker error using
circuit_breaker.is_circuit_breaker_error(primary_error) when present, and emit
the same warning messages currently in lines 139–156; then replace the
duplicated warning blocks in _handle_with_fallback and the two request.py
locations with calls to this new helper.

---

Nitpick comments:
In `@src/cqrs/dispatcher/request.py`:
- Around line 100-117: Extract the duplicated fallback logging into a shared
helper (e.g., log_fallback_switch) and replace the copy-pasted blocks in
request.py, streaming.py, and event_emitter.py with calls to that helper; the
helper should accept the Logger instance, primary class
(fallback_config.primary), fallback class (fallback_config.fallback), the
primary_error, the circuit breaker instance (fallback_config.circuit_breaker)
and an optional handler_kind string, call
circuit_breaker.is_circuit_breaker_error(primary_error) to decide which warning
message to emit, and format messages exactly as the existing logger.warning
calls to preserve behavior and context.

In `@src/cqrs/dispatcher/streaming.py`:
- Around line 56-64: Update the async generator return annotations to be
precise: change the return type of _stream_from_handler and _dispatch_impl from
typing.AsyncIterator[...] to typing.AsyncGenerator[RequestDispatchResult, None]
(or the equivalent generic using the concrete RequestDispatchResult) and
add/import typing.AsyncGenerator if not already present; ensure the annotations
reference RequestDispatchResult so type checkers treat these as async generators
rather than plain async iterators.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
src/cqrs/requests/fallback.py (1)

52-63: Missing explicit RequestHandler/StreamingRequestHandler subclass check — misleading error for non-handler classes.

Unlike EventHandlerFallback (which explicitly verifies issubclass(…, EventHandler) on Lines 56–63 of src/cqrs/events/fallback.py), RequestHandlerFallback only checks streaming parity via issubclass(…, StreamingRequestHandler). If a user passes a completely unrelated class (e.g. a plain object subclass), the streaming check passes (both False), generic-arg extraction returns None, and the user gets "must be parameterized with concrete types" — which is confusing when the real problem is that the class isn't a handler at all.

Proposed fix
         if not isinstance(self.primary, type) or not isinstance(self.fallback, type):
             raise TypeError(
                 "RequestHandlerFallback primary and fallback must be handler classes",
             )
+        if not (issubclass(self.primary, (RequestHandler, StreamingRequestHandler))):
+            raise TypeError(
+                f"RequestHandlerFallback primary ({self.primary.__name__}) "
+                "must be a subclass of RequestHandler or StreamingRequestHandler",
+            )
+        if not (issubclass(self.fallback, (RequestHandler, StreamingRequestHandler))):
+            raise TypeError(
+                f"RequestHandlerFallback fallback ({self.fallback.__name__}) "
+                "must be a subclass of RequestHandler or StreamingRequestHandler",
+            )
         primary_streaming = issubclass(self.primary, StreamingRequestHandler)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/requests/fallback.py` around lines 52 - 63, The __post_init__ of
RequestHandlerFallback should explicitly verify that primary and fallback are
valid handler subclasses: check that each is a subclass of RequestHandler when
not streaming or a subclass of StreamingRequestHandler when streaming; if not,
raise a clear TypeError (e.g. "primary and fallback must be RequestHandler
subclasses" or "must be StreamingRequestHandler subclasses"). Concretely, inside
RequestHandlerFallback.__post_init__, after computing primary_streaming and
fallback_streaming, add explicit issubclass checks against RequestHandler or
StreamingRequestHandler as appropriate and raise a descriptive TypeError if any
class fails the handler-type check, instead of relying only on streaming parity.
src/cqrs/dispatcher/streaming.py (1)

84-93: Both handlers are eagerly resolved even when primary may succeed.

fallback_handler is resolved from the container (Line 86) before the primary handler is even attempted. If handler construction is expensive (e.g. allocates DB connections), this is wasteful in the success path.

Consider deferring fallback resolution into the except block:

Proposed change
         if isinstance(handler_type, RequestHandlerFallback):
             primary = await self._container.resolve(handler_type.primary)
-            fallback_handler = await self._container.resolve(handler_type.fallback)
-            if not inspect.isasyncgenfunction(primary.handle) or not inspect.isasyncgenfunction(
-                fallback_handler.handle,
-            ):
+            if not inspect.isasyncgenfunction(primary.handle):
                 raise TypeError(
                     "RequestHandlerFallback with StreamingRequestDispatcher requires "
                     "both primary and fallback to be async generator handlers",
                 )
             try:
                 async for result in self._stream_from_handler(
                     request,
                     typing.cast(StreamingRequestHandler, primary),
                 ):
                     yield result
             except Exception as primary_error:
                 should_fallback = should_use_fallback(
                     primary_error,
                     handler_type.circuit_breaker,
                     handler_type.failure_exceptions,
                 )
                 if should_fallback:
+                    fallback_handler = await self._container.resolve(handler_type.fallback)
+                    if not inspect.isasyncgenfunction(fallback_handler.handle):
+                        raise TypeError(
+                            "RequestHandlerFallback with StreamingRequestDispatcher requires "
+                            "both primary and fallback to be async generator handlers",
+                        )

That said, eager resolution has the upside of failing fast on misconfiguration. This is a trade-off — mentioning for awareness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/dispatcher/streaming.py` around lines 84 - 93, Currently the code
eagerly resolves both primary and fallback handlers by calling
self._container.resolve on handler_type.primary and handler_type.fallback;
change it to resolve only the primary first (await
self._container.resolve(handler_type.primary)), check that primary.handle is an
async generator (using inspect.isasyncgenfunction), then attempt the primary and
only resolve the fallback (await self._container.resolve(handler_type.fallback))
inside the exception/except path if primary fails or when you need to fall back;
keep the same TypeError checks for async generator handlers
(inspect.isasyncgenfunction on primary.handle and fallback.handle) but perform
the fallback.handle check after resolving the fallback so you avoid expensive
eager construction while preserving validation when the fallback is actually
used (reference RequestHandlerFallback, primary, fallback_handler,
self._container.resolve, and inspect.isasyncgenfunction).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 94-131: The streaming path currently calls
_stream_from_handler(request, primary) directly so the circuit breaker never
records streaming failures; change the primary stream invocation to go through
handler_type.circuit_breaker.call(...) when a circuit breaker exists so failures
update CB state and CircuitBreakerError can be detected by
handler_type.circuit_breaker.is_circuit_breaker_error(primary_error).
Concretely: when handler_type.circuit_breaker is not None, invoke
handler_type.circuit_breaker.call(lambda: self._stream_from_handler(request,
typing.cast(StreamingRequestHandler, primary))) (or wrap _stream_from_handler in
an async function that returns/iterates the async generator) and then async for
over the result of that call; keep the fallback path the same and retain the
existing should_use_fallback and is_circuit_breaker_error checks.

---

Nitpick comments:
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 84-93: Currently the code eagerly resolves both primary and
fallback handlers by calling self._container.resolve on handler_type.primary and
handler_type.fallback; change it to resolve only the primary first (await
self._container.resolve(handler_type.primary)), check that primary.handle is an
async generator (using inspect.isasyncgenfunction), then attempt the primary and
only resolve the fallback (await self._container.resolve(handler_type.fallback))
inside the exception/except path if primary fails or when you need to fall back;
keep the same TypeError checks for async generator handlers
(inspect.isasyncgenfunction on primary.handle and fallback.handle) but perform
the fallback.handle check after resolving the fallback so you avoid expensive
eager construction while preserving validation when the fallback is actually
used (reference RequestHandlerFallback, primary, fallback_handler,
self._container.resolve, and inspect.isasyncgenfunction).

In `@src/cqrs/requests/fallback.py`:
- Around line 52-63: The __post_init__ of RequestHandlerFallback should
explicitly verify that primary and fallback are valid handler subclasses: check
that each is a subclass of RequestHandler when not streaming or a subclass of
StreamingRequestHandler when streaming; if not, raise a clear TypeError (e.g.
"primary and fallback must be RequestHandler subclasses" or "must be
StreamingRequestHandler subclasses"). Concretely, inside
RequestHandlerFallback.__post_init__, after computing primary_streaming and
fallback_streaming, add explicit issubclass checks against RequestHandler or
StreamingRequestHandler as appropriate and raise a descriptive TypeError if any
class fails the handler-type check, instead of relying only on streaming parity.

@vadikko2 vadikko2 changed the title [Feature] Add Fallback for requests and events [Feature] Fallbacks for requests and events handling Feb 21, 2026
@vadikko2 vadikko2 merged commit 95aed36 into master Feb 21, 2026
9 of 11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant