[Feature] Fallbacks for requests and events handling#64
Conversation
📝 WalkthroughWalkthroughAdds a unified ICircuitBreaker protocol and should_use_fallback helper; introduces RequestHandlerFallback and EventHandlerFallback types; updates dispatchers, emitter, and streaming to run primary+fallback (with optional circuit breakers and failure_exceptions); adds examples and tests; bumps version to 4.10.0. Changes
Sequence DiagramssequenceDiagram
participant Client
participant RequestDispatcher
participant CircuitBreaker
participant PrimaryHandler
participant FallbackHandler
Client->>RequestDispatcher: dispatch(request)
RequestDispatcher->>CircuitBreaker: optional call(identifier, primary_func)
CircuitBreaker->>PrimaryHandler: execute primary
alt primary succeeds
PrimaryHandler-->>RequestDispatcher: result
RequestDispatcher-->>Client: return result
else primary fails
PrimaryHandler-->>RequestDispatcher: raises
RequestDispatcher->>CircuitBreaker: should_use_fallback?
alt fallback applicable
RequestDispatcher->>FallbackHandler: resolve & execute fallback
FallbackHandler-->>RequestDispatcher: fallback result
RequestDispatcher-->>Client: return fallback result
else not applicable
RequestDispatcher-->>Client: propagate original error
end
end
sequenceDiagram
participant EmitterClient
participant EventEmitter
participant CircuitBreaker
participant PrimaryEventHandler
participant FallbackEventHandler
EmitterClient->>EventEmitter: emit(event)
EventEmitter->>CircuitBreaker: optional call(identifier, primary_func)
CircuitBreaker->>PrimaryEventHandler: execute primary
alt primary succeeds
PrimaryEventHandler-->>EventEmitter: follow-up events
EventEmitter-->>EmitterClient: complete
else primary fails / breaker open
PrimaryEventHandler-->>EventEmitter: raises
EventEmitter->>CircuitBreaker: should_use_fallback?
alt fallback applicable
EventEmitter->>FallbackEventHandler: execute fallback
FallbackEventHandler-->>EventEmitter: fallback follow-ups
EventEmitter-->>EmitterClient: complete with fallback follow-ups
else not applicable
EventEmitter-->>EmitterClient: propagate original error
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #64 +/- ##
==========================================
- Coverage 87.67% 86.28% -1.39%
==========================================
Files 70 74 +4
Lines 2636 2836 +200
==========================================
+ Hits 2311 2447 +136
- Misses 325 389 +64 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Merging this PR will not alter performance
Comparing Footnotes
|
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (8)
tests/unit/test_request_fallback.py (2)
55-75: Container implementation is clean for test purposes.Minor note:
_TestRequestContainersatisfies theContainerprotocol structurally without an explicit inheritance declaration. This is fine for test code but could be fragile if theContainerprotocol evolves — a type checker won't flag the mismatch at definition time.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/test_request_fallback.py` around lines 55 - 75, The test-only class _TestRequestContainer currently implements the Container protocol structurally which can be fragile for type checkers; explicitly declare it to implement the protocol by inheriting from the Container protocol (e.g., change the class definition to inherit from Container) and add the necessary import, and ensure the external_container property and attach_external_container method signatures match the Container protocol types; update _TestRequestContainer, external_container, attach_external_container, and resolve to use the Container/typing imports so a static type checker will validate the implementation.
78-92: Good coverage of the core fallback path.Consider adding a complementary test where the primary handler succeeds to verify the happy path returns the primary result and the fallback is never invoked. This would round out the three key scenarios (success, fallback-triggered failure, non-fallback failure).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/test_request_fallback.py` around lines 78 - 92, Add a complementary unit test that verifies the happy path where the primary handler succeeds and the fallback is not invoked: create a test similar to test_request_fallback_no_cb_primary_fails_uses_fallback but dispatch a SimpleCommand that the PrimaryHandler handles successfully, bind the command with RequestHandlerFallback(PrimaryHandler, FallbackHandler) on a RequestMap, use _TestRequestContainer and RequestDispatcher, then assert the returned result comes from the primary (e.g., primary response value) and that container._primary.called is True while container._fallback.called is False; this ensures RequestDispatcher, RequestHandlerFallback, PrimaryHandler, FallbackHandler, RequestMap, and _TestRequestContainer cover the success scenario.tests/unit/test_event_fallback.py (1)
74-109: Good coverage for the two primary paths; consider adding a "matching filter triggers fallback" case.The existing tests verify "no filter → fallback used" and "non-matching filter → re-raise." A third test where
failure_exceptions=(RuntimeError,)would confirm that a matching filter correctly triggers the fallback, completing the three-way matrix.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/test_event_fallback.py` around lines 74 - 109, Add a new unit test to cover the case where the failure_exceptions filter matches and therefore the fallback is invoked: create a test (e.g., test_event_fallback_matching_filter_triggers_fallback) that binds SampleEvent to EventHandlerFallback(PrimaryEventHandler, FallbackEventHandler, failure_exceptions=(RuntimeError,)), instantiate _TestEventContainer and EventEmitter, call await emitter.emit(SampleEvent(id="e1")), then assert container._primary.called and container._fallback.called and that follow_ups == [] (or the expected follow-ups) to verify the fallback path is exercised when the exception type matches.src/cqrs/dispatcher/streaming.py (2)
84-117: Partial primary results are not rolled back on fallback switch.When the primary handler raises mid-stream, some
RequestDispatchResults have already been yielded to the consumer. The fallback then streams all its results from the beginning. The consumer receives partial primary output followed by the full fallback output, which may include duplicates of already-yielded items.This is consistent with the example in
examples/streaming_handler_fallback.py, so it appears intentional. If so, consider adding a brief doc note on_dispatch_impl(or the class docstring) clarifying this "resume from scratch" semantic so consumers know they must handle potential duplicates.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/dispatcher/streaming.py` around lines 84 - 117, Add a short doc note to the dispatcher explaining that when a primary streaming handler fails mid-stream, already-yielded RequestDispatchResult items are not rolled back and the fallback handler will stream from its start (i.e., results may be duplicated), by updating the _dispatch_impl function docstring or the class docstring in the same module; reference the streaming path via _stream_from_handler, StreamingRequestHandler, and handler_type (primary/fallback) so callers understand the "resume from scratch" semantic and can de-duplicate if needed.
90-116: Fallback decision logic is duplicated in three dispatchers.The same
should_fallbackdecision tree (circuit-breaker check → failure_exceptions match → catch-all) is repeated verbatim instreaming.py,request.py(lines 92-107), andevent_emitter.py(lines 131-146). Consider extracting a shared helper, e.g.:♻️ Suggested helper
# e.g. in cqrs/requests/fallback.py or a shared utils module def should_use_fallback( error: Exception, failure_exceptions: tuple[type[Exception], ...], circuit_breaker: "ICircuitBreaker | None", ) -> bool: if circuit_breaker is not None and circuit_breaker.is_circuit_breaker_error(error): return True if failure_exceptions: return isinstance(error, failure_exceptions) return True🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/dispatcher/streaming.py` around lines 90 - 116, Extract the duplicated fallback decision logic into a shared helper (e.g., should_use_fallback) and replace the repeated block in streaming.py, request.py and event_emitter.py: move the logic that checks handler_type.circuit_breaker.is_circuit_breaker_error(primary_error), handler_type.failure_exceptions and isinstance(primary_error, handler_type.failure_exceptions), and the default true case into the helper, then call it where should_fallback is computed (replace the current should_fallback assignment in the except blocks that reference handler_type, primary_error, failure_exceptions and circuit_breaker) so the code yields fallback via _stream_from_handler or raises the error based on the helper result.src/cqrs/requests/fallback.py (1)
8-42: No type-level enforcement that primary and fallback are the same handler kind.
RequestHandlerTallows mixingRequestHandlerandStreamingRequestHandlerforprimary/fallback. A user could accidentally pair aRequestHandlerprimary with aStreamingRequestHandlerfallback. The streaming dispatcher guards against this at runtime (lines 77-83 ofstreaming.py), but the request dispatcher (request.py) does not perform a similar check — it would fail with a confusing error at call time.Consider either:
- Adding a
__post_init__validation that both are subclasses of the same base, or- Narrowing with separate
Generic-based types.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/requests/fallback.py` around lines 8 - 42, The RequestHandlerFallback dataclass allows mixing RequestHandler and StreamingRequestHandler via RequestHandlerT, so add a validation in RequestHandlerFallback.__post_init__ that ensures primary and fallback are both subclasses of the same base type (e.g., both subclass RequestHandler or both subclass StreamingRequestHandler) and raise a clear TypeError if they differ; implement this check using issubclass on the primary and fallback class attributes and ensure it handles None/invalid inputs safely.src/cqrs/events/event_emitter.py (1)
114-158: Double warning log on circuit-breaker-triggered fallback.When
is_circuit_breaker_errorisTrue,should_fallbackis set at line 141, then the generic "Primary event handler failed" warning fires again at line 149. The consumer sees two warnings for the same incident. The same pattern exists inrequest.pyandstreaming.py.♻️ Suggested fix – skip the generic log when the CB-specific log was already emitted
if should_fallback: - logger.warning( - "Primary event handler %s failed: %s. Switching to fallback %s.", - fallback_config.primary.__name__, - primary_error, - fallback_config.fallback.__name__, - ) + if not (fallback_config.circuit_breaker is not None and fallback_config.circuit_breaker.is_circuit_breaker_error(primary_error)): + logger.warning( + "Primary event handler %s failed: %s. Switching to fallback %s.", + fallback_config.primary.__name__, + primary_error, + fallback_config.fallback.__name__, + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/events/event_emitter.py` around lines 114 - 158, In _handle_with_fallback, avoid emitting the generic "Primary event handler failed" warning when the circuit-breaker-specific warning was already logged: add a small flag (e.g., cb_logged or already_logged) or branch so that when fallback_config.circuit_breaker.is_circuit_breaker_error(primary_error) sets should_fallback True, you do not also run the generic logger for the same error; only log the generic warning for non-circuit-breaker failures before resolving and invoking the fallback handler (apply the same change pattern to the corresponding functions in request.py and streaming.py).examples/event_fallback.py (1)
166-172: Same misleadingtry/except ImportErrorpattern as inrequest_fallback.py.
from cqrs.adapters.circuit_breaker import AioBreakerAdapterwill always succeed sinceAioBreakerAdapteris part of thecqrspackage. TheImportErroris raised during instantiation (line 174), not import. The guard inmain()prevents this from being a runtime issue, but the try/except is still misleading. See the comment onexamples/request_fallback.pyline 157–162 for the suggested fix pattern.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/event_fallback.py` around lines 166 - 172, The try/except around importing AioBreakerAdapter in events_mapper_with_circuit_breaker is misleading because AioBreakerAdapter is part of the package and the ImportError can occur during instantiation; remove the guarded import and import AioBreakerAdapter normally, then wrap only the instantiation of AioBreakerAdapter in a try/except ImportError (or the specific error raised during instantiation) so that on failure you fall back to calling events_mapper(mapper) and return; update references inside events_mapper_with_circuit_breaker to use the unguarded import and handle fallback when creating the AioBreakerAdapter instance.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@examples/request_fallback.py`:
- Around line 157-162: The try/except around the import of AioBreakerAdapter in
commands_mapper_with_circuit_breaker is ineffective because the ImportError is
raised during AioBreakerAdapter.__init__, not at import time; update the code to
import AioBreakerAdapter normally and move the ImportError guard to the
instantiation site (i.e., wrap the creation call of AioBreakerAdapter in a
try/except ImportError and fall back to commands_mapper(mapper) on exception),
and apply the same change to the analogous function in event_fallback.py to
avoid misleading dead-code guards.
In `@src/cqrs/circuit_breaker.py`:
- Around line 14-16: Remove the misleading "Attributes:" block from the
class-level docstring (the Protocol has no attributes) and ensure the parameter
"identifier" is documented only on the call() method's docstring; update the
Protocol class docstring to describe purpose only, and move or keep the detailed
description of the "identifier" parameter inside the call(self, ..., identifier:
str, ...) method's docstring (referencing the Protocol class name and the call()
method to locate the change).
In `@src/cqrs/types.py`:
- Around line 1-7: The module docstring claims it re-exports ReqT and ResT for
backward compatibility but the file does not actually import or export them; add
the missing re-exports by importing ReqT and ResT from cqrs.requests.request
(and any interfaces from cqrs.response you intend to expose) and include them in
the module's exports (e.g., via __all__) so that "from cqrs.types import ReqT,
ResT" works as advertised; alternatively, if you prefer a breaking change,
update the docstring to remove the promise of re-exports and document the new
import locations instead.
---
Nitpick comments:
In `@examples/event_fallback.py`:
- Around line 166-172: The try/except around importing AioBreakerAdapter in
events_mapper_with_circuit_breaker is misleading because AioBreakerAdapter is
part of the package and the ImportError can occur during instantiation; remove
the guarded import and import AioBreakerAdapter normally, then wrap only the
instantiation of AioBreakerAdapter in a try/except ImportError (or the specific
error raised during instantiation) so that on failure you fall back to calling
events_mapper(mapper) and return; update references inside
events_mapper_with_circuit_breaker to use the unguarded import and handle
fallback when creating the AioBreakerAdapter instance.
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 84-117: Add a short doc note to the dispatcher explaining that
when a primary streaming handler fails mid-stream, already-yielded
RequestDispatchResult items are not rolled back and the fallback handler will
stream from its start (i.e., results may be duplicated), by updating the
_dispatch_impl function docstring or the class docstring in the same module;
reference the streaming path via _stream_from_handler, StreamingRequestHandler,
and handler_type (primary/fallback) so callers understand the "resume from
scratch" semantic and can de-duplicate if needed.
- Around line 90-116: Extract the duplicated fallback decision logic into a
shared helper (e.g., should_use_fallback) and replace the repeated block in
streaming.py, request.py and event_emitter.py: move the logic that checks
handler_type.circuit_breaker.is_circuit_breaker_error(primary_error),
handler_type.failure_exceptions and isinstance(primary_error,
handler_type.failure_exceptions), and the default true case into the helper,
then call it where should_fallback is computed (replace the current
should_fallback assignment in the except blocks that reference handler_type,
primary_error, failure_exceptions and circuit_breaker) so the code yields
fallback via _stream_from_handler or raises the error based on the helper
result.
In `@src/cqrs/events/event_emitter.py`:
- Around line 114-158: In _handle_with_fallback, avoid emitting the generic
"Primary event handler failed" warning when the circuit-breaker-specific warning
was already logged: add a small flag (e.g., cb_logged or already_logged) or
branch so that when
fallback_config.circuit_breaker.is_circuit_breaker_error(primary_error) sets
should_fallback True, you do not also run the generic logger for the same error;
only log the generic warning for non-circuit-breaker failures before resolving
and invoking the fallback handler (apply the same change pattern to the
corresponding functions in request.py and streaming.py).
In `@src/cqrs/requests/fallback.py`:
- Around line 8-42: The RequestHandlerFallback dataclass allows mixing
RequestHandler and StreamingRequestHandler via RequestHandlerT, so add a
validation in RequestHandlerFallback.__post_init__ that ensures primary and
fallback are both subclasses of the same base type (e.g., both subclass
RequestHandler or both subclass StreamingRequestHandler) and raise a clear
TypeError if they differ; implement this check using issubclass on the primary
and fallback class attributes and ensure it handles None/invalid inputs safely.
In `@tests/unit/test_event_fallback.py`:
- Around line 74-109: Add a new unit test to cover the case where the
failure_exceptions filter matches and therefore the fallback is invoked: create
a test (e.g., test_event_fallback_matching_filter_triggers_fallback) that binds
SampleEvent to EventHandlerFallback(PrimaryEventHandler, FallbackEventHandler,
failure_exceptions=(RuntimeError,)), instantiate _TestEventContainer and
EventEmitter, call await emitter.emit(SampleEvent(id="e1")), then assert
container._primary.called and container._fallback.called and that follow_ups ==
[] (or the expected follow-ups) to verify the fallback path is exercised when
the exception type matches.
In `@tests/unit/test_request_fallback.py`:
- Around line 55-75: The test-only class _TestRequestContainer currently
implements the Container protocol structurally which can be fragile for type
checkers; explicitly declare it to implement the protocol by inheriting from the
Container protocol (e.g., change the class definition to inherit from Container)
and add the necessary import, and ensure the external_container property and
attach_external_container method signatures match the Container protocol types;
update _TestRequestContainer, external_container, attach_external_container, and
resolve to use the Container/typing imports so a static type checker will
validate the implementation.
- Around line 78-92: Add a complementary unit test that verifies the happy path
where the primary handler succeeds and the fallback is not invoked: create a
test similar to test_request_fallback_no_cb_primary_fails_uses_fallback but
dispatch a SimpleCommand that the PrimaryHandler handles successfully, bind the
command with RequestHandlerFallback(PrimaryHandler, FallbackHandler) on a
RequestMap, use _TestRequestContainer and RequestDispatcher, then assert the
returned result comes from the primary (e.g., primary response value) and that
container._primary.called is True while container._fallback.called is False;
this ensures RequestDispatcher, RequestHandlerFallback, PrimaryHandler,
FallbackHandler, RequestMap, and _TestRequestContainer cover the success
scenario.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/cqrs/dispatcher/streaming.py (1)
56-64: Return type annotation is imprecise —AsyncGeneratoris more accurate thanAsyncIterator.
_stream_from_handlercontains ayield, making it an async generator function whose return type isAsyncGenerator[RequestDispatchResult, None].AsyncIteratoris a valid supertype so it compiles, but the more precise annotation helps callers and type checkers. The same applies to_dispatch_implat line 69.✏️ Proposed fix
`@staticmethod` async def _stream_from_handler( request: IRequest, handler: StreamingRequestHandler, -) -> typing.AsyncIterator[RequestDispatchResult]: +) -> typing.AsyncGenerator[RequestDispatchResult, None]:async def _dispatch_impl( self, request: IRequest, -) -> typing.AsyncIterator[RequestDispatchResult]: +) -> typing.AsyncGenerator[RequestDispatchResult, None]:🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/dispatcher/streaming.py` around lines 56 - 64, Update the async generator return annotations to be precise: change the return type of _stream_from_handler and _dispatch_impl from typing.AsyncIterator[...] to typing.AsyncGenerator[RequestDispatchResult, None] (or the equivalent generic using the concrete RequestDispatchResult) and add/import typing.AsyncGenerator if not already present; ensure the annotations reference RequestDispatchResult so type checkers treat these as async generators rather than plain async iterators.src/cqrs/dispatcher/request.py (1)
100-117: Consider extracting the fallback-log-selection block into a shared helper.The block that picks between two
logger.warningmessages (CB-open vs. primary-failed) is copy-pasted identically inrequest.py(lines 100–117),streaming.py(lines 109–126), andevent_emitter.py(lines 139–156). A small helper incqrs/circuit_breaker.py(or a shared_log_fallback_switchincqrs/dispatcher/) would eliminate the duplication and centralise the message format.♻️ Suggested helper (example)
# e.g. in cqrs/circuit_breaker.py or a shared module def log_fallback_switch( logger: logging.Logger, primary_cls: type, fallback_cls: type, primary_error: Exception, circuit_breaker: "ICircuitBreaker | None", handler_kind: str = "handler", ) -> None: if circuit_breaker is not None and circuit_breaker.is_circuit_breaker_error(primary_error): logger.warning( "Circuit breaker open for %s %s, switching to fallback %s", handler_kind, primary_cls.__name__, fallback_cls.__name__, ) else: logger.warning( "Primary %s %s failed: %s. Switching to fallback %s.", handler_kind, primary_cls.__name__, primary_error, fallback_cls.__name__, )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/dispatcher/request.py` around lines 100 - 117, Extract the duplicated fallback logging into a shared helper (e.g., log_fallback_switch) and replace the copy-pasted blocks in request.py, streaming.py, and event_emitter.py with calls to that helper; the helper should accept the Logger instance, primary class (fallback_config.primary), fallback class (fallback_config.fallback), the primary_error, the circuit breaker instance (fallback_config.circuit_breaker) and an optional handler_kind string, call circuit_breaker.is_circuit_breaker_error(primary_error) to decide which warning message to emit, and format messages exactly as the existing logger.warning calls to preserve behavior and context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 86-133: The streaming dispatcher currently calls primary/fallback
streaming handlers via _stream_from_handler and never invokes
handler_type.circuit_breaker.call, so streaming failures don't update or trip
the circuit breaker; to fix, add an explicit guard in
StreamingRequestDispatcher._dispatch_impl (when isinstance(handler_type,
RequestHandlerFallback) and handlers are async generators) to detect a non-None
handler_type.circuit_breaker and either raise a ValueError or log/warn that
circuit breakers are unsupported for streaming fallback handlers (and recommend
removing the circuit_breaker or using non-streaming handlers); reference
RequestHandlerFallback, StreamingRequestHandler, _stream_from_handler, and
_dispatch_impl to locate and implement this validation.
---
Duplicate comments:
In `@src/cqrs/events/event_emitter.py`:
- Around line 115-160: Duplicate fallback-log-selection logic appears in
_handle_with_fallback and two other dispatchers; extract it into a shared helper
(e.g., log_fallback_selection or choose_and_log_fallback) and call that helper
from each site. The helper should accept the primary handler identifier
(fallback_config.primary), fallback identifier (fallback_config.fallback), the
caught exception (primary_error) and the circuit_breaker object
(fallback_config.circuit_breaker), determine whether the error is a
circuit-breaker error using
circuit_breaker.is_circuit_breaker_error(primary_error) when present, and emit
the same warning messages currently in lines 139–156; then replace the
duplicated warning blocks in _handle_with_fallback and the two request.py
locations with calls to this new helper.
---
Nitpick comments:
In `@src/cqrs/dispatcher/request.py`:
- Around line 100-117: Extract the duplicated fallback logging into a shared
helper (e.g., log_fallback_switch) and replace the copy-pasted blocks in
request.py, streaming.py, and event_emitter.py with calls to that helper; the
helper should accept the Logger instance, primary class
(fallback_config.primary), fallback class (fallback_config.fallback), the
primary_error, the circuit breaker instance (fallback_config.circuit_breaker)
and an optional handler_kind string, call
circuit_breaker.is_circuit_breaker_error(primary_error) to decide which warning
message to emit, and format messages exactly as the existing logger.warning
calls to preserve behavior and context.
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 56-64: Update the async generator return annotations to be
precise: change the return type of _stream_from_handler and _dispatch_impl from
typing.AsyncIterator[...] to typing.AsyncGenerator[RequestDispatchResult, None]
(or the equivalent generic using the concrete RequestDispatchResult) and
add/import typing.AsyncGenerator if not already present; ensure the annotations
reference RequestDispatchResult so type checkers treat these as async generators
rather than plain async iterators.
There was a problem hiding this comment.
🧹 Nitpick comments (2)
src/cqrs/requests/fallback.py (1)
52-63: Missing explicitRequestHandler/StreamingRequestHandlersubclass check — misleading error for non-handler classes.Unlike
EventHandlerFallback(which explicitly verifiesissubclass(…, EventHandler)on Lines 56–63 ofsrc/cqrs/events/fallback.py),RequestHandlerFallbackonly checks streaming parity viaissubclass(…, StreamingRequestHandler). If a user passes a completely unrelated class (e.g. a plainobjectsubclass), the streaming check passes (bothFalse), generic-arg extraction returnsNone, and the user gets"must be parameterized with concrete types"— which is confusing when the real problem is that the class isn't a handler at all.Proposed fix
if not isinstance(self.primary, type) or not isinstance(self.fallback, type): raise TypeError( "RequestHandlerFallback primary and fallback must be handler classes", ) + if not (issubclass(self.primary, (RequestHandler, StreamingRequestHandler))): + raise TypeError( + f"RequestHandlerFallback primary ({self.primary.__name__}) " + "must be a subclass of RequestHandler or StreamingRequestHandler", + ) + if not (issubclass(self.fallback, (RequestHandler, StreamingRequestHandler))): + raise TypeError( + f"RequestHandlerFallback fallback ({self.fallback.__name__}) " + "must be a subclass of RequestHandler or StreamingRequestHandler", + ) primary_streaming = issubclass(self.primary, StreamingRequestHandler)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/requests/fallback.py` around lines 52 - 63, The __post_init__ of RequestHandlerFallback should explicitly verify that primary and fallback are valid handler subclasses: check that each is a subclass of RequestHandler when not streaming or a subclass of StreamingRequestHandler when streaming; if not, raise a clear TypeError (e.g. "primary and fallback must be RequestHandler subclasses" or "must be StreamingRequestHandler subclasses"). Concretely, inside RequestHandlerFallback.__post_init__, after computing primary_streaming and fallback_streaming, add explicit issubclass checks against RequestHandler or StreamingRequestHandler as appropriate and raise a descriptive TypeError if any class fails the handler-type check, instead of relying only on streaming parity.src/cqrs/dispatcher/streaming.py (1)
84-93: Both handlers are eagerly resolved even when primary may succeed.
fallback_handleris resolved from the container (Line 86) before the primary handler is even attempted. If handler construction is expensive (e.g. allocates DB connections), this is wasteful in the success path.Consider deferring fallback resolution into the
exceptblock:Proposed change
if isinstance(handler_type, RequestHandlerFallback): primary = await self._container.resolve(handler_type.primary) - fallback_handler = await self._container.resolve(handler_type.fallback) - if not inspect.isasyncgenfunction(primary.handle) or not inspect.isasyncgenfunction( - fallback_handler.handle, - ): + if not inspect.isasyncgenfunction(primary.handle): raise TypeError( "RequestHandlerFallback with StreamingRequestDispatcher requires " "both primary and fallback to be async generator handlers", ) try: async for result in self._stream_from_handler( request, typing.cast(StreamingRequestHandler, primary), ): yield result except Exception as primary_error: should_fallback = should_use_fallback( primary_error, handler_type.circuit_breaker, handler_type.failure_exceptions, ) if should_fallback: + fallback_handler = await self._container.resolve(handler_type.fallback) + if not inspect.isasyncgenfunction(fallback_handler.handle): + raise TypeError( + "RequestHandlerFallback with StreamingRequestDispatcher requires " + "both primary and fallback to be async generator handlers", + )That said, eager resolution has the upside of failing fast on misconfiguration. This is a trade-off — mentioning for awareness.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/dispatcher/streaming.py` around lines 84 - 93, Currently the code eagerly resolves both primary and fallback handlers by calling self._container.resolve on handler_type.primary and handler_type.fallback; change it to resolve only the primary first (await self._container.resolve(handler_type.primary)), check that primary.handle is an async generator (using inspect.isasyncgenfunction), then attempt the primary and only resolve the fallback (await self._container.resolve(handler_type.fallback)) inside the exception/except path if primary fails or when you need to fall back; keep the same TypeError checks for async generator handlers (inspect.isasyncgenfunction on primary.handle and fallback.handle) but perform the fallback.handle check after resolving the fallback so you avoid expensive eager construction while preserving validation when the fallback is actually used (reference RequestHandlerFallback, primary, fallback_handler, self._container.resolve, and inspect.isasyncgenfunction).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 94-131: The streaming path currently calls
_stream_from_handler(request, primary) directly so the circuit breaker never
records streaming failures; change the primary stream invocation to go through
handler_type.circuit_breaker.call(...) when a circuit breaker exists so failures
update CB state and CircuitBreakerError can be detected by
handler_type.circuit_breaker.is_circuit_breaker_error(primary_error).
Concretely: when handler_type.circuit_breaker is not None, invoke
handler_type.circuit_breaker.call(lambda: self._stream_from_handler(request,
typing.cast(StreamingRequestHandler, primary))) (or wrap _stream_from_handler in
an async function that returns/iterates the async generator) and then async for
over the result of that call; keep the fallback path the same and retain the
existing should_use_fallback and is_circuit_breaker_error checks.
---
Nitpick comments:
In `@src/cqrs/dispatcher/streaming.py`:
- Around line 84-93: Currently the code eagerly resolves both primary and
fallback handlers by calling self._container.resolve on handler_type.primary and
handler_type.fallback; change it to resolve only the primary first (await
self._container.resolve(handler_type.primary)), check that primary.handle is an
async generator (using inspect.isasyncgenfunction), then attempt the primary and
only resolve the fallback (await self._container.resolve(handler_type.fallback))
inside the exception/except path if primary fails or when you need to fall back;
keep the same TypeError checks for async generator handlers
(inspect.isasyncgenfunction on primary.handle and fallback.handle) but perform
the fallback.handle check after resolving the fallback so you avoid expensive
eager construction while preserving validation when the fallback is actually
used (reference RequestHandlerFallback, primary, fallback_handler,
self._container.resolve, and inspect.isasyncgenfunction).
In `@src/cqrs/requests/fallback.py`:
- Around line 52-63: The __post_init__ of RequestHandlerFallback should
explicitly verify that primary and fallback are valid handler subclasses: check
that each is a subclass of RequestHandler when not streaming or a subclass of
StreamingRequestHandler when streaming; if not, raise a clear TypeError (e.g.
"primary and fallback must be RequestHandler subclasses" or "must be
StreamingRequestHandler subclasses"). Concretely, inside
RequestHandlerFallback.__post_init__, after computing primary_streaming and
fallback_streaming, add explicit issubclass checks against RequestHandler or
StreamingRequestHandler as appropriate and raise a descriptive TypeError if any
class fails the handler-type check, instead of relying only on streaming parity.
✨ Fallbacks for requests and events handling
This release adds fallback support for request handlers and event handlers, with optional circuit breaker integration. You can now define primary and fallback handlers for commands, queries, streaming handlers, and domain events, and optionally protect them with a shared circuit breaker protocol.
🆕 What's new
📨 Request handler fallbacks
RequestHandlerFallback— wrap a primary and fallback request handler so that when the primary fails (or the circuit is open), the fallback is used.RequestHandler[Req, Res]and streamingStreamingRequestHandler[Req, Res].failure_exceptionsto trigger fallback only for specific exception types (e.g.ConnectionError,TimeoutError).Example:
📢 Event handler fallbacks
EventHandlerFallback— wrap a primary and fallback event handler so that when the primary fails (or the circuit is open), the fallback is used.failure_exceptionsand optional circuit breaker.EventMap).Example:
🔌 Unified circuit breaker protocol
ICircuitBreaker– a single protocol used by:AioBreakerAdapternow implementsICircuitBreaker(and stillISagaStepCircuitBreakerfor backward compatibility).📡 Streaming handlers
RequestHandlerFallbackis supported for streaming handlers: when the primary streaming handler fails, the fallback stream is used from the start (no resume from the same stream).🛠️ Helpers
get_generic_args_for_origin()incqrs.generic_utils– used to validate that primary and fallback handlers handle the same request/event and (for requests) the same response type.should_use_fallback()incqrs.circuit_breaker– central logic to decide whether to run the fallback after a primary failure (circuit open or exception infailure_exceptions, or any exception iffailure_exceptionsis empty).📚 Documentation and examples
examples/request_fallback.py– request handler fallback with optional circuit breakerexamples/cor_request_fallback.py– CoR (chain of responsibility) with fallbackexamples/event_fallback.py– event handler fallbackexamples/streaming_handler_fallback.py– streaming request handler fallbacktest_request_fallback.py,test_event_fallback.py; integration tests for the circuit breaker adapter updated.📋 API summary
cqrs.RequestHandlerFallbackfailure_exceptions, optionalcircuit_breaker).cqrs.EventHandlerFallbackcqrs.ICircuitBreakercall(identifier, func, *args, **kwargs)andis_circuit_breaker_error(exc).cqrs.circuit_breaker.should_use_fallbackcqrs.generic_utils.get_generic_args_for_origin✅ Compatibility
AioBreakerAdapterusage continue to work.AioBreakerAdapternow implementsICircuitBreakerin addition toISagaStepCircuitBreaker.aiobreakerfor circuit breaker support (pip install python-cqrs[aiobreaker]).