Skip to content

Comments

[Refactor] Outbox Architecture Overhaul: PostgreSQL Support, Mixin Models, and UUID v7#33

Open
Dioneya wants to merge 18 commits intopypatterns:masterfrom
Dioneya:sqlalchemy-type-fix
Open

[Refactor] Outbox Architecture Overhaul: PostgreSQL Support, Mixin Models, and UUID v7#33
Dioneya wants to merge 18 commits intopypatterns:masterfrom
Dioneya:sqlalchemy-type-fix

Conversation

@Dioneya
Copy link

@Dioneya Dioneya commented Jan 9, 2026

  • Implement OutboxModelMixin.
  • Implement BinaryUUID for PostgreSQL and MySQL compatibility.
  • Switch ID generation to uuid7.
  • Make sqlalchemy an optional dependency.
  • Update tests (using testcontainers or mocks).
  • Update documentation (README).

Summary by CodeRabbit

  • New Features
    • Outbox mixin and cross-dialect UUID storage to improve transactional outbox support.
  • Documentation
    • Added a guide with examples for implementing the Transactional Outbox pattern.
  • Improvements
    • Event IDs now use UUID7 for better ordering; SQLAlchemy usage now guarded with clearer installation messaging; runtime dependency adjustments (uuid6 added, async-SQLAlchemy handling changed).
  • Tests
    • Integration tests reworked to use isolated async SQLAlchemy fixtures and updated outbox test setup; test fixtures simplified.
  • Version
    • Bumped to 4.8.2

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 30, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Refactors the outbox to a mixin-based ORM model with cross-dialect BinaryUUID, makes SQLAlchemy an optional runtime import, switches notification event IDs to uuid6.uuid7, updates repository constructors to accept a user-provided outbox model, and reorganizes async SQLAlchemy test fixtures and integration tests.

Changes

Cohort / File(s) Summary
Project metadata
pyproject.toml
Added uuid6>=2025.0.1, moved sqlalchemy[asyncio] to dev dependencies, bumped package version and adjusted dependency layout.
Event IDs
src/cqrs/events/event.py
Changed default factory for PydanticNotificationEvent.event_id from uuid.uuid4 to uuid6.uuid7.
Outbox exports
src/cqrs/outbox/__init__.py
Exported OutboxedEventRepository, EventStatus, OutboxedEventMap; conditionally export SqlAlchemyOutboxedEventRepository and OutboxModelMixin when SQLAlchemy import succeeds.
Outbox SQLAlchemy integration
src/cqrs/outbox/sqlalchemy.py
Replaced concrete OutboxModel with OutboxModelMixin and added BinaryUUID TypeDecorator; refactored SqlAlchemyOutboxedEventRepository to accept outbox_model: type[OutboxModelMixin]; added classmethods get_batch_query, update_status_query, status_sorting_case; added runtime import guard and deprecated dynamic rebind helper.
Saga storage import guard
src/cqrs/saga/storage/sqlalchemy.py
Wrapped SQLAlchemy imports in try/except and raise descriptive ImportError if missing.
Top-level test fixtures removed
tests/conftest.py, tests/integration/fixtures.py
Removed global/session-scoped event_loop fixture and legacy async ORM/saga fixtures.
Isolated SQLAlchemy test fixtures
tests/integration/sqlalchemy/conftest.py
Added session-scoped async engine, function-scoped async session, init_saga_orm, and saga_session_factory fixtures with dotenv and DATABASE_URL support for isolated transactional tests.
Integration tests updated
tests/integration/sqlalchemy/test_event_outbox.py, tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py
Added test Base and TestOutboxModel(Base, sqlalchemy.OutboxModelMixin), an outbox_repo fixture, per-test DB setup/cleanup, and refactored tests to use the new outbox model and repository.
Documentation
README.md
Added example documentation showing Transactional Outbox usage with SQLAlchemy, OutboxModelMixin, and repository usage.

Sequence Diagram(s)

sequenceDiagram
    participant Handler
    participant Repository
    participant Database
    Handler->>Repository: add(event)
    Repository->>Database: INSERT OutboxModelMixin (event_id, event_name, topic, payload, created_at)
    Database-->>Repository: ACK / row id
    Repository-->>Handler: persisted
    Handler->>Repository: get_many(batch, topic?)
    Repository->>Database: SELECT via get_batch_query(size, topic)
    Database-->>Repository: rows
    Repository-->>Handler: list[OutboxedEvent]
    Handler->>Repository: update_status(id, status)
    Repository->>Database: UPDATE via update_status_query(id, status)
    Database-->>Repository: ACK
    Repository-->>Handler: updated
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

  • vadikko2/python-cqrs#32: Implements the same outbox refactor (BinaryUUID, OutboxModelMixin, uuid7 adoption, optional SQLAlchemy, repository signature changes), appears directly related.

Poem

🐇 I tunneled through mixins and found UUIDs that hum,
Events now wear uuid7 — a brighter drum.
Outboxes mix and match, SQL optional and neat,
Tests spin in fresh fixtures, transactions complete.
Hop, commit, and publish — a rabbit's small feat.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.81% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and clearly reflects the main changes: outbox architecture refactoring, mixin-based models, UUID v7 adoption, and PostgreSQL support via BinaryUUID.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/cqrs/__init__.py (1)

16-61: ⚠️ Potential issue | 🟠 Major

Guard SQLAlchemy import to keep base package usable without the optional dependency.

Line 16 (and line 50 in __all__): Importing cqrs.outbox.sqlalchemy unconditionally breaks when SQLAlchemy isn't installed. Since sqlalchemy is an optional dependency (defined in pyproject.toml lines 71-73, not in main dependencies), users installing without the [sqlalchemy] extra will fail on import cqrs. Use the same conditional import pattern already established in cqrs/outbox/__init__.py (lines 10-19).

🛠️ Suggested fix
-from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository
+try:
+    from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository
+except ImportError:
+    SqlAlchemyOutboxedEventRepository = None
@@
-__all__ = (
+__all__ = [
@@
-    "OutboxedEventRepository",
-    "SqlAlchemyOutboxedEventRepository",
+    "OutboxedEventRepository",
@@
-)
+]
+if SqlAlchemyOutboxedEventRepository is not None:
+    __all__.append("SqlAlchemyOutboxedEventRepository")
README.md (1)

470-528: ⚠️ Potential issue | 🟡 Minor

Fix the outbox example to match the actual repository API.

The code example incorrectly passes session to self.outbox.add() and self.outbox.commit(). The SqlAlchemyOutboxedEventRepository stores the session at initialization and binds both methods to that session—neither method accepts a session parameter.

