From c9349cce18154e04efac9e7d952b9546f68a73ab Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Sun, 22 Feb 2026 18:36:30 +0300 Subject: [PATCH 1/6] Make sqlalchemy an optional dependency. Implement BinaryUUID for PostgreSQL and MySQL compatibility. --- pyproject.toml | 5 +- src/cqrs/outbox/sqlalchemy.py | 83 +++++++++++++++++++++-------- src/cqrs/saga/storage/sqlalchemy.py | 19 ++++--- 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5f74f6f..61d5233 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,6 @@ dependencies = [ "pydantic==2.*", "python-dotenv==1.*", "retry-async==0.1.*", - "sqlalchemy[asyncio]==2.0.*", "typing-extensions>=4.0" ] description = "Event-Driven Architecture Framework for Distributed Systems" @@ -31,7 +30,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.10.0" +version = "4.10.1" [project.optional-dependencies] aiobreaker = ["aiobreaker>=0.3.0"] @@ -48,6 +47,7 @@ dev = [ "aio-pika==9.3.0", # from rabbit "aiokafka==0.10.0", # from kafka "requests==2.*", # from aiokafka + "sqlalchemy[asyncio]==2.0.*", # from sqlalchemy "pytest~=7.4.2", "pytest-asyncio~=0.21.1", "pytest-env==0.6.2", @@ -68,6 +68,7 @@ examples = [ ] kafka = ["aiokafka==0.10.0"] rabbit = ["aio-pika==9.3.0"] +sqlalchemy = ["sqlalchemy[asyncio]==2.0.*"] [project.urls] Documentation = "https://mkdocs.python-cqrs.dev/" diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index b4f8bc4..ff1b93e 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -1,18 +1,28 @@ +import datetime import logging import typing import dotenv import orjson -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.dialects import mysql -from sqlalchemy.ext.asyncio import session as sql_session -from sqlalchemy.orm import DeclarativeMeta, registry - import cqrs +import uuid from cqrs import compressors from cqrs.outbox import map, repository +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.orm import Mapped, mapped_column, declared_attr, DeclarativeMeta, registry + from sqlalchemy.ext.asyncio import session as sql_session + from sqlalchemy.dialects import postgresql +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy outbox implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) + + Base = registry().generate_base() logger = logging.getLogger(__name__) @@ -24,6 +34,38 @@ MAX_FLUSH_COUNTER_VALUE = 5 +class BinaryUUID(sqlalchemy.TypeDecorator): + """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" + impl = sqlalchemy.BINARY(16) + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == "postgresql": + return dialect.type_descriptor(postgresql.UUID()) + else: + return dialect.type_descriptor(sqlalchemy.BINARY(16)) + + def process_bind_param(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg work with uuid.UUID + if isinstance(value, str): + value = uuid.UUID(value) + if isinstance(value, uuid.UUID): + return value.bytes # For MySQL return 16 bytes + return value + + def process_result_value(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg return uuid.UUID + if isinstance(value, bytes): + return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID + return value + + class OutboxModel(Base): __tablename__ = DEFAULT_OUTBOX_TABLE_NAME @@ -34,55 +76,50 @@ class OutboxModel(Base): name="event_id_unique_index", ), ) - id = sqlalchemy.Column( - sqlalchemy.BigInteger(), + id: Mapped[int] = mapped_column( + sqlalchemy.BigInteger, sqlalchemy.Identity(), primary_key=True, nullable=False, autoincrement=True, comment="Identity", ) - event_id = sqlalchemy.Column( - sqlalchemy.Uuid, + event_id: Mapped[uuid.UUID] = mapped_column( + BinaryUUID, nullable=False, comment="Event idempotency id", ) - event_id_bin = sqlalchemy.Column( - sqlalchemy.BINARY(16), - nullable=False, - comment="Event idempotency id in 16 bit presentation", - ) - event_status = sqlalchemy.Column( + event_status: Mapped[repository.EventStatus] = mapped_column( sqlalchemy.Enum(repository.EventStatus), nullable=False, default=repository.EventStatus.NEW, comment="Event producing status", ) - flush_counter = sqlalchemy.Column( - sqlalchemy.SmallInteger(), + flush_counter: Mapped[int] = mapped_column( + sqlalchemy.SmallInteger, nullable=False, default=0, comment="Event producing flush counter", ) - event_name = sqlalchemy.Column( + event_name: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, comment="Event name", ) - topic = sqlalchemy.Column( + topic: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, comment="Event topic", default="", ) - created_at = sqlalchemy.Column( + created_at: Mapped[datetime.datetime] = mapped_column( sqlalchemy.DateTime, nullable=False, server_default=func.now(), comment="Event creation timestamp", ) - payload = sqlalchemy.Column( - mysql.BLOB, + payload: Mapped[bytes] = mapped_column( + sqlalchemy.LargeBinary, nullable=False, default={}, comment="Event payload", diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 99e2d58..e9edc50 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -6,17 +6,24 @@ import uuid import dotenv -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from sqlalchemy.orm import registry - from cqrs.dispatcher.exceptions import SagaConcurrencyError from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.models import SagaLogEntry from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.exc import SQLAlchemyError + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + from sqlalchemy.orm import registry +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy saga storage implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) + Base = registry().generate_base() logger = logging.getLogger(__name__) From 5977dd25c8d0d1373fb95b3d767c0dbf926faaa8 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Sun, 22 Feb 2026 18:41:45 +0300 Subject: [PATCH 2/6] compatibility --- src/cqrs/outbox/sqlalchemy.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index ff1b93e..447f1c5 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -89,6 +89,11 @@ class OutboxModel(Base): nullable=False, comment="Event idempotency id", ) + event_id_bin: Mapped[bytes] = mapped_column( + sqlalchemy.BINARY(16), + nullable=False, + comment="Event idempotency id in 16 bit presentation", + ) event_status: Mapped[repository.EventStatus] = mapped_column( sqlalchemy.Enum(repository.EventStatus), nullable=False, From bb538d9f1cdd6ac6bfccaa106eef0d816a8f1c1e Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Sun, 22 Feb 2026 18:43:43 +0300 Subject: [PATCH 3/6] lint fix --- src/cqrs/outbox/sqlalchemy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 447f1c5..523f40c 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -12,7 +12,7 @@ try: import sqlalchemy from sqlalchemy import func - from sqlalchemy.orm import Mapped, mapped_column, declared_attr, DeclarativeMeta, registry + from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry from sqlalchemy.ext.asyncio import session as sql_session from sqlalchemy.dialects import postgresql except ImportError: From 39e4c8f40f3a2f32dc6f9568ee8da06d47f16c5e Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Sun, 22 Feb 2026 19:12:31 +0300 Subject: [PATCH 4/6] lint fix --- src/cqrs/outbox/sqlalchemy.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 523f40c..35b965f 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -11,6 +11,7 @@ try: import sqlalchemy + from sqlalchemy import func from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry from sqlalchemy.ext.asyncio import session as sql_session @@ -19,7 +20,7 @@ raise ImportError( "You are trying to use SQLAlchemy outbox implementation, " "but 'sqlalchemy' is not installed. " - "Please install it using: pip install python-cqrs[sqlalchemy]" + "Please install it using: pip install python-cqrs[sqlalchemy]", ) @@ -36,6 +37,7 @@ class BinaryUUID(sqlalchemy.TypeDecorator): """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" + impl = sqlalchemy.BINARY(16) cache_ok = True @@ -53,7 +55,7 @@ def process_bind_param(self, value, dialect): if isinstance(value, str): value = uuid.UUID(value) if isinstance(value, uuid.UUID): - return value.bytes # For MySQL return 16 bytes + return value.bytes # For MySQL return 16 bytes return value def process_result_value(self, value, dialect): From 99485f8cfd5320199db16233e868cd8e417f8f51 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Sun, 22 Feb 2026 20:17:59 +0300 Subject: [PATCH 5/6] save sqlalchemy as required dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 61d5233..9896e07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "dependency-injector>=4.0", "orjson==3.*", "pydantic==2.*", + "sqlalchemy[asyncio]==2.0.*", "python-dotenv==1.*", "retry-async==0.1.*", "typing-extensions>=4.0" @@ -47,7 +48,6 @@ dev = [ "aio-pika==9.3.0", # from rabbit "aiokafka==0.10.0", # from kafka "requests==2.*", # from aiokafka - "sqlalchemy[asyncio]==2.0.*", # from sqlalchemy "pytest~=7.4.2", "pytest-asyncio~=0.21.1", "pytest-env==0.6.2", From 5e6e5eb42710f2f5be21bd4092c922b2aabf7c40 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Sun, 22 Feb 2026 20:42:45 +0300 Subject: [PATCH 6/6] lint fix --- src/cqrs/outbox/sqlalchemy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 35b965f..3bb7c40 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -218,7 +218,7 @@ def add( self.session.add( OutboxModel( event_id=event.event_id, - event_id_bin=func.UUID_TO_BIN(event.event_id), + event_id_bin=event.event_id.bytes, event_name=event.event_name, created_at=event.event_timestamp, payload=bytes_payload,