From a0c711ef4706f825c252368877a5d6120f00a285 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 8 Jan 2026 18:25:58 +0300 Subject: [PATCH 01/15] sqlalchemy as optional deps and correction of sqlalchemy models of outbox --- pyproject.toml | 6 +- src/cqrs/__init__.py | 6 +- src/cqrs/outbox/__init__.py | 20 +++++ src/cqrs/outbox/sqlalchemy.py | 110 +++++++++++-------------- src/cqrs/saga/storage/sqlalchemy.py | 20 +++-- tests/integration/test_event_outbox.py | 9 -- 6 files changed, 86 insertions(+), 85 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2a09a6d..6c7b1f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ dependencies = [ "pydantic==2.*", "orjson==3.9.15", "di[anyio]==0.79.2", - "sqlalchemy[asyncio]==2.0.*", "retry-async==0.1.4", "python-dotenv==1.0.1", "dependency-injector>=4.48.2" @@ -29,7 +28,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.4.5" +version = "4.4.6" [project.optional-dependencies] dev = [ @@ -62,6 +61,9 @@ protobuf = ["protobuf==4.25.5"] rabbit = [ "aio-pika==9.3.0" ] +sqlalchemy = [ + "sqlalchemy[asyncio]==2.0.*" +] [project.urls] Documentation = "https://vadikko2.github.io/python-cqrs-mkdocs/" diff --git a/src/cqrs/__init__.py b/src/cqrs/__init__.py index 9060890..de17693 100644 --- a/src/cqrs/__init__.py +++ b/src/cqrs/__init__.py @@ -13,10 +13,7 @@ ) from cqrs.outbox.map import OutboxedEventMap from cqrs.outbox.repository import OutboxedEventRepository -from cqrs.outbox.sqlalchemy import ( - rebind_outbox_model, - SqlAlchemyOutboxedEventRepository, -) +from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository from cqrs.producer import EventProducer from cqrs.requests.map import RequestMap, SagaMap from cqrs.requests.request import Request @@ -56,7 +53,6 @@ "DIContainer", "Compressor", "ZlibCompressor", - "rebind_outbox_model", "Saga", "SagaStepHandler", "ContextT", diff --git a/src/cqrs/outbox/__init__.py b/src/cqrs/outbox/__init__.py index e69de29..59fb75a 100644 --- a/src/cqrs/outbox/__init__.py +++ b/src/cqrs/outbox/__init__.py @@ -0,0 +1,20 @@ +from cqrs.outbox.repository import OutboxedEventRepository, EventStatus +from cqrs.outbox.map import OutboxedEventMap + +__all__ = [ + "OutboxedEventRepository", + "EventStatus", + "OutboxedEventMap", +] + +try: + from cqrs.outbox.sqlalchemy import ( + SqlAlchemyOutboxedEventRepository, + OutboxModelMixin + ) + __all__.extend([ + "SqlAlchemyOutboxedEventRepository", + "OutboxModelMixin" + ]) +except ImportError: + pass diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index aeb126f..a251a81 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -1,93 +1,92 @@ +import datetime import logging import typing - -import dotenv +import uuid import orjson -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.dialects import mysql -from sqlalchemy.ext.asyncio import session as sql_session -from sqlalchemy.orm import DeclarativeMeta, registry - import cqrs from cqrs import compressors from cqrs.outbox import map, repository -Base = registry().generate_base() - -logger = logging.getLogger(__name__) +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.orm import Mapped, mapped_column, declared_attr + from sqlalchemy.ext.asyncio import session as sql_session +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy outbox implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) -dotenv.load_dotenv() +logger = logging.getLogger(__name__) DEFAULT_OUTBOX_TABLE_NAME = "outbox" - MAX_FLUSH_COUNTER_VALUE = 5 -class OutboxModel(Base): - __tablename__ = DEFAULT_OUTBOX_TABLE_NAME +class OutboxModelMixin: + @declared_attr.directive + def __tablename__(self) -> str: + return DEFAULT_OUTBOX_TABLE_NAME - __table_args__ = ( - sqlalchemy.UniqueConstraint( - "event_id_bin", - "event_name", - name="event_id_unique_index", - ), - ) - id = sqlalchemy.Column( - sqlalchemy.BigInteger(), + id: Mapped[int] = mapped_column( + sqlalchemy.BigInteger, sqlalchemy.Identity(), primary_key=True, nullable=False, - autoincrement=True, comment="Identity", ) - event_id = sqlalchemy.Column( + event_id: Mapped[uuid.UUID] = mapped_column( sqlalchemy.Uuid, nullable=False, comment="Event idempotency id", ) - event_id_bin = sqlalchemy.Column( - sqlalchemy.BINARY(16), - nullable=False, - comment="Event idempotency id in 16 bit presentation", - ) - event_status = sqlalchemy.Column( + event_status: Mapped[repository.EventStatus] = mapped_column( sqlalchemy.Enum(repository.EventStatus), nullable=False, default=repository.EventStatus.NEW, comment="Event producing status", ) - flush_counter = sqlalchemy.Column( - sqlalchemy.SmallInteger(), + flush_counter: Mapped[int] = mapped_column( + sqlalchemy.SmallInteger, nullable=False, default=0, comment="Event producing flush counter", ) - event_name = sqlalchemy.Column( + event_name: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, comment="Event name", ) - topic = sqlalchemy.Column( + topic: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, - comment="Event topic", default="", + comment="Event topic", ) - created_at = sqlalchemy.Column( + created_at: Mapped[datetime.datetime] = mapped_column( sqlalchemy.DateTime, nullable=False, server_default=func.now(), comment="Event creation timestamp", ) - payload = sqlalchemy.Column( - mysql.BLOB, + payload: Mapped[bytes] = mapped_column( + sqlalchemy.LargeBinary, nullable=False, - default={}, comment="Event payload", ) + @declared_attr + def __table_args__(self): + return ( + sqlalchemy.UniqueConstraint( + "event_id", + "event_name", + name="event_id_unique_index", + ), + ) + def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]: return { column.name: getattr(self, column.name) for column in self.__table__.columns @@ -153,10 +152,12 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository): def __init__( self, session: sql_session.AsyncSession, + outbox_model: type[OutboxModelMixin], compressor: compressors.Compressor | None = None, ): self.session = session self._compressor = compressor + self._outbox_model = outbox_model def add( self, @@ -176,9 +177,8 @@ def add( bytes_payload = self._compressor.compress(bytes_payload) self.session.add( - OutboxModel( + self._outbox_model( event_id=event.event_id, - event_id_bin=func.UUID_TO_BIN(event.event_id), event_name=event.event_name, created_at=event.event_timestamp, payload=bytes_payload, @@ -186,12 +186,12 @@ def add( ), ) - def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None: + def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent | None: event_dict = model.row_to_dict() event_model = map.OutboxedEventMap.get(event_dict["event_name"]) if event_model is None: - return + return None if self._compressor is not None: event_dict["payload"] = self._compressor.decompress(event_dict["payload"]) @@ -209,8 +209,8 @@ async def get_many( batch_size: int = 100, topic: typing.Text | None = None, ) -> typing.List[repository.OutboxedEvent]: - events: typing.Sequence[OutboxModel] = ( - (await self.session.execute(OutboxModel.get_batch_query(batch_size, topic))) + events: typing.Sequence[OutboxModelMixin] = ( + (await self.session.execute(self._outbox_model.get_batch_query(batch_size, topic))) .scalars() .all() ) @@ -231,7 +231,7 @@ async def update_status( new_status: repository.EventStatus, ) -> None: await self.session.execute( - statement=OutboxModel.update_status_query(outboxed_event_id, new_status), + statement=self._outbox_model.update_status_query(outboxed_event_id, new_status), ) async def commit(self): @@ -239,17 +239,3 @@ async def commit(self): async def rollback(self): await self.session.rollback() - - -def rebind_outbox_model( - model: typing.Any, - new_base: DeclarativeMeta, - table_name: typing.Text | None = None, -): - model.__bases__ = (new_base,) - model.__table__.name = table_name or model.__table__.name - new_base.metadata._add_table( - model.__table__.name, - model.__table__.schema, - model.__table__, - ) diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 3be685d..b3776b2 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -2,18 +2,24 @@ import logging import typing import uuid - -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from sqlalchemy.orm import registry - from cqrs.dispatcher.exceptions import SagaConcurrencyError from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.models import SagaLogEntry from cqrs.saga.storage.protocol import ISagaStorage +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.exc import SQLAlchemyError + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + from sqlalchemy.orm import registry +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy saga storage implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) + Base = registry().generate_base() logger = logging.getLogger(__name__) diff --git a/tests/integration/test_event_outbox.py b/tests/integration/test_event_outbox.py index bb0f542..971262e 100644 --- a/tests/integration/test_event_outbox.py +++ b/tests/integration/test_event_outbox.py @@ -212,12 +212,3 @@ async def test_mark_as_failure_negative(self, session): produce_candidates = await repository.get_many(batch_size=1) assert not len(produce_candidates) - - -async def test_rebind_outbox_model_positive(init_orm, session): - custom_base = registry().generate_base() - sqlalchemy.rebind_outbox_model(sqlalchemy.OutboxModel, custom_base, "rebind_outbox") - await init_orm.run_sync(custom_base.metadata.create_all) - - async with session.begin(): - await session.execute(sqla.text("SELECT * FROM rebind_outbox WHERE True;")) From befd9292cdc774b3e521fbe7126435064822b6d0 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Fri, 9 Jan 2026 13:33:53 +0300 Subject: [PATCH 02/15] tests and readme fix --- README.md | 15 +++ pyproject.toml | 11 +- src/cqrs/events/event.py | 3 +- src/cqrs/outbox/sqlalchemy.py | 35 +++++- tests/conftest.py | 17 +-- tests/integration/fixtures.py | 94 -------------- tests/integration/sqlalchemy/__init__.py | 0 tests/integration/sqlalchemy/conftest.py | 51 ++++++++ .../{ => sqlalchemy}/test_event_outbox.py | 116 +++++++++++------- .../test_saga_mediator_sqlalchemy.py | 0 .../test_saga_storage_sqlalchemy.py | 2 - 11 files changed, 182 insertions(+), 162 deletions(-) delete mode 100644 tests/integration/fixtures.py create mode 100644 tests/integration/sqlalchemy/__init__.py create mode 100644 tests/integration/sqlalchemy/conftest.py rename tests/integration/{ => sqlalchemy}/test_event_outbox.py (60%) rename tests/integration/{ => sqlalchemy}/test_saga_mediator_sqlalchemy.py (100%) rename tests/integration/{ => sqlalchemy}/test_saga_storage_sqlalchemy.py (98%) diff --git a/README.md b/README.md index e5664b6..68464d6 100644 --- a/README.md +++ b/README.md @@ -467,7 +467,22 @@ await broker.send_message(...) The package implements the [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern, which ensures that messages are produced to the broker according to the at-least-once semantics. +To use the outbox pattern with SQLAlchemy, you first need to define your outbox model using the provided mixin: + ```python +from sqlalchemy.orm import DeclarativeBase +from cqrs.outbox import OutboxModelMixin + +class Base(DeclarativeBase): + pass + +class OutboxModel(Base, OutboxModelMixin): + __tablename__ = "outbox" # You can customize the table name +``` +Then, you can use SqlAlchemyOutboxedEventRepository in your handlers: +```python +import cqrs + def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession): """ Make changes to the database diff --git a/pyproject.toml b/pyproject.toml index 6c7b1f6..12c3a3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,10 @@ dependencies = [ "di[anyio]==0.79.2", "retry-async==0.1.4", "python-dotenv==1.0.1", - "dependency-injector>=4.48.2" + "dependency-injector>=4.48.2", + "uuid6>=2025.0.1", + "aio-pika>=9.3.0", + "confluent-kafka>=2.6.0", ] description = "Python CQRS pattern implementation" maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] @@ -43,8 +46,12 @@ dev = [ "pytest-asyncio~=0.21.1", "pytest-env==0.6.2", "cryptography==42.0.2", + "sqlalchemy[asyncio]==2.0.*", "asyncmy==0.2.9", - "requests>=2.32.5" + "requests>=2.32.5", + "confluent-kafka==2.6.0", + "protobuf==4.25.5", + "aio-pika==9.3.0" ] examples = [ "fastapi==0.109.*", diff --git a/src/cqrs/events/event.py b/src/cqrs/events/event.py index 3364f0f..e31a646 100644 --- a/src/cqrs/events/event.py +++ b/src/cqrs/events/event.py @@ -1,6 +1,7 @@ import datetime import os import typing +import uuid6 import uuid import dotenv @@ -30,7 +31,7 @@ class NotificationEvent(Event, typing.Generic[PayloadT], frozen=True): The base class for notification events """ - event_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4) + event_id: uuid.UUID = pydantic.Field(default_factory=uuid6.uuid7) event_timestamp: datetime.datetime = pydantic.Field( default_factory=datetime.datetime.now, ) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index a251a81..d20ea10 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -3,6 +3,8 @@ import typing import uuid import orjson +import uuid6 + import cqrs from cqrs import compressors from cqrs.outbox import map, repository @@ -12,6 +14,7 @@ from sqlalchemy import func from sqlalchemy.orm import Mapped, mapped_column, declared_attr from sqlalchemy.ext.asyncio import session as sql_session + from sqlalchemy.dialects import postgresql except ImportError: raise ImportError( "You are trying to use SQLAlchemy outbox implementation, " @@ -25,6 +28,36 @@ MAX_FLUSH_COUNTER_VALUE = 5 +class BinaryUUID(sqlalchemy.TypeDecorator): + """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" + impl = sqlalchemy.BINARY(16) + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == "postgresql": + return dialect.type_descriptor(postgresql.UUID()) + else: + return dialect.type_descriptor(sqlalchemy.BINARY(16)) + + def process_bind_param(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg work with uuid.UUID + if isinstance(value, uuid.UUID): + return value.bytes # For MySQL return 16 bytes + return value + + def process_result_value(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg return uuid.UUID + if isinstance(value, bytes): + return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID + return value + + class OutboxModelMixin: @declared_attr.directive def __tablename__(self) -> str: @@ -38,7 +71,7 @@ def __tablename__(self) -> str: comment="Identity", ) event_id: Mapped[uuid.UUID] = mapped_column( - sqlalchemy.Uuid, + BinaryUUID, nullable=False, comment="Event idempotency id", ) diff --git a/tests/conftest.py b/tests/conftest.py index b76341b..ce5f4bf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,26 +1,13 @@ -import asyncio -from unittest import mock - import pytest - +from unittest import mock from cqrs.adapters import kafka + TEST_TOPIC = "TestCqrsTopic" pytest_plugins = ["tests.integration.fixtures"] -@pytest.fixture(scope="session") -def event_loop(): - """Create event loop for session-scoped fixtures.""" - loop = asyncio.new_event_loop() - try: - yield loop - finally: - if not loop.is_closed(): - loop.close() - - @pytest.fixture(scope="function") async def kafka_producer() -> kafka.KafkaProducer: return mock.create_autospec(kafka.KafkaProducer) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py deleted file mode 100644 index 8965e73..0000000 --- a/tests/integration/fixtures.py +++ /dev/null @@ -1,94 +0,0 @@ -import contextlib -import functools -import os - -import dotenv -import pytest -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine - -from cqrs.outbox import sqlalchemy - -dotenv.load_dotenv() -DATABASE_DSN = os.environ.get("DATABASE_DSN", "") - - -@pytest.fixture(scope="function") -async def init_orm(): - """Initialize outbox tables - drops and creates tables before each test.""" - engine = create_async_engine( - DATABASE_DSN, - pool_pre_ping=True, - pool_size=10, - max_overflow=30, - echo=False, - ) - async with engine.begin() as connect: - await connect.run_sync(sqlalchemy.Base.metadata.drop_all) - await connect.run_sync(sqlalchemy.Base.metadata.create_all) - yield connect - - -@pytest.fixture(scope="function") -async def session(init_orm): - engine_factory = functools.partial( - create_async_engine, - DATABASE_DSN, - isolation_level="REPEATABLE READ", - ) - session = async_sessionmaker(engine_factory())() - async with contextlib.aclosing(session): - yield session - - -# Saga storage fixtures -@pytest.fixture(scope="session") -async def init_saga_orm(): - """Initialize saga storage tables - drops and creates tables BEFORE test only.""" - from cqrs.saga.storage.sqlalchemy import Base - - engine = create_async_engine( - DATABASE_DSN, - pool_pre_ping=True, - pool_size=10, - max_overflow=30, - echo=False, - ) - # Drop and create tables BEFORE test (not after) - # Use begin() to ensure tables are created, but don't keep transaction open - async with engine.begin() as connect: - await connect.run_sync(Base.metadata.drop_all) - await connect.run_sync(Base.metadata.create_all) - - # Yield engine so it can be used for sessions - # Data will persist after test because we don't drop tables in cleanup - yield engine - - # Cleanup: dispose engine but DON'T drop tables - keep data in DB - await engine.dispose() - - -@pytest.fixture(scope="session") -def saga_session_factory(init_saga_orm): - """Create a session factory for saga storage tests.""" - engine = init_saga_orm - return async_sessionmaker(engine, expire_on_commit=False, autocommit=False) - - -@pytest.fixture(scope="session") -async def saga_session(saga_session_factory): - """Create a session for saga storage tests - commits data to persist.""" - # Use autocommit=False but ensure we commit explicitly - session = saga_session_factory() - - async with contextlib.aclosing(session): - try: - yield session - # Final commit before closing to ensure data persists - if session.in_transaction(): - await session.commit() - except Exception: - # Only rollback on exception - if session.in_transaction(): - await session.rollback() - raise - # No cleanup that would delete data - data persists in DB diff --git a/tests/integration/sqlalchemy/__init__.py b/tests/integration/sqlalchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/sqlalchemy/conftest.py b/tests/integration/sqlalchemy/conftest.py new file mode 100644 index 0000000..44208a6 --- /dev/null +++ b/tests/integration/sqlalchemy/conftest.py @@ -0,0 +1,51 @@ +import pytest +import os +import asyncio +from sqlalchemy.ext import asyncio as sqla_async +import dotenv + +dotenv.load_dotenv() +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///:memory:") + +@pytest.fixture(scope="session") +def event_loop(): + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +@pytest.fixture(scope="session") +async def engine(): + engine = sqla_async.create_async_engine(url=DATABASE_URL, pool_pre_ping=True) + yield engine + await engine.dispose() + + +@pytest.fixture(scope="function") +async def session(engine): + connection = await engine.connect() + transaction = await connection.begin() + + session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False) + session = session_maker() + + yield session + + await session.close() + await transaction.rollback() + await connection.close() + + +@pytest.fixture(scope="session") +async def init_saga_orm(engine): + """Initialize saga storage tables.""" + from cqrs.saga.storage.sqlalchemy import Base + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + yield + + +@pytest.fixture(scope="session") +def saga_session_factory(saga_engine, init_saga_orm): + return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False) \ No newline at end of file diff --git a/tests/integration/test_event_outbox.py b/tests/integration/sqlalchemy/test_event_outbox.py similarity index 60% rename from tests/integration/test_event_outbox.py rename to tests/integration/sqlalchemy/test_event_outbox.py index 971262e..829180a 100644 --- a/tests/integration/test_event_outbox.py +++ b/tests/integration/sqlalchemy/test_event_outbox.py @@ -1,8 +1,9 @@ import typing import pydantic +import pytest +from sqlalchemy import orm as sqla_orm import sqlalchemy as sqla -from sqlalchemy.orm import registry import cqrs from cqrs import events, Request, RequestHandler @@ -13,6 +14,14 @@ ) +class Base(sqla_orm.DeclarativeBase): + pass + + +class TestOutboxModel(Base, sqlalchemy.OutboxModelMixin): + __tablename__ = "test_outbox" + + class OutboxRequest(Request): message: typing.Text count: int @@ -52,19 +61,38 @@ async def handle(self, request: OutboxRequest) -> None: await self.repository.commit() +@pytest.fixture +def outbox_repo(session): + return sqlalchemy.SqlAlchemyOutboxedEventRepository( + session, + outbox_model=TestOutboxModel + ) + + class TestOutbox: - async def test_outbox_add_3_event_positive(self, session): + + @pytest.fixture(autouse=True) + async def setup_db(self, engine): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + @pytest.fixture(autouse=True) + async def cleanup_table(self, session, setup_db): + await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}")) + await session.commit() + yield + + async def test_outbox_add_3_event_positive(self, outbox_repo): """ checks positive save events to outbox case """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest(message="test_outbox_add_3_event_positive", count=3) - await OutboxRequestHandler(repository).handle(request) + await OutboxRequestHandler(outbox_repo).handle(request) not_produced_events: typing.List[ outbox_repository.OutboxedEvent - ] = await repository.get_many(3) - await session.commit() + ] = await outbox_repo.get_many(3) + await outbox_repo.commit() assert len(not_produced_events) == 3 assert all( @@ -80,135 +108,129 @@ async def test_outbox_add_3_event_positive(self, session): ), ) - async def test_get_new_events_positive(self, session): + async def test_get_new_events_positive(self, outbox_repo): """ checks getting many new events """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=3, ) - await OutboxRequestHandler(repository).handle(request) + await OutboxRequestHandler(outbox_repo).handle(request) - events_list = await repository.get_many(3) - await session.commit() + events_list = await outbox_repo.get_many(3) + await outbox_repo.commit() assert len(events_list) == 3 - async def test_get_new_events_negative(self, session): + async def test_get_new_events_negative(self, outbox_repo): """ checks getting many new events, but not produced """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=3, ) - await OutboxRequestHandler(repository).handle(request) - events_list = await repository.get_many(3) - await repository.update_status( + await OutboxRequestHandler(outbox_repo).handle(request) + events_list = await outbox_repo.get_many(3) + await outbox_repo.update_status( events_list[-1].id, repository_protocol.EventStatus.PRODUCED, ) - await session.commit() + await outbox_repo.commit() - new_events_list = await repository.get_many(3) - await session.commit() + new_events_list = await outbox_repo.get_many(3) + await outbox_repo.commit() assert len(new_events_list) == 2 - async def test_get_new_event_positive(self, session): + async def test_get_new_event_positive(self, outbox_repo): """ checks getting one event positive """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=1, ) - await OutboxRequestHandler(repository).handle(request) - [event_over_get_all_events_method] = await repository.get_many(1) + await OutboxRequestHandler(outbox_repo).handle(request) + [event_over_get_all_events_method] = await outbox_repo.get_many(1) event: outbox_repository.OutboxedEvent | None = next( iter( - await repository.get_many( + await outbox_repo.get_many( batch_size=1, ), ), None, ) - await session.commit() + await outbox_repo.commit() assert event assert event.id == event_over_get_all_events_method.id # noqa - async def test_get_new_event_negative(self, session): + async def test_get_new_event_negative(self, outbox_repo): """ checks getting one event positive, but not produced """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=1, ) - await OutboxRequestHandler(repository).handle(request) - [event_over_get_all_events_method] = await repository.get_many(1) - await repository.update_status( + await OutboxRequestHandler(outbox_repo).handle(request) + [event_over_get_all_events_method] = await outbox_repo.get_many(1) + await outbox_repo.update_status( event_over_get_all_events_method.id, repository_protocol.EventStatus.PRODUCED, ) - await session.commit() + await outbox_repo.commit() - event = await repository.get_many( + event = await outbox_repo.get_many( batch_size=1, ) - await session.commit() + await outbox_repo.commit() assert not event - async def test_mark_as_failure_positive(self, session): + async def test_mark_as_failure_positive(self, outbox_repo): """checks reading failure produced event successfully""" - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=2, ) - await OutboxRequestHandler(repository).handle(request) - [failure_event, success_event] = await repository.get_many(2) + await OutboxRequestHandler(outbox_repo).handle(request) + [failure_event, success_event] = await outbox_repo.get_many(2) # mark FIRST event as failure - await repository.update_status( + await outbox_repo.update_status( failure_event.id, repository_protocol.EventStatus.NOT_PRODUCED, ) - await session.commit() + await outbox_repo.commit() - produce_candidates = await repository.get_many(batch_size=2) + produce_candidates = await outbox_repo.get_many(batch_size=2) assert len(produce_candidates) == 2 # check events order by status assert produce_candidates[0].id == success_event.id assert produce_candidates[1].id == failure_event.id - async def test_mark_as_failure_negative(self, session): + async def test_mark_as_failure_negative(self, outbox_repo): """checks reading failure produced events with flush_counter speeding""" - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=1, ) - await OutboxRequestHandler(repository).handle(request) - [failure_event] = await repository.get_many(1) + await OutboxRequestHandler(outbox_repo).handle(request) + [failure_event] = await outbox_repo.get_many(1) for _ in range(sqlalchemy.MAX_FLUSH_COUNTER_VALUE): - await repository.update_status( + await outbox_repo.update_status( failure_event.id, repository_protocol.EventStatus.NOT_PRODUCED, ) - await session.commit() + await outbox_repo.commit() - produce_candidates = await repository.get_many(batch_size=1) + produce_candidates = await outbox_repo.get_many(batch_size=1) assert not len(produce_candidates) diff --git a/tests/integration/test_saga_mediator_sqlalchemy.py b/tests/integration/sqlalchemy/test_saga_mediator_sqlalchemy.py similarity index 100% rename from tests/integration/test_saga_mediator_sqlalchemy.py rename to tests/integration/sqlalchemy/test_saga_mediator_sqlalchemy.py diff --git a/tests/integration/test_saga_storage_sqlalchemy.py b/tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py similarity index 98% rename from tests/integration/test_saga_storage_sqlalchemy.py rename to tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py index a4d92c6..6050057 100644 --- a/tests/integration/test_saga_storage_sqlalchemy.py +++ b/tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py @@ -9,8 +9,6 @@ from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage -# Fixtures init_saga_orm and saga_session_factory are imported from tests/integration/fixtures.py - @pytest.fixture def storage( From a595c7d75c868432d0d49016107fc347aac8783f Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 14:52:34 +0300 Subject: [PATCH 03/15] returned deprecated method rebind_outbox_model --- src/cqrs/outbox/sqlalchemy.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 888f27c..7fae7d4 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -274,3 +274,18 @@ async def commit(self): async def rollback(self): await self.session.rollback() + + +@warnings.deprecated() +def rebind_outbox_model( + model: typing.Any, + new_base: DeclarativeMeta, + table_name: typing.Text | None = None, +): + model.__bases__ = (new_base,) + model.__table__.name = table_name or model.__table__.name + new_base.metadata._add_table( + model.__table__.name, + model.__table__.schema, + model.__table__, + ) From 3a642e46b541c6e8b64b685cba2c0257a29992e7 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 14:54:34 +0300 Subject: [PATCH 04/15] replaced uuid4 by uuid7 in PydanticNotificationEvent --- src/cqrs/events/event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cqrs/events/event.py b/src/cqrs/events/event.py index e46d0e6..b0b01d1 100644 --- a/src/cqrs/events/event.py +++ b/src/cqrs/events/event.py @@ -290,7 +290,7 @@ class UserRegisteredEvent(DCNotificationEvent[dict]): event_name: str payload: PayloadT - event_id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4) + event_id: uuid.UUID = dataclasses.field(default_factory=uuid6.uuid7) event_timestamp: datetime.datetime = dataclasses.field( default_factory=datetime.datetime.now, ) From 0ce18ec93e64c610457cd8a3a80950fa60e42344 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:01:09 +0300 Subject: [PATCH 05/15] remove useless import --- src/cqrs/outbox/sqlalchemy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 7fae7d4..47d04d5 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -3,7 +3,6 @@ import typing import uuid import orjson -import uuid6 import cqrs from cqrs import compressors From 733ff7f6a89ac412fc0ddeb11d9389cd1414534d Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:07:49 +0300 Subject: [PATCH 06/15] remove useless import --- src/cqrs/outbox/sqlalchemy.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 47d04d5..556240c 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -8,6 +8,11 @@ from cqrs import compressors from cqrs.outbox import map, repository +if sys.version_info < (3, 13): + from typing_extensions import deprecated +else: + from warnings import deprecated + try: import sqlalchemy from sqlalchemy import func @@ -275,7 +280,7 @@ async def rollback(self): await self.session.rollback() -@warnings.deprecated() +@depricated("This function is deprecated; use `OutboxModelMixin` instead") def rebind_outbox_model( model: typing.Any, new_base: DeclarativeMeta, From c00af44abf09b8fe48d470050bcf5db61ba4d17e Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:11:03 +0300 Subject: [PATCH 07/15] tests fix --- tests/integration/sqlalchemy/test_event_outbox.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/sqlalchemy/test_event_outbox.py b/tests/integration/sqlalchemy/test_event_outbox.py index 5ad6aea..803977f 100644 --- a/tests/integration/sqlalchemy/test_event_outbox.py +++ b/tests/integration/sqlalchemy/test_event_outbox.py @@ -78,7 +78,7 @@ async def setup_db(self, engine): @pytest.fixture(autouse=True) async def cleanup_table(self, session, setup_db): - await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}")) + await session.execute(sqla.delete(TestOutboxModel)) await session.commit() yield @@ -166,7 +166,7 @@ async def test_get_new_event_positive(self, outbox_repo): await outbox_repo.commit() assert event - assert event.id == event_over_get_all_events_method.id # noqa + assert event.id == event_over_get_all_events_method.id async def test_get_new_event_negative(self, outbox_repo): """ From 6525ec31742c6a6e0464d51c13af87f53238eea4 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:11:58 +0300 Subject: [PATCH 08/15] tests fix --- tests/integration/sqlalchemy/test_event_outbox.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/sqlalchemy/test_event_outbox.py b/tests/integration/sqlalchemy/test_event_outbox.py index 803977f..1435b86 100644 --- a/tests/integration/sqlalchemy/test_event_outbox.py +++ b/tests/integration/sqlalchemy/test_event_outbox.py @@ -72,12 +72,12 @@ def outbox_repo(session): class TestOutbox: @pytest.fixture(autouse=True) - async def setup_db(self, engine): + async def _setup_db(self, engine): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) @pytest.fixture(autouse=True) - async def cleanup_table(self, session, setup_db): + async def cleanup_table(self, session, _setup_db): await session.execute(sqla.delete(TestOutboxModel)) await session.commit() yield From d0af53dcb00300055a998911cde581d78b9461dc Mon Sep 17 00:00:00 2001 From: Dioneya <44917218+Dioneya@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:15:16 +0300 Subject: [PATCH 09/15] Apply suggestion from @coderabbitai[bot] Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- tests/integration/sqlalchemy/conftest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/sqlalchemy/conftest.py b/tests/integration/sqlalchemy/conftest.py index 44208a6..a79a274 100644 --- a/tests/integration/sqlalchemy/conftest.py +++ b/tests/integration/sqlalchemy/conftest.py @@ -46,6 +46,7 @@ async def init_saga_orm(engine): yield -@pytest.fixture(scope="session") -def saga_session_factory(saga_engine, init_saga_orm): +`@pytest.fixture`(scope="session") +def saga_session_factory(engine, _init_saga_orm): + return sqla_async.async_sessionmaker(engine, expire_on_commit=False) return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False) \ No newline at end of file From 7b6db8a6b0e783e7132de06451c3c2a14fe9fc39 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:18:42 +0300 Subject: [PATCH 10/15] tests fix --- tests/integration/sqlalchemy/conftest.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/sqlalchemy/conftest.py b/tests/integration/sqlalchemy/conftest.py index 44208a6..a45f316 100644 --- a/tests/integration/sqlalchemy/conftest.py +++ b/tests/integration/sqlalchemy/conftest.py @@ -1,5 +1,6 @@ import pytest import os +from sqlalchemy import event import asyncio from sqlalchemy.ext import asyncio as sqla_async import dotenv @@ -7,13 +8,6 @@ dotenv.load_dotenv() DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///:memory:") -@pytest.fixture(scope="session") -def event_loop(): - loop = asyncio.new_event_loop() - yield loop - loop.close() - - @pytest.fixture(scope="session") async def engine(): engine = sqla_async.create_async_engine(url=DATABASE_URL, pool_pre_ping=True) @@ -28,6 +22,12 @@ async def session(engine): session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False) session = session_maker() + await session.begin_nested() + + @event.listens_for(session.sync_session, "after_transaction_end") + def restart_savepoint(sess, trans): + if trans.nested and trans.parent and not trans.parent.nested: + sess.begin_nested() yield session From 76c7dbd8fd77fc37e289e23e3d58f1724b6202cf Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:19:05 +0300 Subject: [PATCH 11/15] tests fix --- tests/integration/sqlalchemy/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/sqlalchemy/conftest.py b/tests/integration/sqlalchemy/conftest.py index 1ea74b5..4ff4526 100644 --- a/tests/integration/sqlalchemy/conftest.py +++ b/tests/integration/sqlalchemy/conftest.py @@ -46,7 +46,7 @@ async def init_saga_orm(engine): yield -`@pytest.fixture`(scope="session") +@pytest.fixture(scope="session") def saga_session_factory(engine, _init_saga_orm): return sqla_async.async_sessionmaker(engine, expire_on_commit=False) return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False) \ No newline at end of file From 6b61232b69ee649904c84224463f4693a33fa4da Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 15:21:46 +0300 Subject: [PATCH 12/15] tests fix --- tests/integration/sqlalchemy/conftest.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/sqlalchemy/conftest.py b/tests/integration/sqlalchemy/conftest.py index 4ff4526..447a966 100644 --- a/tests/integration/sqlalchemy/conftest.py +++ b/tests/integration/sqlalchemy/conftest.py @@ -6,7 +6,7 @@ import dotenv dotenv.load_dotenv() -DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///:memory:") +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./.cqrs_test.db") @pytest.fixture(scope="session") async def engine(): @@ -48,5 +48,4 @@ async def init_saga_orm(engine): @pytest.fixture(scope="session") def saga_session_factory(engine, _init_saga_orm): - return sqla_async.async_sessionmaker(engine, expire_on_commit=False) - return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False) \ No newline at end of file + return sqla_async.async_sessionmaker(engine, expire_on_commit=False) \ No newline at end of file From c04200c2eb6b2a793c5fb80ac4d3e3ccbdce8025 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 17:06:09 +0300 Subject: [PATCH 13/15] fix sys import --- src/cqrs/saga/storage/sqlalchemy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 6cf6ffb..4c6db98 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -3,6 +3,7 @@ import os import typing import uuid +import sys from cqrs.dispatcher.exceptions import SagaConcurrencyError from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.models import SagaLogEntry From 52f67aaa0f23837146054bce74f72fd56176e964 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 19 Feb 2026 17:06:51 +0300 Subject: [PATCH 14/15] fix sys import --- src/cqrs/outbox/sqlalchemy.py | 2 +- src/cqrs/saga/storage/sqlalchemy.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 556240c..c2006f9 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -3,7 +3,7 @@ import typing import uuid import orjson - +import sys import cqrs from cqrs import compressors from cqrs.outbox import map, repository diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 4c6db98..6cf6ffb 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -3,7 +3,6 @@ import os import typing import uuid -import sys from cqrs.dispatcher.exceptions import SagaConcurrencyError from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.models import SagaLogEntry From 608d0a9dea3d7b50f04b0816daacad517aaf6975 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Fri, 20 Feb 2026 11:18:21 +0300 Subject: [PATCH 15/15] fix import --- src/cqrs/outbox/sqlalchemy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index c2006f9..68d0f34 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -16,7 +16,7 @@ try: import sqlalchemy from sqlalchemy import func - from sqlalchemy.orm import Mapped, mapped_column, declared_attr + from sqlalchemy.orm import Mapped, mapped_column, declared_attr, DeclarativeMeta from sqlalchemy.ext.asyncio import session as sql_session from sqlalchemy.dialects import postgresql except ImportError: @@ -280,7 +280,7 @@ async def rollback(self): await self.session.rollback() -@depricated("This function is deprecated; use `OutboxModelMixin` instead") +@deprecated("This function is deprecated; use `OutboxModelMixin` instead") def rebind_outbox_model( model: typing.Any, new_base: DeclarativeMeta,