📌 Required doc update
-            self.outbox.add(
-                session,
+            self.outbox.add(
                 cqrs.NotificationEvent[UserJoinedNotificationPayload](
                     event_name="UserJoined",
                     topic="user_notification_events",
                     payload=UserJoinedNotificationPayload(
                         user_id=request.user_id,
                         meeting_id=request.meeting_id,
                     ),
                 ),
             )
-            self.outbox.add(
-                session,
+            self.outbox.add(
                 cqrs.NotificationEvent[UserJoinedECSTPayload](
                     event_name="UserJoined",
                     topic="user_ecst_events",
                     payload=UserJoinedECSTPayload(
                         user_id=request.user_id,
                         meeting_id=request.meeting_id,
                     ),
                 ),
             )
-            await self.outbox.commit(session)
+            await self.outbox.commit()
🤖 Fix all issues with AI agents
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 49-50: The fixture definition saga_session_factory currently
declares a non-existent parameter saga_engine and uses init_saga_orm as a used
name; change the parameter to engine (the real fixture) and rename init_saga_orm
to _init_saga_orm to mark it intentionally unused while keeping the dependency
so saga tables are initialized before saga_session_factory runs.
- Around line 7-9: The saga_session_factory fixture references an undefined
parameter name: replace the incorrect saga_engine parameter with engine in the
saga_session_factory function signature so pytest can resolve the fixture
(search for saga_session_factory and change its parameter to engine); also
update the DATABASE_URL default from the in-memory URI
("sqlite+aiosqlite:///:memory:") to a file-based/shared-cache URI (e.g.,
"sqlite+aiosqlite:///./.cqrs_test.db") so init_saga_orm-created tables are
visible across connections during tests.
- Around line 24-36: The fixture must use a nested SAVEPOINT so tests can call
session.commit() without ending the outer fixture transaction; change the
session fixture to (1) start the outer connection transaction as you already do,
(2) create the async session via async_sessionmaker(bind=connection,...), (3)
start a nested transaction on the session with await session.begin_nested(), and
(4) register an after-transaction-end event listener on the session (use the
session.sync_session for event.listen) that, when a subtransaction ends and the
transaction has a parent (use the public parent attribute), restarts a new
nested transaction by calling session.begin_nested(); keep yielding the session,
and on teardown close the session and rollback/close the outer connection
transaction as before.

In `@tests/integration/sqlalchemy/test_event_outbox.py`:
- Around line 147-169: The test function test_get_new_event_positive contains a
redundant inline comment "# noqa" on the assert comparing event.id and
event_over_get_all_events_method.id; remove the "# noqa" token from that
assertion line (in the test_get_new_event_positive body where event and
event_over_get_all_events_method are compared) so the linter (Ruff) no longer
flags it as unused, then run the test suite/linter to confirm the warning is
gone.
- Around line 79-83: The cleanup_table fixture uses a non-portable "TRUNCATE
TABLE" SQL and an unused parameter; change the fixture named cleanup_table to
use SQLAlchemy's delete operation against TestOutboxModel (e.g., execute a
sqla.delete(TestOutboxModel) via the session and commit) so it works with
SQLite, and rename the unused setup_db parameter to _setup_db to silence linter
warnings while keeping session and TestOutboxModel references intact.
🧹 Nitpick comments (6)
src/cqrs/saga/storage/sqlalchemy.py (1)

10-21: Chain the ImportError for clearer diagnostics.

Line 10-21: Re-raising without exception chaining drops the original ImportError context; consider from err (or from None) to improve debugging and satisfy Ruff B904.

💡 Suggested change
-except ImportError:
-    raise ImportError(
+except ImportError as err:
+    raise ImportError(
         "You are trying to use SQLAlchemy saga storage implementation, "
         "but 'sqlalchemy' is not installed. "
         "Please install it using: pip install python-cqrs[sqlalchemy]"
-    )
+    ) from err
pyproject.toml (1)

24-27: Confirm whether broker clients should be core deps.

Line 24-27 and Line 49-54: Adding aio-pika and confluent-kafka to core dependencies forces native client libs for all installs. If these transports are meant to stay optional (extras already exist), consider keeping them only in extras.

♻️ Optional change (if brokers should remain optional)
-  "aio-pika>=9.3.0",
-  "confluent-kafka>=2.6.0",

Also applies to: 49-54

src/cqrs/outbox/sqlalchemy.py (4)

12-23: Chain the ImportError in the optional SQLAlchemy guard.

Line 12-23: Re-raising without exception chaining drops the original ImportError context; consider from err (or from None) to improve diagnostics.

💡 Suggested change
-except ImportError:
-    raise ImportError(
+except ImportError as err:
+    raise ImportError(
         "You are trying to use SQLAlchemy outbox implementation, "
         "but 'sqlalchemy' is not installed. "
         "Please install it using: pip install python-cqrs[sqlalchemy]"
-    )
+    ) from err

31-58: Normalize UUID types across dialects.

Line 31-58: Depending on SQLAlchemy/driver settings, Postgres UUIDs can come back as strings and MySQL can return bytearray/memoryview. Consider explicitly requesting as_uuid=True and normalizing bytes-like values to avoid type drift.

💡 Suggested change
-        if dialect.name == "postgresql":
-            return dialect.type_descriptor(postgresql.UUID())
+        if dialect.name == "postgresql":
+            return dialect.type_descriptor(postgresql.UUID(as_uuid=True))
@@
-        if isinstance(value, uuid.UUID):
-            return value.bytes # For MySQL return 16 bytes
+        if isinstance(value, str):
+            value = uuid.UUID(value)
+        if isinstance(value, uuid.UUID):
+            return value.bytes  # For MySQL return 16 bytes
@@
-        if isinstance(value, bytes):
-            return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID
+        if isinstance(value, (bytes, bytearray, memoryview)):
+            return uuid.UUID(bytes=bytes(value))  # Normalize bytes-like

113-121: Avoid unique-constraint name collisions.

Line 113-121: The fixed constraint name can collide if multiple OutboxModelMixin subclasses share the same schema. Consider incorporating the table name (or omit the explicit name and rely on naming conventions).

💡 Suggested change
-    def __table_args__(self):
+    def __table_args__(cls):
         return (
             sqlalchemy.UniqueConstraint(
                 "event_id",
                 "event_name",
-                name="event_id_unique_index",
+                name=f"{cls.__tablename__}_event_id_unique",
             ),
         )

222-227: Confirm behavior for unknown event names.

Line 222-227: Returning None means rows with unknown event_name will be skipped and remain in the outbox. If you want to avoid repeated warnings or stuck rows, consider marking them or surfacing an error.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
src/cqrs/events/event.py (1)

293-297: ⚠️ Potential issue | 🟠 Major

DCNotificationEvent was not updated — inconsistent UUID versions between the two INotificationEvent implementations.

The PR explicitly targets switching all event ID generation to UUID v7, but only PydanticNotificationEvent (line 354) was updated. DCNotificationEvent.event_id at line 293 still uses uuid.uuid4, so callers using the dataclass-based path continue to receive UUID v4 while callers using the Pydantic path receive UUID v7. Both classes implement the same INotificationEvent interface, and this divergence is a silent behavioral inconsistency.

🔧 Proposed fix for `DCNotificationEvent`
-    event_id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
+    event_id: uuid.UUID = dataclasses.field(default_factory=uuid6.uuid7)

Also applies to: 354-354

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/events/event.py` around lines 293 - 297, DCNotificationEvent
currently assigns event_id using uuid.uuid4 causing a mismatch with
PydanticNotificationEvent which uses UUID v7; update the dataclass field
DCNotificationEvent.event_id to use uuid.uuid7 as the default_factory (and add
or adjust the uuid import if uuid.uuid7 isn't already available) so both
INotificationEvent implementations produce UUID v7 consistently.
pyproject.toml (1)

36-60: ⚠️ Potential issue | 🟠 Major

Missing sqlalchemy entry in [project.optional-dependencies].

The PR objective is to "Make SQLAlchemy an optional dependency," but sqlalchemy[asyncio] has only been moved to dev — no installable optional extra is exposed to end users. Anyone using the SQLAlchemy outbox integration would need to know to install SQLAlchemy manually, and pip install python-cqrs[sqlalchemy] will silently do nothing. The existing kafka and rabbit extras set the right precedent.

Additionally, given that the PR description mentions PostgreSQL support, consider whether a PostgreSQL async driver (e.g. asyncpg) should also be included as an optional extra or bundled with the sqlalchemy extra.

♻️ Proposed fix
 [project.optional-dependencies]
 aiobreaker = ["aiobreaker>=0.3.0"]
+sqlalchemy = ["sqlalchemy[asyncio]>=2.0,<3"]
 dev = [
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pyproject.toml` around lines 36 - 60, The optional-dependencies table removes
the public sqlalchemy extra but only kept sqlalchemy[asyncio] under the dev
extra, so installing the package with an extra (e.g. pip install
python-cqrs[sqlalchemy]) does nothing; add a proper optional extra named
"sqlalchemy" under [project.optional-dependencies] that includes
"sqlalchemy[asyncio]==2.0.*" (and optionally the async PostgreSQL driver like
"asyncpg" if you want to advertise Postgres support) so consumers can opt into
the SQLAlchemy-based outbox integration without manually installing
dependencies.
README.md (3)

606-651: ⚠️ Potential issue | 🟠 Major

Outdated handler example uses the old API — will produce TypeError at runtime.

The pre-existing handler example at lines 609–651 uses the old API that no longer exists:

  • async with self.outbox as session:OutboxedEventRepository has no __aenter__/__aexit__
  • self.outbox.add(session, event)add() now takes only event
  • await self.outbox.commit(session)commit() now takes no arguments

The new API (from SqlAlchemyOutboxedEventRepository) should be reflected:

async def handle(self, request: JoinMeetingCommand) -> None:
    self.outbox.add(
        cqrs.NotificationEvent[UserJoinedNotificationPayload](
            event_name="UserJoined",
            topic="user_notification_events",
            payload=UserJoinedNotificationPayload(...),
        ),
    )
    await self.outbox.commit()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 606 - 651, The README handler example uses the old
OutboxedEventRepository API and will TypeError at runtime; update the
JoinMeetingCommandHandler.handle implementation to stop using the async context
manager (remove "async with self.outbox as session") and stop passing a session
into self.outbox.add and self.outbox.commit: call self.outbox.add(...) with just
the NotificationEvent instances and then await self.outbox.commit() with no
arguments, leaving your business DB operations (do_some_logic) outside of an
unavailable repository context or performed via the actual DB session management
used elsewhere.

686-691: ⚠️ Potential issue | 🟠 Major

SqlAlchemyOutboxedEventRepository constructor call is missing the now-required outbox_model argument.

Line 689:

cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor())

