Merged
13 changes: 13 additions & 0 deletions .github/workflows/codspeed.yml
Original file line number Diff line number Diff line change
@@ -46,6 +46,19 @@ jobs:
echo "MySQL did not become ready in time"
exit 1

- name: Wait for PostgreSQL
run: |
for i in $(seq 1 30); do
if docker compose -f docker-compose-test.yml exec -T postgres_tests pg_isready -h localhost -U cqrs -q 2>/dev/null; then
echo "PostgreSQL is ready"
exit 0
fi
echo "Waiting for PostgreSQL... ($i/30)"
sleep 2
done
echo "PostgreSQL did not become ready in time"
exit 1

- name: Wait for Redis
run: |
for i in $(seq 1 15); do
18 changes: 18 additions & 0 deletions .github/workflows/tests.yml
@@ -69,6 +69,7 @@ jobs:
vermin --target=3.10- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests

test:
name: test (py ${{ matrix.python-version }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
@@ -105,6 +106,19 @@ jobs:
echo "MySQL did not become ready in time"
exit 1

- name: Wait for PostgreSQL
run: |
for i in $(seq 1 30); do
if docker compose -f docker-compose-test.yml exec -T postgres_tests pg_isready -h localhost -U cqrs -q 2>/dev/null; then
echo "PostgreSQL is ready"
exit 0
fi
echo "Waiting for PostgreSQL... ($i/30)"
sleep 2
done
echo "PostgreSQL did not become ready in time"
exit 1

- name: Wait for Redis
run: |
for i in $(seq 1 15); do
@@ -119,6 +133,10 @@
exit 1

- name: Run all tests with coverage
env:
DATABASE_DSN: mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs
DATABASE_DSN_MYSQL: mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs
DATABASE_DSN_POSTGRESQL: postgresql+asyncpg://cqrs:cqrs@localhost:5433/cqrs
run: |
pytest -c ./tests/pytest-config.ini --cov=src --cov-report=xml --cov-report=term -o cache_dir=/tmp/pytest_cache ./tests/unit ./tests/integration

2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -9,7 +9,7 @@ repos:
- id: check-added-large-files
- args:
- --pytest-test-first
exclude: (^tests/mock/|^tests/integration/|^tests/fixtures|conftest\.py$)
id: name-tests-test
- id: check-merge-conflict
- id: check-json
181 changes: 85 additions & 96 deletions README.md
@@ -43,29 +43,29 @@

## Overview

An event-driven framework for building distributed systems in Python. It centers on CQRS (Command Query Responsibility Segregation) and extends into messaging, sagas, and reliable event delivery — so you can separate read and write flows, react to events from the bus, run distributed transactions with compensation, and publish events via Transaction Outbox. The result is clearer structure, better scalability, and easier evolution of the application.

This package is a fork of the [diator](https://github.com/akhundMurad/diator)
project ([documentation](https://akhundmurad.github.io/diator/)) with several enhancements, ordered by importance:

**Core framework**

1. Redesigned the event and request mapping mechanism to handlers;
2. `EventMediator` for handling `Notification` and `ECST` events coming from the bus;
3. `bootstrap` for easy setup;
4. **Transaction Outbox**, ensuring that `Notification` and `ECST` events are sent to the broker;
5. **Orchestrated Saga** pattern for distributed transactions with automatic compensation and recovery;
6. `StreamingRequestMediator` and `StreamingRequestHandler` for streaming requests with real-time progress updates;
7. **Chain of Responsibility** with `CORRequestHandler` for processing requests through multiple handlers in sequence;
8. **Parallel event processing** with configurable concurrency limits.

**Also**

- **Typing:** Pydantic [v2.*](https://docs.pydantic.dev/2.8/) and `IRequest`/`IResponse` interfaces — use Pydantic-based, dataclass-based, or custom Request/Response implementations.
- **Broker:** Kafka via [aiokafka](https://github.com/aio-libs/aiokafka).
- **Integration:** Ready for integration with FastAPI and FastStream.
- **Documentation:** Built-in Mermaid diagram generation (Sequence and Class diagrams).
- **Protobuf:** Interface-level support for converting Notification events to Protobuf and back.

## Request Handlers

@@ -87,7 +87,7 @@ class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):

def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
self._meetings_api = meetings_api
self._events: list[Event] = []

@property
def events(self) -> typing.List[events.Event]:
@@ -115,7 +115,7 @@ class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryR

def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
self._meetings_api = meetings_api
self._events: list[Event] = []

@property
def events(self) -> typing.List[events.Event]:
@@ -304,6 +304,63 @@ class CustomResponse(cqrs.IResponse):

A complete example can be found in [request_response_types.py](https://github.com/vadikko2/cqrs/blob/master/examples/request_response_types.py)
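The mix-and-match typing described above can be shown with a small self-contained sketch. To keep it runnable standalone, `IRequest`/`IResponse` are modeled as plain local marker classes; in the real package you would subclass `cqrs.IRequest`/`cqrs.IResponse` instead, and the query and handler names below are illustrative.

```python
import dataclasses
import typing


# Local stand-ins for cqrs.IRequest / cqrs.IResponse, assumed here to be
# simple marker interfaces; in the real package import them from `cqrs`.
class IRequest: ...


class IResponse: ...


@dataclasses.dataclass(frozen=True)
class ReadMeetingQuery(IRequest):
    meeting_id: int


@dataclasses.dataclass(frozen=True)
class ReadMeetingQueryResult(IResponse):
    meeting_id: int
    attendees: typing.List[str]


def handle(query: ReadMeetingQuery) -> ReadMeetingQueryResult:
    # A dataclass-based response can coexist with Pydantic-based ones
    # elsewhere in the same application.
    return ReadMeetingQueryResult(
        meeting_id=query.meeting_id,
        attendees=["alice", "bob"],
    )
```

Because both styles satisfy the same interfaces, a single mediator can dispatch to handlers that return either kind of response.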

## Mapping

To bind commands, queries and events with specific handlers, you can use the registries `EventMap` and `RequestMap`.

```python
from cqrs import requests, events

from app import commands, command_handlers
from app import queries, query_handlers
from app import events as event_models, event_handlers


def init_commands(mapper: requests.RequestMap) -> None:
mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)

def init_queries(mapper: requests.RequestMap) -> None:
mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)

def init_events(mapper: events.EventMap) -> None:
mapper.bind(events.NotificationEvent[event_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
```

## Bootstrap

The `python-cqrs` package implements a set of bootstrap utilities designed to simplify the initial configuration of an
application.

```python
import functools

from cqrs.events import bootstrap as event_bootstrap
from cqrs.requests import bootstrap as request_bootstrap

from app import dependencies, mapping, orm


@functools.lru_cache
def mediator_factory():
return request_bootstrap.bootstrap(
di_container=dependencies.setup_di(),
commands_mapper=mapping.init_commands,
queries_mapper=mapping.init_queries,
domain_events_mapper=mapping.init_events,
on_startup=[orm.init_store_event_mapper],
)


@functools.lru_cache
def event_mediator_factory():
return event_bootstrap.bootstrap(
di_container=dependencies.setup_di(),
events_mapper=mapping.init_events,
on_startup=[orm.init_store_event_mapper],
)
```

## Saga Pattern

The package implements the Orchestrated Saga pattern for managing distributed transactions across multiple services or operations.
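Conceptually, an orchestrated saga executes its steps in order and, when a step fails, runs the compensations of the already-completed steps in reverse. A minimal sketch of that control flow (not the package's actual Saga API, which is richer):

```python
# Minimal orchestrated-saga sketch: run actions in order; on the first
# failure, undo the completed steps by calling their compensations in
# reverse order. Names and shapes here are illustrative only.
from typing import Callable, List, Tuple

Step = Tuple[Callable[[], None], Callable[[], None]]  # (action, compensation)


def run_saga(steps: List[Step]) -> bool:
    done: List[Step] = []
    for action, compensate in steps:
        try:
            action()
            done.append((action, compensate))
        except Exception:
            # Compensate completed steps in reverse (LIFO) order.
            for _, comp in reversed(done):
                comp()
            return False
    return True
```

For example, a booking saga with steps `reserve_room` and `charge_card` would call `release_room` if the charge fails, leaving the system consistent.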
@@ -689,17 +746,7 @@ loop.run_until_complete(periodically_task())
A complete example can be found in
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/kafka_outboxed_event_producing.py)

**Transaction log tailing.** If Outbox polling does not suit you, consider [Transaction Log Tailing](https://microservices.io/patterns/data/transaction-log-tailing.html). The package does not implement it; you can use [Debezium + Kafka Connect](https://debezium.io/documentation/reference/stable/architecture.html) to tail the Outbox and produce events to Kafka.
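For contrast, the Outbox polling strategy that log tailing replaces can be sketched as a simple relay loop: unsent events are read from an outbox table, handed to the broker, and marked as sent. Table and column names here are illustrative, not the package's schema, and the in-memory SQLite database stands in for the real store.

```python
import sqlite3
from typing import Callable


def relay_once(conn: sqlite3.Connection, produce: Callable[[str], None]) -> int:
    """Publish all unsent outbox rows, mark them sent, return the count."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE sent = 0 ORDER BY id"
    ).fetchall()
    for event_id, payload in rows:
        produce(payload)  # e.g. send to a Kafka topic
        conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (event_id,))
    conn.commit()
    return len(rows)
```

A log-tailing setup removes this polling loop entirely: Debezium reads the database's write-ahead log and streams outbox inserts to the broker as they commit.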

## DI container

@@ -765,65 +812,10 @@ Complete examples can be found in:
- [Simple example](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_simple_example.py)
- [Practical example with FastAPI](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_practical_example.py)


## Integration with presentation layers

The framework is ready for integration with **FastAPI** and **FastStream**.

> [!TIP]
> I recommend reading the useful
> paper [Onion Architecture Used in Software Development](https://www.researchgate.net/publication/371006360_Onion_Architecture_Used_in_Software_Development).
@@ -956,8 +948,5 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastap

## Protobuf messaging

The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).
There is interface-level support for converting Notification events to Protobuf and back. Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
10 changes: 10 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
@@ -15,6 +15,16 @@ services:
command: --init-file /data/application/init.sql
volumes:
- ./tests/init_database.sql:/data/application/init.sql
postgres_dev:
image: postgres:15.4
hostname: postgres-dev
restart: always
environment:
POSTGRES_USER: cqrs
POSTGRES_PASSWORD: cqrs
POSTGRES_DB: cqrs
ports:
- "5433:5432"
kafka0:
image: confluentinc/cp-kafka:7.2.1
hostname: kafka0
10 changes: 10 additions & 0 deletions docker-compose-test.yml
@@ -15,6 +15,16 @@ services:
command: --init-file /data/application/init.sql
volumes:
- ./tests/init_database.sql:/data/application/init.sql
postgres_tests:
image: postgres:15.4
hostname: postgres-test
restart: always
environment:
POSTGRES_USER: cqrs
POSTGRES_PASSWORD: cqrs
POSTGRES_DB: cqrs
ports:
- "5433:5432"
redis_tests:
image: redis:7.2
hostname: redis
@@ -351,9 +351,7 @@ def setup_logging() -> None:
)

# Add a StreamHandler if none exists
has_stream_handler = any(isinstance(h, logging.StreamHandler) for h in root_logger.handlers)
if not has_stream_handler:
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
4 changes: 1 addition & 3 deletions examples/kafka_event_consuming.py
@@ -158,9 +158,7 @@ def mediator_factory() -> cqrs.EventMediator:
decoder=empty_message_decoder,
)
async def hello_world_event_handler(
body: cqrs.NotificationEvent[HelloWorldPayload] | deserializers.DeserializeJsonError | None,
msg: kafka.KafkaMessage,
mediator: cqrs.EventMediator = faststream.Depends(mediator_factory),
):
4 changes: 1 addition & 3 deletions examples/request_response_types.py
@@ -256,9 +256,7 @@ async def handle(self, request: GetUserQuery) -> UserDetailsResponse:
raise ValueError(f"User {request.user_id} not found")

user = USER_STORAGE[request.user_id]
total_orders = sum(1 for order in ORDER_STORAGE.values() if order["user_id"] == request.user_id)

return UserDetailsResponse(
user_id=user["user_id"],