diff --git a/pyproject.toml b/pyproject.toml index 5f74f6f..9896e07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,9 +21,9 @@ dependencies = [ "dependency-injector>=4.0", "orjson==3.*", "pydantic==2.*", + "sqlalchemy[asyncio]==2.0.*", "python-dotenv==1.*", "retry-async==0.1.*", - "sqlalchemy[asyncio]==2.0.*", "typing-extensions>=4.0" ] description = "Event-Driven Architecture Framework for Distributed Systems" @@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.10.0" +version = "4.10.1" [project.optional-dependencies] aiobreaker = ["aiobreaker>=0.3.0"] @@ -68,6 +68,7 @@ examples = [ ] kafka = ["aiokafka==0.10.0"] rabbit = ["aio-pika==9.3.0"] +sqlalchemy = ["sqlalchemy[asyncio]==2.0.*"] [project.urls] Documentation = "https://mkdocs.python-cqrs.dev/" diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index b4f8bc4..3bb7c40 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -1,18 +1,29 @@ +import datetime import logging import typing import dotenv import orjson -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.dialects import mysql -from sqlalchemy.ext.asyncio import session as sql_session -from sqlalchemy.orm import DeclarativeMeta, registry - import cqrs +import uuid from cqrs import compressors from cqrs.outbox import map, repository +try: + import sqlalchemy + + from sqlalchemy import func + from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry + from sqlalchemy.ext.asyncio import session as sql_session + from sqlalchemy.dialects import postgresql +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy outbox implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]", + ) + + Base = registry().generate_base() logger = logging.getLogger(__name__) @@ -24,6 +35,39 @@ MAX_FLUSH_COUNTER_VALUE = 5 +class BinaryUUID(sqlalchemy.TypeDecorator): + """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" + + impl = sqlalchemy.BINARY(16) + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == "postgresql": + return dialect.type_descriptor(postgresql.UUID()) + else: + return dialect.type_descriptor(sqlalchemy.BINARY(16)) + + def process_bind_param(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg work with uuid.UUID + if isinstance(value, str): + value = uuid.UUID(value) + if isinstance(value, uuid.UUID): + return value.bytes # For MySQL return 16 bytes + return value + + def process_result_value(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg return uuid.UUID + if isinstance(value, bytes): + return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID + return value + + class OutboxModel(Base): __tablename__ = DEFAULT_OUTBOX_TABLE_NAME @@ -34,55 +78,55 @@ class OutboxModel(Base): name="event_id_unique_index", ), ) - id = sqlalchemy.Column( - sqlalchemy.BigInteger(), + id: Mapped[int] = mapped_column( + sqlalchemy.BigInteger, sqlalchemy.Identity(), primary_key=True, nullable=False, autoincrement=True, comment="Identity", ) - event_id = sqlalchemy.Column( - sqlalchemy.Uuid, + event_id: Mapped[uuid.UUID] = mapped_column( + BinaryUUID, nullable=False, comment="Event idempotency id", ) - event_id_bin = sqlalchemy.Column( + event_id_bin: Mapped[bytes] = mapped_column( sqlalchemy.BINARY(16), nullable=False, comment="Event idempotency id in 16 bit presentation", ) - event_status = sqlalchemy.Column( + event_status: Mapped[repository.EventStatus] = mapped_column( sqlalchemy.Enum(repository.EventStatus), nullable=False, default=repository.EventStatus.NEW, comment="Event producing status", ) - flush_counter = sqlalchemy.Column( - sqlalchemy.SmallInteger(), + flush_counter: Mapped[int] = mapped_column( + sqlalchemy.SmallInteger, nullable=False, default=0, comment="Event producing flush counter", ) - event_name = sqlalchemy.Column( + event_name: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, comment="Event name", ) - topic = sqlalchemy.Column( + topic: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, comment="Event topic", default="", ) - created_at = sqlalchemy.Column( + created_at: Mapped[datetime.datetime] = mapped_column( sqlalchemy.DateTime, nullable=False, server_default=func.now(), comment="Event creation timestamp", ) - payload = sqlalchemy.Column( - mysql.BLOB, + payload: Mapped[bytes] = mapped_column( + sqlalchemy.LargeBinary, nullable=False, default={}, comment="Event payload", @@ -174,7 +218,7 @@ def add( self.session.add( OutboxModel( event_id=event.event_id, - event_id_bin=func.UUID_TO_BIN(event.event_id), + event_id_bin=event.event_id.bytes, event_name=event.event_name, created_at=event.event_timestamp, payload=bytes_payload, diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 99e2d58..e9edc50 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -6,17 +6,24 @@ import uuid import dotenv -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from sqlalchemy.orm import registry - from cqrs.dispatcher.exceptions import SagaConcurrencyError from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.models import SagaLogEntry from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.exc import SQLAlchemyError + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + from sqlalchemy.orm import registry +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy saga storage implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) + Base = registry().generate_base() logger = logging.getLogger(__name__)