The constructor signature changed to __init__(self, session, outbox_model, compressor=None). Passing ZlibCompressor() as the second positional argument now binds it to outbox_model, not compressor, causing a runtime error. This example needs to be updated to pass the concrete model class:

cqrs.SqlAlchemyOutboxedEventRepository(
    session,
    outbox_model=OutboxModel,
    compressor=zlib.ZlibCompressor(),
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 686 - 691, The README example calls
cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor())
but the constructor is now __init__(self, session, outbox_model,
compressor=None), so the compressor is being passed as the outbox_model; update
the call to pass the concrete outbox model class as the second argument and the
compressor as the third (e.g., use session_factory and OutboxModel for the
outbox_model parameter and zlib.ZlibCompressor() for compressor), or use
explicit keyword args outbox_model=OutboxModel and
compressor=zlib.ZlibCompressor() to avoid positional confusion.

656-658: ⚠️ Potential issue | 🟡 Minor

Stale OUTBOX_SQLA_TABLE environment variable tip.

The tip at line 657 states the outbox table name can be set via the OUTBOX_SQLA_TABLE environment variable, but sqlalchemy.py uses the hardcoded constant DEFAULT_OUTBOX_TABLE_NAME = "outbox" with no os.getenv lookup. This feature no longer exists; the table name is now configured by overriding __tablename__ in the concrete model class.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 656 - 658, The README's tip about the
OUTBOX_SQLA_TABLE environment variable is stale; update the README to remove or
replace the sentence referring to OUTBOX_SQLA_TABLE and instead document that
the table name is determined by the concrete model's __tablename__ override (and
that DEFAULT_OUTBOX_TABLE_NAME = "outbox" in sqlalchemy.py is only a fallback
constant, not an env-configurable value). Reference DEFAULT_OUTBOX_TABLE_NAME in
sqlalchemy.py and the __tablename__ mechanism so readers know how to set a
custom table name.
🧹 Nitpick comments (7)
src/cqrs/events/event.py (1)

7-8: Consider using the stdlib uuid.uuid7() on Python 3.14+ to avoid the third-party dependency.

Python 3.14 added support for UUID versions 6, 7, and 8, meaning the uuid6 package becomes redundant on that version. If the project plans to support Python 3.14+, the dependency can be conditionally avoided:

if sys.version_info >= (3, 14):
    _uuid7 = uuid.uuid7  # type: ignore[attr-defined]
else:
    import uuid6
    _uuid7 = uuid6.uuid7

Then reference _uuid7 as the default_factory in both event classes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/events/event.py` around lines 7 - 8, Replace the unconditional
import of the third‑party uuid6 with a conditional resolver that uses the stdlib
uuid.uuid7 when running on Python 3.14+ and falls back to uuid6 otherwise: add
an import of sys, set a local _uuid7 = uuid.uuid7 if sys.version_info >= (3,14)
else import uuid6 and set _uuid7 = uuid6.uuid7, then use _uuid7 as the
default_factory for the UUID fields in the event classes (the
constructors/fields where uuid6.uuid7 is currently referenced) and remove the
plain uuid6 import.
src/cqrs/saga/storage/sqlalchemy.py (3)

28-28: Module-level dotenv.load_dotenv() is a side effect on import.

Every consumer that imports this module triggers .env file loading, which can silently override programmatically set environment variables (dotenv loads with override=False by default, but the file is still parsed). This is unchanged code, but since the PR is already restructuring this module, consider deferring load_dotenv() to an explicit initialization step or documenting the behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/storage/sqlalchemy.py` at line 28, The module currently calls
dotenv.load_dotenv() at import time, causing a side effect whenever this module
is imported; remove that top-level call and instead provide an explicit
initialization entry point (e.g., a function named init_sqlalchemy_storage or
load_env_for_sqlalchemy) or invoke dotenv.load_dotenv() inside the
SQLAlchemySagaStorage constructor so callers control when .env is parsed; update
references to dotenv.load_dotenv() and SQLAlchemySagaStorage to call the new
initializer from application startup rather than relying on import-time
behavior.

11-23: Suppress exception chaining in the guarded import for cleaner tracebacks.

The raise ImportError(...) inside the except ImportError block implicitly chains to the original exception, producing a longer traceback. Since the replacement message is self-contained and provides complete guidance, use from None to suppress the chaining:

Suggested improvement
 except ImportError:
     raise ImportError(
         "You are trying to use SQLAlchemy saga storage implementation, "
         "but 'sqlalchemy' is not installed. "
         "Please install it using: pip install python-cqrs[sqlalchemy]"
-    )
+    ) from None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/storage/sqlalchemy.py` around lines 11 - 23, The ImportError
raised in the guarded import block should suppress exception chaining to avoid
noisy tracebacks; update the except ImportError block that re-raises the
replacement ImportError (the block around the sqlalchemy imports in
src/cqrs/saga/storage/sqlalchemy.py) to raise the new ImportError with "from
None" (i.e., raise ImportError(...) from None) so the original ImportError is
not chained into the traceback.

25-25: Consider modernizing to DeclarativeBase for alignment with SQLAlchemy 2.0+ best practices.

While registry().generate_base() still works in SQLAlchemy 2.0+, it is no longer the recommended approach. SQLAlchemy 2.0+ favors the DeclarativeBase pattern for better typing support and modernization.

Modern alternative
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/storage/sqlalchemy.py` at line 25, Replace the old
registry-based base with the SQLAlchemy 2.0 DeclarativeBase pattern: import
DeclarativeBase from sqlalchemy.orm, replace the single line Base =
registry().generate_base() with a class definition class Base(DeclarativeBase):
pass, and ensure all ORM models that previously inherited from Base continue to
inherit from this new Base; update imports accordingly so DeclarativeBase is
used instead of registry().generate_base().
pyproject.toml (1)

