diff --git a/README.md b/README.md
index 5863766..67d71d2 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,14 @@
-

-
Python CQRS
-
Event-Driven Architecture Framework for Distributed Systems
+
Event-Driven Architecture Framework for Distributed Systems
+
+ Python 3.10+ · Full documentation: mkdocs.python-cqrs.dev
+
@@ -41,6 +42,29 @@
>
> Starting with version 5.0.0, Pydantic support will become optional. The default implementations of `Request`, `Response`, `DomainEvent`, and `NotificationEvent` will be migrated to dataclasses-based implementations.
+## Table of Contents
+
+- [Overview](#overview)
+- [Installation](#installation)
+- [Quick Start](#quick-start)
+- [Request and Response Types](#request-and-response-types)
+- [Request Handlers](#request-handlers)
+- [Mapping](#mapping)
+- [DI container](#di-container)
+- [Bootstrap](#bootstrap)
+- [Saga Pattern](#saga-pattern)
+- [Producing Notification Events](#producing-notification-events)
+- [Kafka broker](#kafka-broker)
+- [Transactional Outbox](#transactional-outbox)
+- [Producing Events from Outbox to Kafka](#producing-events-from-outbox-to-kafka)
+- [Transaction log tailing](#transaction-log-tailing)
+- [Event Handlers](#event-handlers)
+- [Integration with presentation layers](#integration-with-presentation-layers)
+- [Protobuf messaging](#protobuf-messaging)
+- [Contributing](#contributing)
+- [Changelog](#changelog)
+- [License](#license)
+
## Overview
An event-driven framework for building distributed systems in Python. It centers on CQRS (Command Query Responsibility Segregation) and extends into messaging, sagas, and reliable event delivery — so you can separate read and write flows, react to events from the bus, run distributed transactions with compensation, and publish events via Transaction Outbox. The result is clearer structure, better scalability, and easier evolution of the application.
@@ -67,6 +91,104 @@ project ([documentation](https://akhundmurad.github.io/diator/)) with several en
- **Documentation:** Built-in Mermaid diagram generation (Sequence and Class diagrams).
- **Protobuf:** Interface-level support for converting Notification events to Protobuf and back.
+## Installation
+
+**Python 3.10+** is required.
+
+```bash
+pip install python-cqrs
+```
+
+Optional dependencies (see [pyproject.toml](https://github.com/vadikko2/python-cqrs/blob/master/pyproject.toml) for full list):
+
+```bash
+pip install python-cqrs[kafka] # Kafka broker (aiokafka)
+pip install python-cqrs[examples] # FastAPI, FastStream, uvicorn, etc.
+pip install python-cqrs[aiobreaker] # Circuit breaker for saga fallbacks
+```
+
+## Quick Start
+
+Define a command, a handler, bind them, and run via the mediator:
+
+```python
+import di
+import cqrs
+from cqrs.requests import bootstrap
+
+class CreateOrderCommand(cqrs.Request):
+ order_id: str
+ amount: float
+
+class CreateOrderHandler(cqrs.RequestHandler[CreateOrderCommand, None]):
+ async def handle(self, request: CreateOrderCommand) -> None:
+ print(f"Order {request.order_id}, amount {request.amount}")
+
+def commands_mapper(mapper: cqrs.RequestMap) -> None:
+ mapper.bind(CreateOrderCommand, CreateOrderHandler)
+
+container = di.Container()
+mediator = bootstrap.bootstrap(di_container=container, commands_mapper=commands_mapper)
+await mediator.send(CreateOrderCommand(order_id="ord-1", amount=99.99))
+```
+
+For full setup with DI, events, and outbox, see the [documentation](https://mkdocs.python-cqrs.dev/) and the [examples](https://github.com/vadikko2/python-cqrs/tree/master/examples) directory.
+
+## Request and Response Types
+
+The library supports both Pydantic-based (`PydanticRequest`/`PydanticResponse`, aliased as `Request`/`Response`) and Dataclass-based (`DCRequest`/`DCResponse`) implementations. You can also implement custom classes by implementing the `IRequest`/`IResponse` interfaces directly.
+
+```python
+import dataclasses
+
+# Pydantic-based (default)
+class CreateUserCommand(cqrs.Request):
+ username: str
+ email: str
+
+class UserResponse(cqrs.Response):
+ user_id: str
+ username: str
+
+# Dataclass-based
+@dataclasses.dataclass
+class CreateProductCommand(cqrs.DCRequest):
+ name: str
+ price: float
+
+@dataclasses.dataclass
+class ProductResponse(cqrs.DCResponse):
+ product_id: str
+ name: str
+
+# Custom implementation
+class CustomRequest(cqrs.IRequest):
+ def __init__(self, user_id: str, action: str):
+ self.user_id = user_id
+ self.action = action
+
+ def to_dict(self) -> dict:
+ return {"user_id": self.user_id, "action": self.action}
+
+ @classmethod
+ def from_dict(cls, **kwargs) -> "CustomRequest":
+ return cls(user_id=kwargs["user_id"], action=kwargs["action"])
+
+class CustomResponse(cqrs.IResponse):
+ def __init__(self, result: str, status: int):
+ self.result = result
+ self.status = status
+
+ def to_dict(self) -> dict:
+ return {"result": self.result, "status": self.status}
+
+ @classmethod
+ def from_dict(cls, **kwargs) -> "CustomResponse":
+ return cls(result=kwargs["result"], status=kwargs["status"])
+```
+
+A complete example can be found in [request_response_types.py](https://github.com/vadikko2/python-cqrs/blob/master/examples/request_response_types.py)
+
## Request Handlers
Request handlers can be divided into two main types:
@@ -98,7 +220,7 @@ class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/request_handler.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/request_handler.py)
### Query handler
@@ -128,7 +250,7 @@ class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryR
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/request_handler.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/request_handler.py)
### Streaming Request Handler
@@ -164,7 +286,7 @@ class ProcessFilesCommandHandler(StreamingRequestHandler[ProcessFilesCommand, Fi
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/streaming_handler_parallel_events.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/streaming_handler_parallel_events.py)
### Chain of Responsibility Request Handler
@@ -227,7 +349,7 @@ def payment_mapper(mapper: cqrs.RequestMap) -> None:
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/cor_request_handler.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/cor_request_handler.py)
#### Mermaid Diagram Generation
@@ -247,86 +369,126 @@ sequence_diagram = generator.sequence()
class_diagram = generator.class_diagram()
```
-Complete example: [CoR Mermaid Diagrams](https://github.com/vadikko2/cqrs/blob/master/examples/cor_mermaid.py)
+Complete example: [CoR Mermaid Diagrams](https://github.com/vadikko2/python-cqrs/blob/master/examples/cor_mermaid.py)
-## Request and Response Types
+## Mapping
-The library supports both Pydantic-based (`PydanticRequest`/`PydanticResponse`, aliased as `Request`/`Response`) and Dataclass-based (`DCRequest`/`DCResponse`) implementations. You can also implement custom classes by implementing the `IRequest`/`IResponse` interfaces directly.
+To bind commands, queries and events with specific handlers, you can use the registries `EventMap`, `RequestMap`, and `SagaMap`.
+
+**Commands, queries and events:**
```python
-import dataclasses
+from cqrs import requests, events
-# Pydantic-based (default)
-class CreateUserCommand(cqrs.Request):
- username: str
- email: str
+from app import commands, command_handlers
+from app import queries, query_handlers
+from app import events as event_models, event_handlers
-class UserResponse(cqrs.Response):
- user_id: str
- username: str
-# Dataclass-based
-@dataclasses.dataclass
-class CreateProductCommand(cqrs.DCRequest):
- name: str
- price: float
+def init_commands(mapper: requests.RequestMap) -> None:
+ mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)
-@dataclasses.dataclass
-class ProductResponse(cqrs.DCResponse):
- product_id: str
- name: str
+def init_queries(mapper: requests.RequestMap) -> None:
+ mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)
-# Custom implementation
-class CustomRequest(cqrs.IRequest):
- def __init__(self, user_id: str, action: str):
- self.user_id = user_id
- self.action = action
+def init_events(mapper: events.EventMap) -> None:
+ mapper.bind(events.NotificationEvent[event_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
+ mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
+```
- def to_dict(self) -> dict:
- return {"user_id": self.user_id, "action": self.action}
+**Chain of Responsibility** — bind a list of handlers (the first one that can handle the request processes it, otherwise the request is passed to the next):
- @classmethod
- def from_dict(cls, **kwargs) -> "CustomRequest":
- return cls(user_id=kwargs["user_id"], action=kwargs["action"])
+```python
+def payment_mapper(mapper: cqrs.RequestMap) -> None:
+ mapper.bind(
+ ProcessPaymentCommand,
+ [
+ CreditCardPaymentHandler,
+ PayPalPaymentHandler,
+ DefaultPaymentHandler, # Fallback
+ ],
+ )
+```
-class CustomResponse(cqrs.IResponse):
- def __init__(self, result: str, status: int):
- self.result = result
- self.status = status
+**Streaming handler** — bind a command to a `StreamingRequestHandler` (results are yielded as they become available):
- def to_dict(self) -> dict:
- return {"result": self.result, "status": self.status}
+```python
+def commands_mapper(mapper: cqrs.RequestMap) -> None:
+ mapper.bind(ProcessOrdersCommand, ProcessOrdersCommandHandler) # StreamingRequestHandler
+```
- @classmethod
- def from_dict(cls, **kwargs) -> "CustomResponse":
- return cls(result=kwargs["result"], status=kwargs["status"])
+**Saga (including with fallback)** — bind the saga context type to the saga class in `SagaMap`:
+
+```python
+def saga_mapper(mapper: cqrs.SagaMap) -> None:
+ mapper.bind(OrderContext, OrderSaga)
+ mapper.bind(OrderContext, OrderSagaWithFallback)
```
-A complete example can be found in [request_response_types.py](https://github.com/vadikko2/cqrs/blob/master/examples/request_response_types.py)
+## DI container
-## Mapping
+Use the following example to set up dependency injection in your command, query and event handlers. This will make
+dependency management simpler.
+
+The package supports two DI container libraries:
-To bind commands, queries and events with specific handlers, you can use the registries `EventMap` and `RequestMap`.
+### di library
```python
-from cqrs import requests, events
+import di
+...
-from app import commands, command_handlers
-from app import queries, query_handlers
-from app import events as event_models, event_handlers
+def setup_di() -> di.Container:
+ """
+ Binds implementations to dependencies
+ """
+ container = di.Container()
+ container.bind(
+ di.bind_by_type(
+ dependent.Dependent(cqrs.SqlAlchemyOutboxedEventRepository, scope="request"),
+ cqrs.OutboxedEventRepository
+ )
+ )
+ container.bind(
+ di.bind_by_type(
+ dependent.Dependent(MeetingAPIImplementaion, scope="request"),
+ MeetingAPIProtocol
+ )
+ )
+ return container
+```
+A complete example can be found in
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/dependency_injection.py)
-def init_commands(mapper: requests.RequestMap) -> None:
- mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)
+### dependency-injector library
-def init_queries(mapper: requests.RequestMap) -> None:
- mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)
+The package also supports [dependency-injector](https://github.com/ets-labs/python-dependency-injector) library.
+You can use `DependencyInjectorCQRSContainer` adapter to integrate dependency-injector containers with python-cqrs.
-def init_events(mapper: events.EventMap) -> None:
- mapper.bind(events.NotificationEvent[event_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
- mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
+```python
+from dependency_injector import containers, providers
+from cqrs.container.dependency_injector import DependencyInjectorCQRSContainer
+
+class ApplicationContainer(containers.DeclarativeContainer):
+ # Define your providers
+ service = providers.Factory(ServiceImplementation)
+
+# Create CQRS container adapter
+cqrs_container = DependencyInjectorCQRSContainer(ApplicationContainer())
+
+# Use with bootstrap
+mediator = bootstrap.bootstrap(
+ di_container=cqrs_container,
+ commands_mapper=commands_mapper,
+ ...
+)
```
+Complete examples can be found in:
+- [Simple example](https://github.com/vadikko2/python-cqrs/blob/master/examples/dependency_injector_integration_simple_example.py)
+- [Practical example with FastAPI](https://github.com/vadikko2/python-cqrs/blob/master/examples/dependency_injector_integration_practical_example.py)
+
## Bootstrap
The `python-cqrs` package implements a set of bootstrap utilities designed to simplify the initial configuration of an
@@ -359,6 +521,16 @@ def event_mediator_factory():
events_mapper=mapping.init_events,
on_startup=[orm.init_store_event_mapper],
)
+
+
+@functools.lru_cache
+def saga_mediator_factory():
+ return saga_bootstrap.bootstrap(
+ di_container=dependencies.setup_di(),
+ sagas_mapper=mapping.init_sagas,
+ domain_events_mapper=mapping.init_events,
+ saga_storage=MemorySagaStorage(),
+ )
```
## Saga Pattern
@@ -512,68 +684,7 @@ sequence_diagram = generator.sequence()
class_diagram = generator.class_diagram()
```
-Complete example: [Saga Mermaid Diagrams](https://github.com/vadikko2/cqrs/blob/master/examples/saga_mermaid.py)
-
-## Event Handlers
-
-Event handlers are designed to process `Notification` and `ECST` events that are consumed from the broker.
-To configure event handling, you need to implement a broker consumer on the side of your application.
-Below is an example of `Kafka event consuming` that can be used in the Presentation Layer.
-
-```python
-class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
- def __init__(self):
- self._events = []
-
- @property
- def events(self):
- return self._events
-
- async def handle(self, request: JoinMeetingCommand) -> None:
- STORAGE[request.meeting_id].append(request.user_id)
- self._events.append(
- UserJoined(user_id=request.user_id, meeting_id=request.meeting_id),
- )
- print(f"User {request.user_id} joined meeting {request.meeting_id}")
-
-
-class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
- async def handle(self, event: UserJoined) -> None:
- print(f"Handle user {event.user_id} joined meeting {event.meeting_id} event")
-```
-
-A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/domain_event_handler.py)
-
-### Parallel Event Processing
-
-Both `RequestMediator` and `StreamingRequestMediator` support parallel processing of domain events. You can control
-the number of event handlers that run simultaneously using the `max_concurrent_event_handlers` parameter.
-
-This feature is especially useful when:
-- Multiple event handlers need to process events independently
-- You want to improve performance by processing events concurrently
-- You need to limit resource consumption by controlling concurrency
-
-**Configuration:**
-
-```python
-from cqrs.requests import bootstrap
-
-mediator = bootstrap.bootstrap_streaming(
- di_container=container,
- commands_mapper=commands_mapper,
- domain_events_mapper=domain_events_mapper,
- message_broker=broker,
- max_concurrent_event_handlers=3, # Process up to 3 events in parallel
- concurrent_event_handle_enable=True, # Enable parallel processing
-)
-```
-
-> [!TIP]
-> - Set `max_concurrent_event_handlers` to limit the number of simultaneously running event handlers
-> - Set `concurrent_event_handle_enable=False` to disable parallel processing and process events sequentially
-> - The default value for `max_concurrent_event_handlers` is `10` for `StreamingRequestMediator` and `1` for `RequestMediator`
+Complete example: [Saga Mermaid Diagrams](https://github.com/vadikko2/python-cqrs/blob/master/examples/saga_mermaid.py)
## Producing Notification Events
@@ -613,7 +724,7 @@ class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/event_producing.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/event_producing.py)
After processing the command/request, if there are any Notification/ECST events,
the EventEmitter is invoked to produce the events via the message broker.
@@ -648,13 +759,6 @@ The package implements the [Transactional Outbox](https://microservices.io/patte
pattern, which ensures that messages are produced to the broker according to the at-least-once semantics.
```python
-def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession):
- """
- Make changes to the database
- """
- session.add(...)
-
-
class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
def __init__(self, outbox: cqrs.OutboxedEventRepository):
self.outbox = outbox
@@ -665,35 +769,33 @@ class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
async def handle(self, request: JoinMeetingCommand) -> None:
print(f"User {request.user_id} joined meeting {request.meeting_id}")
- async with self.outbox as session:
- do_some_logic(request.meeting_id, session) # business logic
- self.outbox.add(
- session,
- cqrs.NotificationEvent[UserJoinedNotificationPayload](
- event_name="UserJoined",
- topic="user_notification_events",
- payload=UserJoinedNotificationPayload(
- user_id=request.user_id,
- meeting_id=request.meeting_id,
- ),
+ # Outbox repository is bound to a session (e.g. via DI request scope).
+ # add() takes only the event; commit() persists the outbox and your changes.
+ self.outbox.add(
+ cqrs.NotificationEvent[UserJoinedNotificationPayload](
+ event_name="UserJoined",
+ topic="user_notification_events",
+ payload=UserJoinedNotificationPayload(
+ user_id=request.user_id,
+ meeting_id=request.meeting_id,
),
- )
- self.outbox.add(
- session,
- cqrs.NotificationEvent[UserJoinedECSTPayload](
- event_name="UserJoined",
- topic="user_ecst_events",
- payload=UserJoinedECSTPayload(
- user_id=request.user_id,
- meeting_id=request.meeting_id,
- ),
+ ),
+ )
+ self.outbox.add(
+ cqrs.NotificationEvent[UserJoinedECSTPayload](
+ event_name="UserJoined",
+ topic="user_ecst_events",
+ payload=UserJoinedECSTPayload(
+ user_id=request.user_id,
+ meeting_id=request.meeting_id,
),
- )
- await self.outbox.commit(session)
+ ),
+ )
+ await self.outbox.commit()
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/save_events_into_outbox.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/save_events_into_outbox.py)
> [!TIP]
> You can specify the name of the Outbox table using the environment variable `OUTBOX_SQLA_TABLE`.
@@ -701,8 +803,8 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/save_e
> [!TIP]
> If you use the protobuf events you should specify `OutboxedEventRepository`
-> by [protobuf serialize](https://github.com/vadikko2/cqrs/blob/master/src/cqrs/serializers/protobuf.py). A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/save_proto_events_into_outbox.py)
+> by [protobuf serialize](https://github.com/vadikko2/python-cqrs/blob/master/src/cqrs/serializers/protobuf.py). A complete example can be found in
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/save_proto_events_into_outbox.py)
## Producing Events from Outbox to Kafka
@@ -728,89 +830,93 @@ broker = kafka.KafkaMessageBroker(
producer=kafka_adapters.kafka_producer_factory(dsn="localhost:9092"),
)
-producer = cqrs.EventProducer(broker, cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor()))
-
-
-async def periodically_task():
- async for messages in producer.event_batch_generator():
- for message in messages:
- await producer.send_message(message)
- await producer.repository.commit()
- await asyncio.sleep(10)
+# SqlAlchemyOutboxedEventRepository expects (session, compressor), not session_factory.
+async with session_factory() as session:
+ repository = cqrs.SqlAlchemyOutboxedEventRepository(session, zlib.ZlibCompressor())
+ producer = cqrs.EventProducer(broker, repository)
-
-loop = asyncio.get_event_loop()
-loop.run_until_complete(periodically_task())
+ async for messages in producer.event_batch_generator():
+ for message in messages:
+ await producer.send_message(message)
+ await producer.repository.commit()
+ await asyncio.sleep(10)
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/kafka_outboxed_event_producing.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/kafka_outboxed_event_producing.py)
-**Transaction log tailing.** If Outbox polling does not suit you, consider [Transaction Log Tailing](https://microservices.io/patterns/data/transaction-log-tailing.html). The package does not implement it; you can use [Debezium + Kafka Connect](https://debezium.io/documentation/reference/stable/architecture.html) to tail the Outbox and produce events to Kafka.
+## Transaction log tailing
-## DI container
+If the Outbox polling strategy does not suit your needs, I recommend exploring
+the [Transaction Log Tailing](https://microservices.io/patterns/data/transaction-log-tailing.html) pattern.
+The current version of the python-cqrs package does not support the implementation of this pattern.
-Use the following example to set up dependency injection in your command, query and event handlers. This will make
-dependency management simpler.
+> [!TIP]
+> However, it can be implemented
+> using [Debezium + Kafka Connect](https://debezium.io/documentation/reference/stable/architecture.html),
+> which allows you to produce all newly created events within the Outbox storage directly to the corresponding topic in
+> Kafka (or any other broker).
-The package supports two DI container libraries:
+## Event Handlers
-### di library
+Event handlers are designed to process `Notification` and `ECST` events that are consumed from the broker.
+To configure event handling, you need to implement a broker consumer on the side of your application.
+Below is an example of `Kafka event consuming` that can be used in the Presentation Layer.
```python
-import di
-...
+class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
+ def __init__(self):
+ self._events = []
-def setup_di() -> di.Container:
- """
- Binds implementations to dependencies
- """
- container = di.Container()
- container.bind(
- di.bind_by_type(
- dependent.Dependent(cqrs.SqlAlchemyOutboxedEventRepository, scope="request"),
- cqrs.OutboxedEventRepository
- )
- )
- container.bind(
- di.bind_by_type(
- dependent.Dependent(MeetingAPIImplementaion, scope="request"),
- MeetingAPIProtocol
+ @property
+ def events(self):
+ return self._events
+
+ async def handle(self, request: JoinMeetingCommand) -> None:
+ STORAGE[request.meeting_id].append(request.user_id)
+ self._events.append(
+ UserJoined(user_id=request.user_id, meeting_id=request.meeting_id),
)
- )
- return container
+ print(f"User {request.user_id} joined meeting {request.meeting_id}")
+
+
+class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
+ async def handle(self, event: UserJoined) -> None:
+ print(f"Handle user {event.user_id} joined meeting {event.meeting_id} event")
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injection.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/domain_event_handler.py)
-### dependency-injector library
+### Parallel Event Processing
-The package also supports [dependency-injector](https://github.com/ets-labs/python-dependency-injector) library.
-You can use `DependencyInjectorCQRSContainer` adapter to integrate dependency-injector containers with python-cqrs.
+Both `RequestMediator` and `StreamingRequestMediator` support parallel processing of domain events. You can control
+the number of event handlers that run simultaneously using the `max_concurrent_event_handlers` parameter.
-```python
-from dependency_injector import containers, providers
-from cqrs.container.dependency_injector import DependencyInjectorCQRSContainer
+This feature is especially useful when:
+- Multiple event handlers need to process events independently
+- You want to improve performance by processing events concurrently
+- You need to limit resource consumption by controlling concurrency
-class ApplicationContainer(containers.DeclarativeContainer):
- # Define your providers
- service = providers.Factory(ServiceImplementation)
+**Configuration:**
-# Create CQRS container adapter
-cqrs_container = DependencyInjectorCQRSContainer(ApplicationContainer())
+```python
+from cqrs.requests import bootstrap
-# Use with bootstrap
-mediator = bootstrap.bootstrap(
- di_container=cqrs_container,
+mediator = bootstrap.bootstrap_streaming(
+ di_container=container,
commands_mapper=commands_mapper,
- ...
+ domain_events_mapper=domain_events_mapper,
+ message_broker=broker,
+ max_concurrent_event_handlers=3, # Process up to 3 events in parallel
+ concurrent_event_handle_enable=True, # Enable parallel processing
)
```
-Complete examples can be found in:
-- [Simple example](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_simple_example.py)
-- [Practical example with FastAPI](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_practical_example.py)
+> [!TIP]
+> - Set `max_concurrent_event_handlers` to limit the number of simultaneously running event handlers
+> - Set `concurrent_event_handle_enable=False` to disable parallel processing and process events sequentially
+> - The default value for `max_concurrent_event_handlers` is `10` for `StreamingRequestMediator` and `1` for `RequestMediator`
## Integration with presentation layers
@@ -832,7 +938,7 @@ In this case you can use python-cqrs to route requests to the appropriate handle
import fastapi
import pydantic
-from app import dependecies, commands
+from app import dependencies, commands
router = fastapi.APIRouter(prefix="/meetings")
@@ -848,7 +954,7 @@ async def join_metting(
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastapi_integration.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/fastapi_integration.py)
### Kafka events consuming
@@ -944,9 +1050,71 @@ async def process_files_stream(
```
A complete example can be found in
-the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastapi_sse_streaming.py)
+the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/fastapi_sse_streaming.py)
## Protobuf messaging
The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).
-There is interface-level support for converting Notification events to Protobuf and back. Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
+Notification events can be serialized to Protobuf and back: implement the `proto()` method (returns a protobuf message) and the class method `from_proto()` (creates an event instance from proto) on your event class.
+
+Example (assuming generated `user_joined_pb2` from your `.proto` with fields `event_id`, `event_timestamp`, `event_name`, `payload`):
+
+```python
+import uuid
+from datetime import datetime
+
+import cqrs
+from app.generated import user_joined_pb2 # generated from .proto
+
+
+class UserJoinedPayload(cqrs.Response):
+ user_id: str
+ meeting_id: str
+
+
+class UserJoinedNotificationEvent(cqrs.NotificationEvent[UserJoinedPayload]):
+ """Event with Protobuf serialization support."""
+
+ event_name: str = "UserJoined"
+
+ def proto(self):
+ msg = user_joined_pb2.UserJoinedNotification()
+ msg.event_id = str(self.event_id)
+ msg.event_timestamp = self.event_timestamp.isoformat()
+ msg.event_name = self.event_name
+ msg.payload.user_id = self.payload.user_id
+ msg.payload.meeting_id = self.payload.meeting_id
+ return msg
+
+ @classmethod
+ def from_proto(cls, proto_msg):
+ return cls(
+ event_id=uuid.UUID(proto_msg.event_id),
+ event_timestamp=datetime.fromisoformat(proto_msg.event_timestamp),
+ event_name=proto_msg.event_name,
+ topic="user_notification_events",
+ payload=UserJoinedPayload(
+ user_id=proto_msg.payload.user_id,
+ meeting_id=proto_msg.payload.meeting_id,
+ ),
+ )
+```
+
+## Contributing
+
+Contributions are welcome. To develop locally:
+
+1. Clone the repository and create a virtual environment.
+2. Install dev dependencies: `pip install -e ".[dev]"`.
+3. Run tests: `pytest`.
+4. Install pre-commit and run hooks: `pre-commit install && pre-commit run --all-files`.
+
+The project uses [ruff](https://docs.astral.sh/ruff/) for linting and [pyright](https://microsoft.github.io/pyright/) for type checking.
+
+## Changelog
+
+Release notes and migration guides are published on [GitHub Releases](https://github.com/vadikko2/python-cqrs/releases).
+
+## License
+
+This project is licensed under the MIT License — see the [LICENSE](LICENSE) file for details.