Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,22 @@ await broker.send_message(...)
The package implements the [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html)
pattern, which ensures that messages are produced to the broker according to the at-least-once semantics.

To use the outbox pattern with SQLAlchemy, you first need to define your outbox model using the provided mixin:

```python
from sqlalchemy.orm import DeclarativeBase
from cqrs.outbox import OutboxModelMixin

class Base(DeclarativeBase):
pass

class OutboxModel(Base, OutboxModelMixin):
__tablename__ = "outbox" # You can customize the table name
```
Then, you can use SqlAlchemyOutboxedEventRepository in your handlers:
```python
import cqrs

def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession):
"""
Make changes to the database
Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ dependencies = [
"dependency-injector>=4.0",
"orjson==3.*",
"pydantic==2.*",
"python-dotenv==1.*",
"retry-async==0.1.*",
"sqlalchemy[asyncio]==2.0.*",
"python-dotenv==1.*",
"uuid6>=2025.0.1",
"typing-extensions>=4.0"
]
description = "Event-Driven Architecture Framework for Distributed Systems"
maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.9.0"
version = "4.9.1"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand All @@ -52,6 +52,7 @@ dev = [
"pytest-asyncio~=0.21.1",
"pytest-env==0.6.2",
"cryptography==42.0.2",
"sqlalchemy[asyncio]==2.0.*",
"asyncmy==0.2.9",
"asyncpg>=0.29.0",
"redis>=5.0.0",
Expand Down
5 changes: 3 additions & 2 deletions src/cqrs/events/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
import typing
import uuid6
import uuid

import dotenv
Expand Down Expand Up @@ -289,7 +290,7 @@ class UserRegisteredEvent(DCNotificationEvent[dict]):
event_name: str
payload: PayloadT

event_id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
event_id: uuid.UUID = dataclasses.field(default_factory=uuid6.uuid7)
event_timestamp: datetime.datetime = dataclasses.field(
default_factory=datetime.datetime.now,
)
Expand Down Expand Up @@ -350,7 +351,7 @@ class UserRegisteredEvent(PydanticNotificationEvent[dict]):

payload: PayloadT

event_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4)
event_id: uuid.UUID = pydantic.Field(default_factory=uuid6.uuid7)
event_timestamp: datetime.datetime = pydantic.Field(
default_factory=datetime.datetime.now,
)
Expand Down
20 changes: 20 additions & 0 deletions src/cqrs/outbox/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from cqrs.outbox.repository import OutboxedEventRepository, EventStatus
from cqrs.outbox.map import OutboxedEventMap

__all__ = [
"OutboxedEventRepository",
"EventStatus",
"OutboxedEventMap",
]

try:
from cqrs.outbox.sqlalchemy import (
SqlAlchemyOutboxedEventRepository,
OutboxModelMixin
)
__all__.extend([
"SqlAlchemyOutboxedEventRepository",
"OutboxModelMixin"
])
except ImportError:
pass
138 changes: 89 additions & 49 deletions src/cqrs/outbox/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,129 @@
import datetime
import logging
import typing

import dotenv
import uuid
import orjson
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.dialects import mysql
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.orm import DeclarativeMeta, registry

import sys
import cqrs
from cqrs import compressors
from cqrs.outbox import map, repository

Base = registry().generate_base()

logger = logging.getLogger(__name__)
if sys.version_info < (3, 13):
from typing_extensions import deprecated
else:
from warnings import deprecated

try:
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column, declared_attr, DeclarativeMeta
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.dialects import postgresql
except ImportError:
raise ImportError(
"You are trying to use SQLAlchemy outbox implementation, "
"but 'sqlalchemy' is not installed. "
"Please install it using: pip install python-cqrs[sqlalchemy]"
)

dotenv.load_dotenv()

logger = logging.getLogger(__name__)
DEFAULT_OUTBOX_TABLE_NAME = "outbox"

MAX_FLUSH_COUNTER_VALUE = 5


class OutboxModel(Base):
__tablename__ = DEFAULT_OUTBOX_TABLE_NAME

__table_args__ = (
sqlalchemy.UniqueConstraint(
"event_id_bin",
"event_name",
name="event_id_unique_index",
),
)
id = sqlalchemy.Column(
sqlalchemy.BigInteger(),
class BinaryUUID(sqlalchemy.TypeDecorator):
"""Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL)."""
impl = sqlalchemy.BINARY(16)
cache_ok = True

def load_dialect_impl(self, dialect):
if dialect.name == "postgresql":
return dialect.type_descriptor(postgresql.UUID())
else:
return dialect.type_descriptor(sqlalchemy.BINARY(16))

def process_bind_param(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg work with uuid.UUID
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value
Comment on lines +46 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

process_bind_param silently passes non-uuid.UUID values through for non-PostgreSQL dialects.

If a string UUID (e.g., "550e8400-e29b-...") is passed for MySQL/SQLite, the final return value falls through without conversion, producing a type error or silent data corruption when SQLAlchemy tries to store it in a BINARY(16) column.

🛡️ Proposed fix: add a string-to-UUID conversion guard
     def process_bind_param(self, value, dialect):
         if value is None:
             return value
         if dialect.name == "postgresql":
             return value  # asyncpg works with uuid.UUID
+        if isinstance(value, str):
+            value = uuid.UUID(value)
         if isinstance(value, uuid.UUID):
             return value.bytes  # For MySQL return 16 bytes
         return value
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 46 - 53, The process_bind_param
method currently lets non-uuid.UUID values slip through for non-PostgreSQL
dialects which breaks BINARY(16) storage; update process_bind_param in the
SQLAlchemy type to detect str inputs and convert them to uuid.UUID (e.g.,
uuid.UUID(value)) before returning .bytes for MySQL/SQLite, keep returning the
original uuid.UUID for PostgreSQL, and raise a clear TypeError for any other
unexpected types so invalid values don't silently corrupt the DB.


def process_result_value(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg return uuid.UUID
if isinstance(value, bytes):
return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID
return value


class OutboxModelMixin:
@declared_attr.directive
def __tablename__(self) -> str:
return DEFAULT_OUTBOX_TABLE_NAME

id: Mapped[int] = mapped_column(
sqlalchemy.BigInteger,
sqlalchemy.Identity(),
primary_key=True,
nullable=False,
autoincrement=True,
comment="Identity",
)
event_id = sqlalchemy.Column(
sqlalchemy.Uuid,
event_id: Mapped[uuid.UUID] = mapped_column(
BinaryUUID,
nullable=False,
comment="Event idempotency id",
)
event_id_bin = sqlalchemy.Column(
sqlalchemy.BINARY(16),
nullable=False,
comment="Event idempotency id in 16 bit presentation",
)
event_status = sqlalchemy.Column(
event_status: Mapped[repository.EventStatus] = mapped_column(
sqlalchemy.Enum(repository.EventStatus),
nullable=False,
default=repository.EventStatus.NEW,
comment="Event producing status",
)
flush_counter = sqlalchemy.Column(
sqlalchemy.SmallInteger(),
flush_counter: Mapped[int] = mapped_column(
sqlalchemy.SmallInteger,
nullable=False,
default=0,
comment="Event producing flush counter",
)
event_name = sqlalchemy.Column(
event_name: Mapped[typing.Text] = mapped_column(
sqlalchemy.String(255),
nullable=False,
comment="Event name",
)
topic = sqlalchemy.Column(
topic: Mapped[typing.Text] = mapped_column(
sqlalchemy.String(255),
nullable=False,
comment="Event topic",
default="",
comment="Event topic",
)
created_at = sqlalchemy.Column(
created_at: Mapped[datetime.datetime] = mapped_column(
sqlalchemy.DateTime,
nullable=False,
server_default=func.now(),
comment="Event creation timestamp",
)
payload = sqlalchemy.Column(
mysql.BLOB,
payload: Mapped[bytes] = mapped_column(
sqlalchemy.LargeBinary,
nullable=False,
default={},
comment="Event payload",
)

@declared_attr
def __table_args__(self):
return (
sqlalchemy.UniqueConstraint(
"event_id",
"event_name",
name="event_id_unique_index",
),
)

def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]:
return {column.name: getattr(self, column.name) for column in self.__table__.columns}

Expand Down Expand Up @@ -149,10 +185,12 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository):
def __init__(
self,
session: sql_session.AsyncSession,
outbox_model: type[OutboxModelMixin],
compressor: compressors.Compressor | None = None,
):
self.session = session
self._compressor = compressor
self._outbox_model = outbox_model

def add(
self,
Expand All @@ -172,17 +210,16 @@ def add(
bytes_payload = self._compressor.compress(bytes_payload)

self.session.add(
OutboxModel(
self._outbox_model(
event_id=event.event_id,
event_id_bin=func.UUID_TO_BIN(event.event_id),
event_name=event.event_name,
created_at=event.event_timestamp,
payload=bytes_payload,
topic=event.topic,
),
)

def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None:
def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent | None:
event_dict = model.row_to_dict()

event_model = map.OutboxedEventMap.get(event_dict["event_name"])
Expand All @@ -207,8 +244,10 @@ async def get_many(
batch_size: int = 100,
topic: typing.Text | None = None,
) -> typing.List[repository.OutboxedEvent]:
events: typing.Sequence[OutboxModel] = (
(await self.session.execute(OutboxModel.get_batch_query(batch_size, topic))).scalars().all()
events: typing.Sequence[OutboxModelMixin] = (
(await self.session.execute(self._outbox_model.get_batch_query(batch_size, topic)))
.scalars()
.all()
)

result = []
Expand All @@ -227,7 +266,7 @@ async def update_status(
new_status: repository.EventStatus,
) -> None:
await self.session.execute(
statement=OutboxModel.update_status_query(outboxed_event_id, new_status),
statement=self._outbox_model.update_status_query(outboxed_event_id, new_status),
)

async def commit(self):
Expand All @@ -237,6 +276,7 @@ async def rollback(self):
await self.session.rollback()


@deprecated("This function is deprecated; use `OutboxModelMixin` instead")
def rebind_outbox_model(
model: typing.Any,
new_base: DeclarativeMeta,
Expand Down
22 changes: 14 additions & 8 deletions src/cqrs/saga/storage/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
import os
import typing
import uuid

import dotenv
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import registry

from cqrs.dispatcher.exceptions import SagaConcurrencyError
from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus
from cqrs.saga.storage.models import SagaLogEntry
from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun

import dotenv
try:
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import registry
except ImportError:
raise ImportError(
"You are trying to use SQLAlchemy saga storage implementation, "
"but 'sqlalchemy' is not installed. "
"Please install it using: pip install python-cqrs[sqlalchemy]"
)

Base = registry().generate_base()
logger = logging.getLogger(__name__)

Expand Down
17 changes: 2 additions & 15 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,13 @@
import asyncio
from unittest import mock

import pytest

from unittest import mock
from cqrs.adapters import kafka


TEST_TOPIC = "TestCqrsTopic"

pytest_plugins = ["tests.integration.fixtures"]


@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for session-scoped fixtures."""
loop = asyncio.new_event_loop()
try:
yield loop
finally:
if not loop.is_closed():
loop.close()


@pytest.fixture(scope="function")
async def kafka_producer() -> kafka.KafkaProducer:
producer = mock.create_autospec(kafka.KafkaProducer)
Expand Down
Empty file.
Loading
Loading