26-26: Pin uuid6 to a compatible release range — unbounded >= risks pulling in a future breaking version.

Starting with version 2025.0.0, the uuid6 project transitions from Calendar Versioning to Semantic Versioning. The versioning scheme is YEAR.MINOR.PATCH (e.g. 2025.0.0, 2025.1.0, 2025.0.1). A bare >=2025.0.1 therefore accepts any hypothetical 2026.0.0 release, which could contain breaking API changes. For a published library, use a compatible-release specifier or explicit upper cap:

♻️ Proposed fix
-  "uuid6>=2025.0.1",
+  "uuid6>=2025.0.1,<2026",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pyproject.toml` at line 26, The dependency entry "uuid6>=2025.0.1" is
unbounded and may pull in incompatible future major versions; update the
constraint for the uuid6 dependency to a compatible-release or explicit upper
bound (for example use the compatible specifier ~=2025.0.1 or an upper cap like
>=2025.0.1,<2026.0.0) so builds remain safe while allowing patch/minor updates;
locate and update the uuid6 dependency line in pyproject.toml accordingly.
src/cqrs/outbox/sqlalchemy.py (2)

113-121: __table_args__ should use @declared_attr.directive; hardcoded constraint name will conflict when multiple models inherit this mixin.

Two separate concerns:

  1. Decorator inconsistency: __tablename__ correctly uses @declared_attr.directive (line 62), but __table_args__ uses plain @declared_attr. For non-mapped class-level directives in SQLAlchemy 2.0, @declared_attr.directive is the semantically correct choice.

  2. Hardcoded constraint name: The name "event_id_unique_index" is global to the schema. If two concrete models inherit OutboxModelMixin and live in the same PostgreSQL/MySQL schema (e.g., a multi-tenant setup or side-by-side migration), CREATE TABLE will fail with a duplicate constraint name.

♻️ Proposed fix
-    `@declared_attr`
-    def __table_args__(self):
-        return (
-            sqlalchemy.UniqueConstraint(
-                "event_id",
-                "event_name",
-                name="event_id_unique_index",
-            ),
-        )
+    `@declared_attr.directive`
+    def __table_args__(cls):
+        return (
+            sqlalchemy.UniqueConstraint(
+                "event_id",
+                "event_name",
+                name=f"{cls.__tablename__}_event_id_unique_index",
+            ),
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 113 - 121, The __table_args__ in
OutboxModelMixin is using `@declared_attr` and a hardcoded constraint name which
will conflict across multiple concrete models; change the decorator to
`@declared_attr.directive` to match __tablename__ semantics and make the
UniqueConstraint name per-class (or omit the name) so it’s not global—e.g.,
inside the __table_args__/OutboxModelMixin declared attr compute the name
dynamically from cls.__tablename__ (or use no explicit name) and return the
UniqueConstraint("event_id", "event_name", name=dynamic_name) so each inheriting
model gets a unique constraint name.

172-181: Migrate to SQLAlchemy 2.0 positional CASE form.

The project targets SQLAlchemy 2.0 (sqlalchemy[asyncio]==2.0.*), which deprecates the dict-based case() syntax. Use the positional tuple form instead:

♻️ Suggested fix
    `@classmethod`
    def status_sorting_case(cls) -> sqlalchemy.Case:
        return sqlalchemy.case(
-           {
-               repository.EventStatus.NEW: 1,
-               repository.EventStatus.NOT_PRODUCED: 2,
-               repository.EventStatus.PRODUCED: 3,
-           },
-           value=cls.event_status,
-           else_=4,
+           (cls.event_status == repository.EventStatus.NEW, 1),
+           (cls.event_status == repository.EventStatus.NOT_PRODUCED, 2),
+           (cls.event_status == repository.EventStatus.PRODUCED, 3),
+           else_=4,
        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 172 - 181, The dict-based
sqlalchemy.case usage in status_sorting_case is deprecated for SQLAlchemy 2.0;
replace the dict with the positional list/tuple of (when, then) pairs while
keeping value=cls.event_status and else_=4. Update the sqlalchemy.case call
inside status_sorting_case to pass an ordered sequence of tuples for
repository.EventStatus.NEW/NOT_PRODUCED/PRODUCED mapping to 1/2/3 respectively,
preserving cls.event_status as the value argument and else_=4.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Line 6: Remove the unused top-level import "import uuid6" from this module; it
is not referenced anywhere (event IDs come from event.event_id already a
uuid.UUID), so delete that import line to eliminate dead code and unused
dependency.

In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 10-14: Remove the redundant session-scoped pytest fixture named
event_loop from conftest.py: delete the entire fixture definition (the function
def event_loop() that creates asyncio.new_event_loop(), yields it, and closes
it) since pytest-asyncio is configured with asyncio_mode = "auto" and will
manage the loop; ensure no other test code depends on this custom event_loop
fixture before removing it.

---

Outside diff comments:
In `@pyproject.toml`:
- Around line 36-60: The optional-dependencies table removes the public
sqlalchemy extra but only kept sqlalchemy[asyncio] under the dev extra, so
installing the package with an extra (e.g. pip install python-cqrs[sqlalchemy])
does nothing; add a proper optional extra named "sqlalchemy" under
[project.optional-dependencies] that includes "sqlalchemy[asyncio]==2.0.*" (and
optionally the async PostgreSQL driver like "asyncpg" if you want to advertise
Postgres support) so consumers can opt into the SQLAlchemy-based outbox
integration without manually installing dependencies.

In `@README.md`:
- Around line 606-651: The README handler example uses the old
OutboxedEventRepository API and will TypeError at runtime; update the
JoinMeetingCommandHandler.handle implementation to stop using the async context
manager (remove "async with self.outbox as session") and stop passing a session
into self.outbox.add and self.outbox.commit: call self.outbox.add(...) with just
the NotificationEvent instances and then await self.outbox.commit() with no
arguments, leaving your business DB operations (do_some_logic) outside of an
unavailable repository context or performed via the actual DB session management
used elsewhere.
- Around line 686-691: The README example calls
cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor())
but the constructor is now __init__(self, session, outbox_model,
compressor=None), so the compressor is being passed as the outbox_model; update
the call to pass the concrete outbox model class as the second argument and the
compressor as the third (e.g., use session_factory and OutboxModel for the
outbox_model parameter and zlib.ZlibCompressor() for compressor), or use
explicit keyword args outbox_model=OutboxModel and
compressor=zlib.ZlibCompressor() to avoid positional confusion.
- Around line 656-658: The README's tip about the OUTBOX_SQLA_TABLE environment
variable is stale; update the README to remove or replace the sentence referring
to OUTBOX_SQLA_TABLE and instead document that the table name is determined by
the concrete model's __tablename__ override (and that DEFAULT_OUTBOX_TABLE_NAME
= "outbox" in sqlalchemy.py is only a fallback constant, not an env-configurable
value). Reference DEFAULT_OUTBOX_TABLE_NAME in sqlalchemy.py and the
__tablename__ mechanism so readers know how to set a custom table name.

In `@src/cqrs/events/event.py`:
- Around line 293-297: DCNotificationEvent currently assigns event_id using
uuid.uuid4 causing a mismatch with PydanticNotificationEvent which uses UUID v7;
update the dataclass field DCNotificationEvent.event_id to use uuid.uuid7 as the
default_factory (and add or adjust the uuid import if uuid.uuid7 isn't already
available) so both INotificationEvent implementations produce UUID v7
consistently.

---

Duplicate comments:
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 49-51: The pytest fixture saga_session_factory currently
references a non-existent fixture name and has an unused parameter; update its
signature to accept engine instead of saga_engine and rename init_saga_orm to
_init_saga_orm to mark it as intentionally unused, then return
sqla_async.async_sessionmaker(engine, expire_on_commit=False) so the correct
fixture is used and the unused-parameter warning is suppressed.
- Around line 24-36: The fixture currently starts a top-level transaction that
tests can commit (via repo.commit()), which breaks the fixture teardown
rollback; change the fixture to use a nested SAVEPOINT so test-level commits
only affect the savepoint: after creating connection = await engine.connect()
and transaction = await connection.begin(), start a nested transaction via
nested = await connection.begin_nested(); create the session via
sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False) and
session = session_maker(); then attach an after-transaction-end listener on the
session (use SQLAlchemy event "after_transaction_end" on the session's
sync_session or appropriate async wrapper) that restarts a new nested savepoint
when the test-local transaction ends; keep the yield and in teardown close the
session, rollback the outer transaction and close the connection as before.

In `@tests/integration/sqlalchemy/test_event_outbox.py`:
- Line 169: Remove the redundant blanket noqa on the assertion line in
tests/integration/sqlalchemy/test_event_outbox.py by deleting the trailing " #
noqa" from the assertion "assert event.id ==
event_over_get_all_events_method.id" so the line reads only the assertion; this
removes the unused RUF100 suppression while leaving the test logic intact.
- Around line 79-83: The fixture cleanup_table uses a TRUNCATE SQL statement
which is unsupported by SQLite and includes an unused parameter setup_db; change
the deletion to a cross-DB-safe operation (e.g., await
session.execute(sqla.text(f"DELETE FROM {TestOutboxModel.__tablename__}")) or
use the ORM bulk delete) and keep committing as before, and remove the unused
setup_db parameter from the fixture signature so cleanup_table(self, session)
(and any references) no longer accept setup_db.

---

Nitpick comments:
In `@pyproject.toml`:
- Line 26: The dependency entry "uuid6>=2025.0.1" is unbounded and may pull in
incompatible future major versions; update the constraint for the uuid6
dependency to a compatible-release or explicit upper bound (for example use the
compatible specifier ~=2025.0.1 or an upper cap like >=2025.0.1,<2026.0.0) so
builds remain safe while allowing patch/minor updates; locate and update the
uuid6 dependency line in pyproject.toml accordingly.

In `@src/cqrs/events/event.py`:
- Around line 7-8: Replace the unconditional import of the third‑party uuid6
with a conditional resolver that uses the stdlib uuid.uuid7 when running on
Python 3.14+ and falls back to uuid6 otherwise: add an import of sys, set a
local _uuid7 = uuid.uuid7 if sys.version_info >= (3,14) else import uuid6 and
set _uuid7 = uuid6.uuid7, then use _uuid7 as the default_factory for the UUID
fields in the event classes (the constructors/fields where uuid6.uuid7 is
currently referenced) and remove the plain uuid6 import.

In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 113-121: The __table_args__ in OutboxModelMixin is using
`@declared_attr` and a hardcoded constraint name which will conflict across
multiple concrete models; change the decorator to `@declared_attr.directive` to
match __tablename__ semantics and make the UniqueConstraint name per-class (or
omit the name) so it’s not global—e.g., inside the
__table_args__/OutboxModelMixin declared attr compute the name dynamically from
cls.__tablename__ (or use no explicit name) and return the
UniqueConstraint("event_id", "event_name", name=dynamic_name) so each inheriting
model gets a unique constraint name.
- Around line 172-181: The dict-based sqlalchemy.case usage in
status_sorting_case is deprecated for SQLAlchemy 2.0; replace the dict with the
positional list/tuple of (when, then) pairs while keeping value=cls.event_status
and else_=4. Update the sqlalchemy.case call inside status_sorting_case to pass
an ordered sequence of tuples for
repository.EventStatus.NEW/NOT_PRODUCED/PRODUCED mapping to 1/2/3 respectively,
preserving cls.event_status as the value argument and else_=4.

In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Line 28: The module currently calls dotenv.load_dotenv() at import time,
causing a side effect whenever this module is imported; remove that top-level
call and instead provide an explicit initialization entry point (e.g., a
function named init_sqlalchemy_storage or load_env_for_sqlalchemy) or invoke
dotenv.load_dotenv() inside the SQLAlchemySagaStorage constructor so callers
control when .env is parsed; update references to dotenv.load_dotenv() and
SQLAlchemySagaStorage to call the new initializer from application startup
rather than relying on import-time behavior.
- Around line 11-23: The ImportError raised in the guarded import block should
suppress exception chaining to avoid noisy tracebacks; update the except
ImportError block that re-raises the replacement ImportError (the block around
the sqlalchemy imports in src/cqrs/saga/storage/sqlalchemy.py) to raise the new
ImportError with "from None" (i.e., raise ImportError(...) from None) so the
original ImportError is not chained into the traceback.
- Line 25: Replace the old registry-based base with the SQLAlchemy 2.0
DeclarativeBase pattern: import DeclarativeBase from sqlalchemy.orm, replace the
single line Base = registry().generate_base() with a class definition class
Base(DeclarativeBase): pass, and ensure all ORM models that previously inherited
from Base continue to inherit from this new Base; update imports accordingly so
DeclarativeBase is used instead of registry().generate_base().

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)

279-290: ⚠️ Potential issue | 🔴 Critical

Remove the dead rebind_outbox_model() function — module cannot be imported due to undefined warnings and DeclarativeMeta.

The @warnings.deprecated() decorator (line 279) references an undefined warnings module, and the function signature references undefined DeclarativeMeta (line 282). Since from __future__ import annotations is not imported, both will cause NameError at module import time, preventing the entire module from loading.

Additionally, this function has zero callers in the codebase and is dead code. Simply remove it:

Fix
-@warnings.deprecated()
-def rebind_outbox_model(
-    model: typing.Any,
-    new_base: DeclarativeMeta,
-    table_name: typing.Text | None = None,
-):
-    model.__bases__ = (new_base,)
-    model.__table__.name = table_name or model.__table__.name
-    new_base.metadata._add_table(
-        model.__table__.name,
-        model.__table__.schema,
-        model.__table__,
-    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 279 - 290, Remove the dead
function rebind_outbox_model and its `@warnings.deprecated`() decorator from the
module: delete the entire function definition (including the decorator and its
body) that references DeclarativeMeta and warnings to prevent import-time
NameError and because it has no callers; after removal, run tests/linters and
remove any now-unused imports related to DeclarativeMeta or warnings if they
exist.
🧹 Nitpick comments (2)
src/cqrs/outbox/sqlalchemy.py (2)

222-240: Nit: _process_events processes a single event — consider _process_event (singular).

The method accepts one model and returns one OutboxedEvent | None, but the name is plural, which is misleading when reading the loop in get_many.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 222 - 240, The method
_process_events is misnamed since it handles a single OutboxModelMixin and
returns one OutboxedEvent; rename it to _process_event and update all internal
references (calls from get_many and anywhere else) to use _process_event,
keeping the implementation unchanged (including use of map.OutboxedEventMap,
self._compressor, orjson.loads, and constructing repository.OutboxedEvent); also
update any tests, docstrings, and type hints that reference _process_events to
the new name.

154-169: update_status_query sets flush_counter = flush_counter (no-op) for non-NOT_PRODUCED statuses.

When status != NOT_PRODUCED, line 162 produces the SQL expression SET flush_counter = flush_counter, which is a pointless write. Only include flush_counter in the update when it actually changes.

Suggested fix
     `@classmethod`
     def update_status_query(
         cls,
         outboxed_event_id: int,
         status: repository.EventStatus,
     ) -> sqlalchemy.Update:
-        values = {
-            "event_status": status,
-            "flush_counter": cls.flush_counter,
-        }
-        if status == repository.EventStatus.NOT_PRODUCED:
-            values["flush_counter"] += 1
+        values: dict = {"event_status": status}
+        if status == repository.EventStatus.NOT_PRODUCED:
+            values["flush_counter"] = cls.flush_counter + 1
 
         return (
             sqlalchemy.update(cls).where(cls.id == outboxed_event_id).values(**values)
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 154 - 169, The
update_status_query currently always includes "flush_counter": cls.flush_counter
which yields a no-op SET flush_counter = flush_counter for non-NOT_PRODUCED
statuses; change the construction of values in update_status_query so that
flush_counter is only added when status == repository.EventStatus.NOT_PRODUCED
(e.g. set values["flush_counter"] = cls.flush_counter + 1 inside that branch)
and omit flush_counter for all other statuses, leaving the
sqlalchemy.update(...) return unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 279-290: Remove the dead function rebind_outbox_model and its
`@warnings.deprecated`() decorator from the module: delete the entire function
definition (including the decorator and its body) that references
DeclarativeMeta and warnings to prevent import-time NameError and because it has
no callers; after removal, run tests/linters and remove any now-unused imports
related to DeclarativeMeta or warnings if they exist.

---

Duplicate comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Line 6: The import uuid6 in src/cqrs/outbox/sqlalchemy.py is unused; remove
the line importing uuid6 so the module no longer includes an unreferenced
dependency (look for the top-level import statement "import uuid6" and delete
it), then run lint/tests to confirm no other references to uuid6 exist in this
file.

---

Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 222-240: The method _process_events is misnamed since it handles a
single OutboxModelMixin and returns one OutboxedEvent; rename it to
_process_event and update all internal references (calls from get_many and
anywhere else) to use _process_event, keeping the implementation unchanged
(including use of map.OutboxedEventMap, self._compressor, orjson.loads, and
constructing repository.OutboxedEvent); also update any tests, docstrings, and
type hints that reference _process_events to the new name.
- Around line 154-169: The update_status_query currently always includes
"flush_counter": cls.flush_counter which yields a no-op SET flush_counter =
flush_counter for non-NOT_PRODUCED statuses; change the construction of values
in update_status_query so that flush_counter is only added when status ==
repository.EventStatus.NOT_PRODUCED (e.g. set values["flush_counter"] =
cls.flush_counter + 1 inside that branch) and omit flush_counter for all other
statuses, leaving the sqlalchemy.update(...) return unchanged.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)

283-290: ⚠️ Potential issue | 🔴 Critical

Two NameErrors in rebind_outbox_model: typo @depricated and missing DeclarativeMeta import.

  1. Line 283: @depricated is a misspelling — the imported name is deprecated (line 12/14). NameError: name 'depricated' is not defined.
  2. Line 286: DeclarativeMeta is used as a type annotation but is never imported. Python evaluates annotations eagerly without from __future__ import annotations, so this raises NameError: name 'DeclarativeMeta' is not defined at function definition time. DeclarativeMeta would need to be imported from sqlalchemy.orm (it still exists in SQLAlchemy 2.x as a legacy type).
🐛 Proposed fix
-from sqlalchemy.orm import Mapped, mapped_column, declared_attr
+from sqlalchemy.orm import Mapped, mapped_column, declared_attr, DeclarativeMeta
-@depricated("This function is deprecated; use `OutboxModelMixin` instead")
+@deprecated("This function is deprecated; use `OutboxModelMixin` instead")
 def rebind_outbox_model(
     model: typing.Any,
     new_base: DeclarativeMeta,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 283 - 290, Fix the two NameErrors
in rebind_outbox_model by correcting the decorator name from `@depricated` to
`@deprecated` and by importing DeclarativeMeta so the type annotation is valid at
function definition time; update the top-level imports to include
DeclarativeMeta from sqlalchemy.orm (or wrap the annotation in a string if you
prefer forward references) and ensure the decorator used is the existing
deprecated symbol referenced elsewhere in the file.
🧹 Nitpick comments (1)
src/cqrs/outbox/sqlalchemy.py (1)

117-118: Use @declared_attr.directive for __table_args__ for consistency with __tablename__.

__tablename__ on line 66 already uses @declared_attr.directive, but __table_args__ uses plain @declared_attr. The purpose of declared_attr.directive is strictly to support PEP 484 typing tools, allowing the decorated function to have a return type that is not using the Mapped generic class; at runtime it returns the declared_attr class unmodified. There is no runtime impact, but using the .directive variant here maintains internal consistency and keeps type-checkers happy.

♻️ Proposed fix
-    `@declared_attr`
+    `@declared_attr.directive`
     def __table_args__(self):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 117 - 118, Replace the plain
decorator on the class-level table args with the directive variant to match
__tablename__ and satisfy type-checkers: change the decorator on the
__table_args__ declared attribute from `@declared_attr` to
`@declared_attr.directive` so the function signature can use PEP 484 typing
consistently (keep the method name __table_args__ and its return type intact).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 1-14: The module references sys.version_info but never imports
sys, causing a NameError on import; add an import sys at the top of the file
(alongside the other top-level imports) so the conditional that chooses
typing_extensions.deprecated vs warnings.deprecated runs correctly; ensure the
import is placed before the sys.version_info check (the conditional using
sys.version_info in the sqlalchemy module).

In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 51-52: Remove the dead/unreachable second return that references
the undefined saga_engine: keep the single correct return using
sqla_async.async_sessionmaker(engine, expire_on_commit=False) and delete the
extraneous return sqla_async.async_sessionmaker(saga_engine, ...); this
eliminates the unreachable code and the potential NameError tied to saga_engine.

---

Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 283-290: Fix the two NameErrors in rebind_outbox_model by
correcting the decorator name from `@depricated` to `@deprecated` and by importing
DeclarativeMeta so the type annotation is valid at function definition time;
update the top-level imports to include DeclarativeMeta from sqlalchemy.orm (or
wrap the annotation in a string if you prefer forward references) and ensure the
decorator used is the existing deprecated symbol referenced elsewhere in the
file.

---

Duplicate comments:
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 24-36: The session fixture opens a Connection and creates an
async_sessionmaker without join_transaction_mode, which causes Session.commit()
to affect the outer transaction; update the session_maker creation in the
fixture (the sqla_async.async_sessionmaker call used to produce session) to
include join_transaction_mode="create_savepoint" (keeping bind=connection and
expire_on_commit=False) so the Session creates SAVEPOINTs and will only
commit/rollback its own scope.
- Around line 10-14: The custom session-scoped event_loop fixture is a
deprecated override; remove this fixture definition (the event_loop function) so
pytest-asyncio's built-in event_loop is used, or if a session-scoped loop is
required rename it (e.g., session_event_loop) and update tests to request that
name; locate the event_loop fixture in conftest.py and delete/rename the
function to avoid overriding the plugin-provided event_loop.

---

Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 117-118: Replace the plain decorator on the class-level table args
with the directive variant to match __tablename__ and satisfy type-checkers:
change the decorator on the __table_args__ declared attribute from
`@declared_attr` to `@declared_attr.directive` so the function signature can use PEP
484 typing consistently (keep the method name __table_args__ and its return type
intact).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)

283-295: ⚠️ Potential issue | 🔴 Critical

@depricated typo and undefined DeclarativeMeta prevent module import.

Two critical errors:

  1. Line 284: @depricated should be @deprecated (the correct spelling imported on lines 12–14).
  2. Line 286: DeclarativeMeta is never imported. Since function annotations are evaluated at definition time in Python 3.10–3.11 (without from __future__ import annotations), this causes a NameError on module load.

Replace with typing.Any or drop the type hint, as DeclarativeMeta was removed in SQLAlchemy 2.0 and this function is deprecated anyway.

Fix
-@depricated("This function is deprecated; use `OutboxModelMixin` instead")
+@deprecated("This function is deprecated; use `OutboxModelMixin` instead")
 def rebind_outbox_model(
     model: typing.Any,
-    new_base: DeclarativeMeta,
+    new_base: typing.Any,
     table_name: typing.Text | None = None,
 ):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 283 - 295, The decorator name on
the deprecated helper is misspelled and the type annotation uses an undefined
symbol; change the decorator on rebind_outbox_model from `@depricated` to
`@deprecated` and replace the DeclarativeMeta annotation on the new_base parameter
with typing.Any (or remove the annotation) so the module no longer raises
NameError at import; keep the rest of rebind_outbox_model unchanged.
🧹 Nitpick comments (3)
src/cqrs/outbox/sqlalchemy.py (3)

226-226: Method name _process_events is misleading — it processes a single event.

The method accepts one model: OutboxModelMixin and returns a single OutboxedEvent | None, but the plural name suggests batch processing. Consider renaming to _process_event for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` at line 226, Rename the misleading plural
method _process_events to _process_event and update its signature and return
type accordingly (currently def _process_events(self, model: OutboxModelMixin)
-> repository.OutboxedEvent | None) so it clearly indicates it handles a single
OutboxModelMixin and returns one OutboxedEvent | None; then update every
internal reference/call site to use _process_event (including any loops or
helpers that called _process_events), adjust any tests or subclass overrides
that reference the old name, and ensure imports/types (OutboxModelMixin,
OutboxedEvent, repository) remain correct after the rename.

158-173: update_status_query performs a no-op SET flush_counter = flush_counter for non-NOT_PRODUCED statuses.

Lines 164-166 initialize values["flush_counter"] to cls.flush_counter (the column itself). When status != NOT_PRODUCED, the generated SQL becomes SET flush_counter = flush_counter, which is a pointless write. Consider only including flush_counter in the update values when actually incrementing:

Proposed fix
     `@classmethod`
     def update_status_query(
         cls,
         outboxed_event_id: int,
         status: repository.EventStatus,
     ) -> sqlalchemy.Update:
-        values = {
-            "event_status": status,
-            "flush_counter": cls.flush_counter,
-        }
+        values: dict[str, typing.Any] = {
+            "event_status": status,
+        }
         if status == repository.EventStatus.NOT_PRODUCED:
-            values["flush_counter"] += 1
+            values["flush_counter"] = cls.flush_counter + 1
 
         return (
             sqlalchemy.update(cls).where(cls.id == outboxed_event_id).values(**values)
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 158 - 173, update_status_query
currently always includes "flush_counter": cls.flush_counter which yields a
no-op SET flush_counter = flush_counter for statuses != NOT_PRODUCED; change the
logic in update_status_query so you only add the "flush_counter" key to values
when status == repository.EventStatus.NOT_PRODUCED and, in that branch, set
values["flush_counter"] to an expression that increments the column (e.g.
cls.flush_counter + 1 or sqlalchemy expression) instead of mutating the column
object; keep the rest of the update as-is so non-NOT_PRODUCED updates don't
include flush_counter.

117-125: Use @declared_attr.directive for __table_args__ to match SQLAlchemy 2.0 best practices and consistency with __tablename__.

Line 66 correctly uses @declared_attr.directive for __tablename__. Since __table_args__ here returns a tuple of UniqueConstraint objects that should be instantiated per subclass, it should also use @declared_attr.directive rather than plain @declared_attr.

Proposed fix
-    `@declared_attr`
+    `@declared_attr.directive`
     def __table_args__(self):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 117 - 125, The __table_args__
declared as a class-level attribute should use `@declared_attr.directive` like
__tablename__ to ensure SQLAlchemy 2.0 instantiates the UniqueConstraint per
subclass; update the decorator on the __table_args__ method to
`@declared_attr.directive` and leave the method body returning the tuple with
sqlalchemy.UniqueConstraint("event_id", "event_name",
name="event_id_unique_index") unchanged so subclasses receive their own
constraint instances.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 283-295: The decorator name on the deprecated helper is misspelled
and the type annotation uses an undefined symbol; change the decorator on
rebind_outbox_model from `@depricated` to `@deprecated` and replace the
DeclarativeMeta annotation on the new_base parameter with typing.Any (or remove
the annotation) so the module no longer raises NameError at import; keep the
rest of rebind_outbox_model unchanged.

---

Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Line 226: Rename the misleading plural method _process_events to
_process_event and update its signature and return type accordingly (currently
def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent |
None) so it clearly indicates it handles a single OutboxModelMixin and returns
one OutboxedEvent | None; then update every internal reference/call site to use
_process_event (including any loops or helpers that called _process_events),
adjust any tests or subclass overrides that reference the old name, and ensure
imports/types (OutboxModelMixin, OutboxedEvent, repository) remain correct after
the rename.
- Around line 158-173: update_status_query currently always includes
"flush_counter": cls.flush_counter which yields a no-op SET flush_counter =
flush_counter for statuses != NOT_PRODUCED; change the logic in
update_status_query so you only add the "flush_counter" key to values when
status == repository.EventStatus.NOT_PRODUCED and, in that branch, set
values["flush_counter"] to an expression that increments the column (e.g.
cls.flush_counter + 1 or sqlalchemy expression) instead of mutating the column
object; keep the rest of the update as-is so non-NOT_PRODUCED updates don't
include flush_counter.
- Around line 117-125: The __table_args__ declared as a class-level attribute
should use `@declared_attr.directive` like __tablename__ to ensure SQLAlchemy 2.0
instantiates the UniqueConstraint per subclass; update the decorator on the
__table_args__ method to `@declared_attr.directive` and leave the method body
returning the tuple with sqlalchemy.UniqueConstraint("event_id", "event_name",
name="event_id_unique_index") unchanged so subclasses receive their own
constraint instances.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)

283-294: ⚠️ Potential issue | 🟠 Major

Raise NotImplementedError for rebind_outbox_model instead of silently corrupting state.

This function is already marked @deprecated, but the deprecation warning alone does not prevent problematic behavior. The technical issues are real:

  1. Modifying __bases__ after mapper initialization is broken: SQLAlchemy's mapper instrumentation happens at class definition time. Swapping __bases__ afterward does not re-trigger __init_subclass__, mapper configuration, or column instrumentation, resulting in a class whose __table__ and __bases__ point to different mapper registries — silent corruption.
  2. Private API usage: Calls new_base.metadata._add_table(...), which is not guaranteed stable across minor SQLAlchemy releases.
  3. Misleading type hint: new_base is typed as DeclarativeMeta (legacy SQLAlchemy), but SQLAlchemy 2.0 users creating bases via class Base(DeclarativeBase): ... would pass a plain type, making the hint misleading.

Since the function is already deprecated and has a clear migration path (OutboxModelMixin), raise NotImplementedError to prevent accidental corruption by external callers who have not yet migrated.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 283 - 294, The
rebind_outbox_model function is unsafe and should raise NotImplementedError
instead of mutating class internals; replace the current body of
rebind_outbox_model (the function that currently assigns model.__bases__,
mutates model.__table__.name and calls new_base.metadata._add_table) with a
single raise NotImplementedError explaining the function is deprecated and to
use OutboxModelMixin; also adjust the type hint/comments to avoid implying
DeclarativeMeta is required (use typing.Any or a neutral note) and remove any
uses of private metadata API (_add_table) to prevent silent state corruption.
🧹 Nitpick comments (3)
src/cqrs/outbox/sqlalchemy.py (3)

138-156: .select_from(cls) is redundant when select(cls) already establishes the FROM clause.

sqlalchemy.select(cls) already implies FROM outbox for the mapped entity. The chained .select_from(cls) is a no-op and adds cognitive noise.

♻️ Proposed fix
         return (
             sqlalchemy.select(cls)
-            .select_from(cls)
             .where(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 138 - 156, The query builder
unnecessarily calls .select_from(cls) after sqlalchemy.select(cls), which is
redundant and noisy; remove the chained .select_from(cls) call in the select
construction (the expression starting with sqlalchemy.select(cls) and using
cls.event_status, cls.flush_counter, cls.topic, cls.status_sorting_case(),
cls.id, and .limit(size)) so the query simply uses
sqlalchemy.select(cls).where(...).order_by(...).limit(size) instead.

117-125: Use @declared_attr.directive for __table_args__ to match __tablename__ and SQLAlchemy 2.0 convention.

__tablename__ (Line 66) already uses @declared_attr.directive, which is the recommended decorator in SQLAlchemy 2.0 for mixin class-level schema directives. __table_args__ should follow suit for consistency and to suppress type-checker warnings.

♻️ Proposed fix
-    `@declared_attr`
+    `@declared_attr.directive`
     def __table_args__(self):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 117 - 125, The __table_args__
mixin currently uses `@declared_attr` but should use `@declared_attr.directive` to
match the __tablename__ usage and SQLAlchemy 2.0 conventions; update the
decorator on the __table_args__ method to declared_attr.directive while keeping
the return value (the sqlalchemy.UniqueConstraint over "event_id" and
"event_name" named "event_id_unique_index") unchanged so type-checkers and
mixin-level schema directives remain consistent.

158-173: update_status_query unconditionally adds a no-op flush_counter = flush_counter self-reference for non-NOT_PRODUCED statuses.

values["flush_counter"] = cls.flush_counter (the bare InstrumentedAttribute) generates SET flush_counter = flush_counter, which is a database round-trip with no effect. The cleaner approach is to only include flush_counter in the update when it actually needs to change.

♻️ Proposed fix
     `@classmethod`
     def update_status_query(
         cls,
         outboxed_event_id: int,
         status: repository.EventStatus,
     ) -> sqlalchemy.Update:
-        values = {
-            "event_status": status,
-            "flush_counter": cls.flush_counter,
-        }
-        if status == repository.EventStatus.NOT_PRODUCED:
-            values["flush_counter"] += 1
+        values: dict = {"event_status": status}
+        if status == repository.EventStatus.NOT_PRODUCED:
+            values["flush_counter"] = cls.flush_counter + 1

         return (
             sqlalchemy.update(cls).where(cls.id == outboxed_event_id).values(**values)
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 158 - 173, The
update_status_query currently always includes a no-op self-reference by setting
values["flush_counter"] = cls.flush_counter; change it so flush_counter is only
placed into the values dict when it must change: remove the unconditional
assignment and instead, inside the branch for
repository.EventStatus.NOT_PRODUCED, set values["flush_counter"] to an increment
expression (e.g., cls.flush_counter + 1 or an equivalent SQL expression) so the
UPDATE only touches flush_counter when status == NOT_PRODUCED and avoids
emitting SET flush_counter = flush_counter for other statuses.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 46-53: The process_bind_param method currently lets non-uuid.UUID
values slip through for non-PostgreSQL dialects which breaks BINARY(16) storage;
update process_bind_param in the SQLAlchemy type to detect str inputs and
convert them to uuid.UUID (e.g., uuid.UUID(value)) before returning .bytes for
MySQL/SQLite, keep returning the original uuid.UUID for PostgreSQL, and raise a
clear TypeError for any other unexpected types so invalid values don't silently
corrupt the DB.

---

Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 283-294: The rebind_outbox_model function is unsafe and should
raise NotImplementedError instead of mutating class internals; replace the
current body of rebind_outbox_model (the function that currently assigns
model.__bases__, mutates model.__table__.name and calls
new_base.metadata._add_table) with a single raise NotImplementedError explaining
the function is deprecated and to use OutboxModelMixin; also adjust the type
hint/comments to avoid implying DeclarativeMeta is required (use typing.Any or a
neutral note) and remove any uses of private metadata API (_add_table) to
prevent silent state corruption.

---

Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 138-156: The query builder unnecessarily calls .select_from(cls)
after sqlalchemy.select(cls), which is redundant and noisy; remove the chained
.select_from(cls) call in the select construction (the expression starting with
sqlalchemy.select(cls) and using cls.event_status, cls.flush_counter, cls.topic,
cls.status_sorting_case(), cls.id, and .limit(size)) so the query simply uses
sqlalchemy.select(cls).where(...).order_by(...).limit(size) instead.
- Around line 117-125: The __table_args__ mixin currently uses `@declared_attr`
but should use `@declared_attr.directive` to match the __tablename__ usage and
SQLAlchemy 2.0 conventions; update the decorator on the __table_args__ method to
declared_attr.directive while keeping the return value (the
sqlalchemy.UniqueConstraint over "event_id" and "event_name" named
"event_id_unique_index") unchanged so type-checkers and mixin-level schema
directives remain consistent.
- Around line 158-173: The update_status_query currently always includes a no-op
self-reference by setting values["flush_counter"] = cls.flush_counter; change it
so flush_counter is only placed into the values dict when it must change: remove
the unconditional assignment and instead, inside the branch for
repository.EventStatus.NOT_PRODUCED, set values["flush_counter"] to an increment
expression (e.g., cls.flush_counter + 1 or an equivalent SQL expression) so the
UPDATE only touches flush_counter when status == NOT_PRODUCED and avoids
emitting SET flush_counter = flush_counter for other statuses.

Comment on lines +46 to +53
def process_bind_param(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg work with uuid.UUID
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

process_bind_param silently passes non-uuid.UUID values through for non-PostgreSQL dialects.

If a string UUID (e.g., "550e8400-e29b-...") is passed for MySQL/SQLite, the final return value falls through without conversion, producing a type error or silent data corruption when SQLAlchemy tries to store it in a BINARY(16) column.

🛡️ Proposed fix: add a string-to-UUID conversion guard
     def process_bind_param(self, value, dialect):
         if value is None:
             return value
         if dialect.name == "postgresql":
             return value  # asyncpg works with uuid.UUID
+        if isinstance(value, str):
+            value = uuid.UUID(value)
         if isinstance(value, uuid.UUID):
             return value.bytes  # For MySQL return 16 bytes
         return value
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 46 - 53, The process_bind_param
method currently lets non-uuid.UUID values slip through for non-PostgreSQL
dialects which breaks BINARY(16) storage; update process_bind_param in the
SQLAlchemy type to detect str inputs and convert them to uuid.UUID (e.g.,
uuid.UUID(value)) before returning .bytes for MySQL/SQLite, keep returning the
original uuid.UUID for PostgreSQL, and raise a clear TypeError for any other
unexpected types so invalid values don't silently corrupt the DB.

@codspeed-hq
Copy link
Contributor

codspeed-hq bot commented Feb 20, 2026

Merging this PR will not alter performance

✅ 70 untouched benchmarks
⏩ 10 skipped benchmarks1


Comparing Dioneya:sqlalchemy-type-fix (9f3e6b1) with master (37e723a)

Open in CodSpeed

Footnotes

  1. 10 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants