[Refactor] Outbox Architecture Overhaul: PostgreSQL Support, Mixin Models, and UUID v7#33
[Refactor] Outbox Architecture Overhaul: PostgreSQL Support, Mixin Models, and UUID v7#33Dioneya wants to merge 18 commits intopypatterns:masterfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughRefactors the outbox to a mixin-based ORM model with cross-dialect BinaryUUID, makes SQLAlchemy an optional runtime import, switches notification event IDs to Changes
Sequence Diagram(s)sequenceDiagram
participant Handler
participant Repository
participant Database
Handler->>Repository: add(event)
Repository->>Database: INSERT OutboxModelMixin (event_id, event_name, topic, payload, created_at)
Database-->>Repository: ACK / row id
Repository-->>Handler: persisted
Handler->>Repository: get_many(batch, topic?)
Repository->>Database: SELECT via get_batch_query(size, topic)
Database-->>Repository: rows
Repository-->>Handler: list[OutboxedEvent]
Handler->>Repository: update_status(id, status)
Repository->>Database: UPDATE via update_status_query(id, status)
Database-->>Repository: ACK
Repository-->>Handler: updated
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/cqrs/__init__.py (1)
16-61:⚠️ Potential issue | 🟠 MajorGuard SQLAlchemy import to keep base package usable without the optional dependency.
Line 16 (and line 50 in
__all__): Importingcqrs.outbox.sqlalchemyunconditionally breaks when SQLAlchemy isn't installed. Sincesqlalchemyis an optional dependency (defined inpyproject.tomllines 71-73, not in main dependencies), users installing without the[sqlalchemy]extra will fail onimport cqrs. Use the same conditional import pattern already established incqrs/outbox/__init__.py(lines 10-19).🛠️ Suggested fix
-from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository +try: + from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository +except ImportError: + SqlAlchemyOutboxedEventRepository = None @@ -__all__ = ( +__all__ = [ @@ - "OutboxedEventRepository", - "SqlAlchemyOutboxedEventRepository", + "OutboxedEventRepository", @@ -) +] +if SqlAlchemyOutboxedEventRepository is not None: + __all__.append("SqlAlchemyOutboxedEventRepository")README.md (1)
470-528:⚠️ Potential issue | 🟡 MinorFix the outbox example to match the actual repository API.
The code example incorrectly passes
sessiontoself.outbox.add()andself.outbox.commit(). TheSqlAlchemyOutboxedEventRepositorystores the session at initialization and binds both methods to that session—neither method accepts a session parameter.📌 Required doc update
- self.outbox.add( - session, + self.outbox.add( cqrs.NotificationEvent[UserJoinedNotificationPayload]( event_name="UserJoined", topic="user_notification_events", payload=UserJoinedNotificationPayload( user_id=request.user_id, meeting_id=request.meeting_id, ), ), ) - self.outbox.add( - session, + self.outbox.add( cqrs.NotificationEvent[UserJoinedECSTPayload]( event_name="UserJoined", topic="user_ecst_events", payload=UserJoinedECSTPayload( user_id=request.user_id, meeting_id=request.meeting_id, ), ), ) - await self.outbox.commit(session) + await self.outbox.commit()
🤖 Fix all issues with AI agents
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 49-50: The fixture definition saga_session_factory currently
declares a non-existent parameter saga_engine and uses init_saga_orm as a used
name; change the parameter to engine (the real fixture) and rename init_saga_orm
to _init_saga_orm to mark it intentionally unused while keeping the dependency
so saga tables are initialized before saga_session_factory runs.
- Around line 7-9: The saga_session_factory fixture references an undefined
parameter name: replace the incorrect saga_engine parameter with engine in the
saga_session_factory function signature so pytest can resolve the fixture
(search for saga_session_factory and change its parameter to engine); also
update the DATABASE_URL default from the in-memory URI
("sqlite+aiosqlite:///:memory:") to a file-based/shared-cache URI (e.g.,
"sqlite+aiosqlite:///./.cqrs_test.db") so init_saga_orm-created tables are
visible across connections during tests.
- Around line 24-36: The fixture must use a nested SAVEPOINT so tests can call
session.commit() without ending the outer fixture transaction; change the
session fixture to (1) start the outer connection transaction as you already do,
(2) create the async session via async_sessionmaker(bind=connection,...), (3)
start a nested transaction on the session with await session.begin_nested(), and
(4) register an after-transaction-end event listener on the session (use the
session.sync_session for event.listen) that, when a subtransaction ends and the
transaction has a parent (use the public parent attribute), restarts a new
nested transaction by calling session.begin_nested(); keep yielding the session,
and on teardown close the session and rollback/close the outer connection
transaction as before.
In `@tests/integration/sqlalchemy/test_event_outbox.py`:
- Around line 147-169: The test function test_get_new_event_positive contains a
redundant inline comment "# noqa" on the assert comparing event.id and
event_over_get_all_events_method.id; remove the "# noqa" token from that
assertion line (in the test_get_new_event_positive body where event and
event_over_get_all_events_method are compared) so the linter (Ruff) no longer
flags it as unused, then run the test suite/linter to confirm the warning is
gone.
- Around line 79-83: The cleanup_table fixture uses a non-portable "TRUNCATE
TABLE" SQL and an unused parameter; change the fixture named cleanup_table to
use SQLAlchemy's delete operation against TestOutboxModel (e.g., execute a
sqla.delete(TestOutboxModel) via the session and commit) so it works with
SQLite, and rename the unused setup_db parameter to _setup_db to silence linter
warnings while keeping session and TestOutboxModel references intact.
🧹 Nitpick comments (6)
src/cqrs/saga/storage/sqlalchemy.py (1)
10-21: Chain the ImportError for clearer diagnostics.Line 10-21: Re-raising without exception chaining drops the original ImportError context; consider
from err(orfrom None) to improve debugging and satisfy Ruff B904.💡 Suggested change
-except ImportError: - raise ImportError( +except ImportError as err: + raise ImportError( "You are trying to use SQLAlchemy saga storage implementation, " "but 'sqlalchemy' is not installed. " "Please install it using: pip install python-cqrs[sqlalchemy]" - ) + ) from errpyproject.toml (1)
24-27: Confirm whether broker clients should be core deps.Line 24-27 and Line 49-54: Adding
aio-pikaandconfluent-kafkato core dependencies forces native client libs for all installs. If these transports are meant to stay optional (extras already exist), consider keeping them only in extras.♻️ Optional change (if brokers should remain optional)
- "aio-pika>=9.3.0", - "confluent-kafka>=2.6.0",Also applies to: 49-54
src/cqrs/outbox/sqlalchemy.py (4)
12-23: Chain the ImportError in the optional SQLAlchemy guard.Line 12-23: Re-raising without exception chaining drops the original ImportError context; consider
from err(orfrom None) to improve diagnostics.💡 Suggested change
-except ImportError: - raise ImportError( +except ImportError as err: + raise ImportError( "You are trying to use SQLAlchemy outbox implementation, " "but 'sqlalchemy' is not installed. " "Please install it using: pip install python-cqrs[sqlalchemy]" - ) + ) from err
31-58: Normalize UUID types across dialects.Line 31-58: Depending on SQLAlchemy/driver settings, Postgres UUIDs can come back as strings and MySQL can return bytearray/memoryview. Consider explicitly requesting
as_uuid=Trueand normalizing bytes-like values to avoid type drift.💡 Suggested change
- if dialect.name == "postgresql": - return dialect.type_descriptor(postgresql.UUID()) + if dialect.name == "postgresql": + return dialect.type_descriptor(postgresql.UUID(as_uuid=True)) @@ - if isinstance(value, uuid.UUID): - return value.bytes # For MySQL return 16 bytes + if isinstance(value, str): + value = uuid.UUID(value) + if isinstance(value, uuid.UUID): + return value.bytes # For MySQL return 16 bytes @@ - if isinstance(value, bytes): - return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID + if isinstance(value, (bytes, bytearray, memoryview)): + return uuid.UUID(bytes=bytes(value)) # Normalize bytes-like
113-121: Avoid unique-constraint name collisions.Line 113-121: The fixed constraint name can collide if multiple
OutboxModelMixinsubclasses share the same schema. Consider incorporating the table name (or omit the explicit name and rely on naming conventions).💡 Suggested change
- def __table_args__(self): + def __table_args__(cls): return ( sqlalchemy.UniqueConstraint( "event_id", "event_name", - name="event_id_unique_index", + name=f"{cls.__tablename__}_event_id_unique", ), )
222-227: Confirm behavior for unknown event names.Line 222-227: Returning
Nonemeans rows with unknownevent_namewill be skipped and remain in the outbox. If you want to avoid repeated warnings or stuck rows, consider marking them or surfacing an error.
# Conflicts: # pyproject.toml # src/cqrs/__init__.py # src/cqrs/events/event.py # src/cqrs/saga/storage/sqlalchemy.py
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
src/cqrs/events/event.py (1)
293-297:⚠️ Potential issue | 🟠 Major
DCNotificationEventwas not updated — inconsistent UUID versions between the twoINotificationEventimplementations.The PR explicitly targets switching all event ID generation to UUID v7, but only
PydanticNotificationEvent(line 354) was updated.DCNotificationEvent.event_idat line 293 still usesuuid.uuid4, so callers using the dataclass-based path continue to receive UUID v4 while callers using the Pydantic path receive UUID v7. Both classes implement the sameINotificationEventinterface, and this divergence is a silent behavioral inconsistency.🔧 Proposed fix for `DCNotificationEvent`
- event_id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4) + event_id: uuid.UUID = dataclasses.field(default_factory=uuid6.uuid7)Also applies to: 354-354
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/events/event.py` around lines 293 - 297, DCNotificationEvent currently assigns event_id using uuid.uuid4 causing a mismatch with PydanticNotificationEvent which uses UUID v7; update the dataclass field DCNotificationEvent.event_id to use uuid.uuid7 as the default_factory (and add or adjust the uuid import if uuid.uuid7 isn't already available) so both INotificationEvent implementations produce UUID v7 consistently.pyproject.toml (1)
36-60:⚠️ Potential issue | 🟠 MajorMissing
sqlalchemyentry in[project.optional-dependencies].The PR objective is to "Make SQLAlchemy an optional dependency," but
sqlalchemy[asyncio]has only been moved todev— no installable optional extra is exposed to end users. Anyone using the SQLAlchemy outbox integration would need to know to install SQLAlchemy manually, andpip install python-cqrs[sqlalchemy]will silently do nothing. The existingkafkaandrabbitextras set the right precedent.Additionally, given that the PR description mentions PostgreSQL support, consider whether a PostgreSQL async driver (e.g.
asyncpg) should also be included as an optional extra or bundled with thesqlalchemyextra.♻️ Proposed fix
[project.optional-dependencies] aiobreaker = ["aiobreaker>=0.3.0"] +sqlalchemy = ["sqlalchemy[asyncio]>=2.0,<3"] dev = [🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pyproject.toml` around lines 36 - 60, The optional-dependencies table removes the public sqlalchemy extra but only kept sqlalchemy[asyncio] under the dev extra, so installing the package with an extra (e.g. pip install python-cqrs[sqlalchemy]) does nothing; add a proper optional extra named "sqlalchemy" under [project.optional-dependencies] that includes "sqlalchemy[asyncio]==2.0.*" (and optionally the async PostgreSQL driver like "asyncpg" if you want to advertise Postgres support) so consumers can opt into the SQLAlchemy-based outbox integration without manually installing dependencies.README.md (3)
606-651:⚠️ Potential issue | 🟠 MajorOutdated handler example uses the old API — will produce
TypeErrorat runtime.The pre-existing handler example at lines 609–651 uses the old API that no longer exists:
async with self.outbox as session:—OutboxedEventRepositoryhas no__aenter__/__aexit__self.outbox.add(session, event)—add()now takes onlyeventawait self.outbox.commit(session)—commit()now takes no argumentsThe new API (from
SqlAlchemyOutboxedEventRepository) should be reflected:async def handle(self, request: JoinMeetingCommand) -> None: self.outbox.add( cqrs.NotificationEvent[UserJoinedNotificationPayload]( event_name="UserJoined", topic="user_notification_events", payload=UserJoinedNotificationPayload(...), ), ) await self.outbox.commit()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 606 - 651, The README handler example uses the old OutboxedEventRepository API and will TypeError at runtime; update the JoinMeetingCommandHandler.handle implementation to stop using the async context manager (remove "async with self.outbox as session") and stop passing a session into self.outbox.add and self.outbox.commit: call self.outbox.add(...) with just the NotificationEvent instances and then await self.outbox.commit() with no arguments, leaving your business DB operations (do_some_logic) outside of an unavailable repository context or performed via the actual DB session management used elsewhere.
686-691:⚠️ Potential issue | 🟠 Major
SqlAlchemyOutboxedEventRepositoryconstructor call is missing the now-requiredoutbox_modelargument.Line 689:
cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor())The constructor signature changed to
__init__(self, session, outbox_model, compressor=None). PassingZlibCompressor()as the second positional argument now binds it tooutbox_model, notcompressor, causing a runtime error. This example needs to be updated to pass the concrete model class:cqrs.SqlAlchemyOutboxedEventRepository( session, outbox_model=OutboxModel, compressor=zlib.ZlibCompressor(), )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 686 - 691, The README example calls cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor()) but the constructor is now __init__(self, session, outbox_model, compressor=None), so the compressor is being passed as the outbox_model; update the call to pass the concrete outbox model class as the second argument and the compressor as the third (e.g., use session_factory and OutboxModel for the outbox_model parameter and zlib.ZlibCompressor() for compressor), or use explicit keyword args outbox_model=OutboxModel and compressor=zlib.ZlibCompressor() to avoid positional confusion.
656-658:⚠️ Potential issue | 🟡 MinorStale
OUTBOX_SQLA_TABLEenvironment variable tip.The tip at line 657 states the outbox table name can be set via the
OUTBOX_SQLA_TABLEenvironment variable, butsqlalchemy.pyuses the hardcoded constantDEFAULT_OUTBOX_TABLE_NAME = "outbox"with noos.getenvlookup. This feature no longer exists; the table name is now configured by overriding__tablename__in the concrete model class.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` around lines 656 - 658, The README's tip about the OUTBOX_SQLA_TABLE environment variable is stale; update the README to remove or replace the sentence referring to OUTBOX_SQLA_TABLE and instead document that the table name is determined by the concrete model's __tablename__ override (and that DEFAULT_OUTBOX_TABLE_NAME = "outbox" in sqlalchemy.py is only a fallback constant, not an env-configurable value). Reference DEFAULT_OUTBOX_TABLE_NAME in sqlalchemy.py and the __tablename__ mechanism so readers know how to set a custom table name.
🧹 Nitpick comments (7)
src/cqrs/events/event.py (1)
7-8: Consider using the stdlibuuid.uuid7()on Python 3.14+ to avoid the third-party dependency.Python 3.14 added support for UUID versions 6, 7, and 8, meaning the
uuid6package becomes redundant on that version. If the project plans to support Python 3.14+, the dependency can be conditionally avoided:if sys.version_info >= (3, 14): _uuid7 = uuid.uuid7 # type: ignore[attr-defined] else: import uuid6 _uuid7 = uuid6.uuid7Then reference
_uuid7as thedefault_factoryin both event classes.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/events/event.py` around lines 7 - 8, Replace the unconditional import of the third‑party uuid6 with a conditional resolver that uses the stdlib uuid.uuid7 when running on Python 3.14+ and falls back to uuid6 otherwise: add an import of sys, set a local _uuid7 = uuid.uuid7 if sys.version_info >= (3,14) else import uuid6 and set _uuid7 = uuid6.uuid7, then use _uuid7 as the default_factory for the UUID fields in the event classes (the constructors/fields where uuid6.uuid7 is currently referenced) and remove the plain uuid6 import.src/cqrs/saga/storage/sqlalchemy.py (3)
28-28: Module-leveldotenv.load_dotenv()is a side effect on import.Every consumer that imports this module triggers
.envfile loading, which can silently override programmatically set environment variables (dotenv loads withoverride=Falseby default, but the file is still parsed). This is unchanged code, but since the PR is already restructuring this module, consider deferringload_dotenv()to an explicit initialization step or documenting the behavior.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/storage/sqlalchemy.py` at line 28, The module currently calls dotenv.load_dotenv() at import time, causing a side effect whenever this module is imported; remove that top-level call and instead provide an explicit initialization entry point (e.g., a function named init_sqlalchemy_storage or load_env_for_sqlalchemy) or invoke dotenv.load_dotenv() inside the SQLAlchemySagaStorage constructor so callers control when .env is parsed; update references to dotenv.load_dotenv() and SQLAlchemySagaStorage to call the new initializer from application startup rather than relying on import-time behavior.
11-23: Suppress exception chaining in the guarded import for cleaner tracebacks.The
raise ImportError(...)inside theexcept ImportErrorblock implicitly chains to the original exception, producing a longer traceback. Since the replacement message is self-contained and provides complete guidance, usefrom Noneto suppress the chaining:Suggested improvement
except ImportError: raise ImportError( "You are trying to use SQLAlchemy saga storage implementation, " "but 'sqlalchemy' is not installed. " "Please install it using: pip install python-cqrs[sqlalchemy]" - ) + ) from None🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/storage/sqlalchemy.py` around lines 11 - 23, The ImportError raised in the guarded import block should suppress exception chaining to avoid noisy tracebacks; update the except ImportError block that re-raises the replacement ImportError (the block around the sqlalchemy imports in src/cqrs/saga/storage/sqlalchemy.py) to raise the new ImportError with "from None" (i.e., raise ImportError(...) from None) so the original ImportError is not chained into the traceback.
25-25: Consider modernizing toDeclarativeBasefor alignment with SQLAlchemy 2.0+ best practices.While
registry().generate_base()still works in SQLAlchemy 2.0+, it is no longer the recommended approach. SQLAlchemy 2.0+ favors theDeclarativeBasepattern for better typing support and modernization.Modern alternative
from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/storage/sqlalchemy.py` at line 25, Replace the old registry-based base with the SQLAlchemy 2.0 DeclarativeBase pattern: import DeclarativeBase from sqlalchemy.orm, replace the single line Base = registry().generate_base() with a class definition class Base(DeclarativeBase): pass, and ensure all ORM models that previously inherited from Base continue to inherit from this new Base; update imports accordingly so DeclarativeBase is used instead of registry().generate_base().pyproject.toml (1)
26-26: Pinuuid6to a compatible release range — unbounded>=risks pulling in a future breaking version.Starting with version 2025.0.0, the
uuid6project transitions from Calendar Versioning to Semantic Versioning. The versioning scheme isYEAR.MINOR.PATCH(e.g.2025.0.0,2025.1.0,2025.0.1). A bare>=2025.0.1therefore accepts any hypothetical2026.0.0release, which could contain breaking API changes. For a published library, use a compatible-release specifier or explicit upper cap:♻️ Proposed fix
- "uuid6>=2025.0.1", + "uuid6>=2025.0.1,<2026",🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pyproject.toml` at line 26, The dependency entry "uuid6>=2025.0.1" is unbounded and may pull in incompatible future major versions; update the constraint for the uuid6 dependency to a compatible-release or explicit upper bound (for example use the compatible specifier ~=2025.0.1 or an upper cap like >=2025.0.1,<2026.0.0) so builds remain safe while allowing patch/minor updates; locate and update the uuid6 dependency line in pyproject.toml accordingly.src/cqrs/outbox/sqlalchemy.py (2)
113-121:__table_args__should use@declared_attr.directive; hardcoded constraint name will conflict when multiple models inherit this mixin.Two separate concerns:
Decorator inconsistency:
__tablename__correctly uses@declared_attr.directive(line 62), but__table_args__uses plain@declared_attr. For non-mapped class-level directives in SQLAlchemy 2.0,@declared_attr.directiveis the semantically correct choice.Hardcoded constraint name: The name
"event_id_unique_index"is global to the schema. If two concrete models inheritOutboxModelMixinand live in the same PostgreSQL/MySQL schema (e.g., a multi-tenant setup or side-by-side migration),CREATE TABLEwill fail with a duplicate constraint name.♻️ Proposed fix
- `@declared_attr` - def __table_args__(self): - return ( - sqlalchemy.UniqueConstraint( - "event_id", - "event_name", - name="event_id_unique_index", - ), - ) + `@declared_attr.directive` + def __table_args__(cls): + return ( + sqlalchemy.UniqueConstraint( + "event_id", + "event_name", + name=f"{cls.__tablename__}_event_id_unique_index", + ), + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 113 - 121, The __table_args__ in OutboxModelMixin is using `@declared_attr` and a hardcoded constraint name which will conflict across multiple concrete models; change the decorator to `@declared_attr.directive` to match __tablename__ semantics and make the UniqueConstraint name per-class (or omit the name) so it’s not global—e.g., inside the __table_args__/OutboxModelMixin declared attr compute the name dynamically from cls.__tablename__ (or use no explicit name) and return the UniqueConstraint("event_id", "event_name", name=dynamic_name) so each inheriting model gets a unique constraint name.
172-181: Migrate to SQLAlchemy 2.0 positional CASE form.The project targets SQLAlchemy 2.0 (
sqlalchemy[asyncio]==2.0.*), which deprecates the dict-basedcase()syntax. Use the positional tuple form instead:♻️ Suggested fix
`@classmethod` def status_sorting_case(cls) -> sqlalchemy.Case: return sqlalchemy.case( - { - repository.EventStatus.NEW: 1, - repository.EventStatus.NOT_PRODUCED: 2, - repository.EventStatus.PRODUCED: 3, - }, - value=cls.event_status, - else_=4, + (cls.event_status == repository.EventStatus.NEW, 1), + (cls.event_status == repository.EventStatus.NOT_PRODUCED, 2), + (cls.event_status == repository.EventStatus.PRODUCED, 3), + else_=4, )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 172 - 181, The dict-based sqlalchemy.case usage in status_sorting_case is deprecated for SQLAlchemy 2.0; replace the dict with the positional list/tuple of (when, then) pairs while keeping value=cls.event_status and else_=4. Update the sqlalchemy.case call inside status_sorting_case to pass an ordered sequence of tuples for repository.EventStatus.NEW/NOT_PRODUCED/PRODUCED mapping to 1/2/3 respectively, preserving cls.event_status as the value argument and else_=4.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Line 6: Remove the unused top-level import "import uuid6" from this module; it
is not referenced anywhere (event IDs come from event.event_id already a
uuid.UUID), so delete that import line to eliminate dead code and unused
dependency.
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 10-14: Remove the redundant session-scoped pytest fixture named
event_loop from conftest.py: delete the entire fixture definition (the function
def event_loop() that creates asyncio.new_event_loop(), yields it, and closes
it) since pytest-asyncio is configured with asyncio_mode = "auto" and will
manage the loop; ensure no other test code depends on this custom event_loop
fixture before removing it.
---
Outside diff comments:
In `@pyproject.toml`:
- Around line 36-60: The optional-dependencies table removes the public
sqlalchemy extra but only kept sqlalchemy[asyncio] under the dev extra, so
installing the package with an extra (e.g. pip install python-cqrs[sqlalchemy])
does nothing; add a proper optional extra named "sqlalchemy" under
[project.optional-dependencies] that includes "sqlalchemy[asyncio]==2.0.*" (and
optionally the async PostgreSQL driver like "asyncpg" if you want to advertise
Postgres support) so consumers can opt into the SQLAlchemy-based outbox
integration without manually installing dependencies.
In `@README.md`:
- Around line 606-651: The README handler example uses the old
OutboxedEventRepository API and will TypeError at runtime; update the
JoinMeetingCommandHandler.handle implementation to stop using the async context
manager (remove "async with self.outbox as session") and stop passing a session
into self.outbox.add and self.outbox.commit: call self.outbox.add(...) with just
the NotificationEvent instances and then await self.outbox.commit() with no
arguments, leaving your business DB operations (do_some_logic) outside of an
unavailable repository context or performed via the actual DB session management
used elsewhere.
- Around line 686-691: The README example calls
cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor())
but the constructor is now __init__(self, session, outbox_model,
compressor=None), so the compressor is being passed as the outbox_model; update
the call to pass the concrete outbox model class as the second argument and the
compressor as the third (e.g., use session_factory and OutboxModel for the
outbox_model parameter and zlib.ZlibCompressor() for compressor), or use
explicit keyword args outbox_model=OutboxModel and
compressor=zlib.ZlibCompressor() to avoid positional confusion.
- Around line 656-658: The README's tip about the OUTBOX_SQLA_TABLE environment
variable is stale; update the README to remove or replace the sentence referring
to OUTBOX_SQLA_TABLE and instead document that the table name is determined by
the concrete model's __tablename__ override (and that DEFAULT_OUTBOX_TABLE_NAME
= "outbox" in sqlalchemy.py is only a fallback constant, not an env-configurable
value). Reference DEFAULT_OUTBOX_TABLE_NAME in sqlalchemy.py and the
__tablename__ mechanism so readers know how to set a custom table name.
In `@src/cqrs/events/event.py`:
- Around line 293-297: DCNotificationEvent currently assigns event_id using
uuid.uuid4 causing a mismatch with PydanticNotificationEvent which uses UUID v7;
update the dataclass field DCNotificationEvent.event_id to use uuid.uuid7 as the
default_factory (and add or adjust the uuid import if uuid.uuid7 isn't already
available) so both INotificationEvent implementations produce UUID v7
consistently.
---
Duplicate comments:
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 49-51: The pytest fixture saga_session_factory currently
references a non-existent fixture name and has an unused parameter; update its
signature to accept engine instead of saga_engine and rename init_saga_orm to
_init_saga_orm to mark it as intentionally unused, then return
sqla_async.async_sessionmaker(engine, expire_on_commit=False) so the correct
fixture is used and the unused-parameter warning is suppressed.
- Around line 24-36: The fixture currently starts a top-level transaction that
tests can commit (via repo.commit()), which breaks the fixture teardown
rollback; change the fixture to use a nested SAVEPOINT so test-level commits
only affect the savepoint: after creating connection = await engine.connect()
and transaction = await connection.begin(), start a nested transaction via
nested = await connection.begin_nested(); create the session via
sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False) and
session = session_maker(); then attach an after-transaction-end listener on the
session (use SQLAlchemy event "after_transaction_end" on the session's
sync_session or appropriate async wrapper) that restarts a new nested savepoint
when the test-local transaction ends; keep the yield and in teardown close the
session, rollback the outer transaction and close the connection as before.
In `@tests/integration/sqlalchemy/test_event_outbox.py`:
- Line 169: Remove the redundant blanket noqa on the assertion line in
tests/integration/sqlalchemy/test_event_outbox.py by deleting the trailing " #
noqa" from the assertion "assert event.id ==
event_over_get_all_events_method.id" so the line reads only the assertion; this
removes the unused RUF100 suppression while leaving the test logic intact.
- Around line 79-83: The fixture cleanup_table uses a TRUNCATE SQL statement
which is unsupported by SQLite and includes an unused parameter setup_db; change
the deletion to a cross-DB-safe operation (e.g., await
session.execute(sqla.text(f"DELETE FROM {TestOutboxModel.__tablename__}")) or
use the ORM bulk delete) and keep committing as before, and remove the unused
setup_db parameter from the fixture signature so cleanup_table(self, session)
(and any references) no longer accept setup_db.
---
Nitpick comments:
In `@pyproject.toml`:
- Line 26: The dependency entry "uuid6>=2025.0.1" is unbounded and may pull in
incompatible future major versions; update the constraint for the uuid6
dependency to a compatible-release or explicit upper bound (for example use the
compatible specifier ~=2025.0.1 or an upper cap like >=2025.0.1,<2026.0.0) so
builds remain safe while allowing patch/minor updates; locate and update the
uuid6 dependency line in pyproject.toml accordingly.
In `@src/cqrs/events/event.py`:
- Around line 7-8: Replace the unconditional import of the third‑party uuid6
with a conditional resolver that uses the stdlib uuid.uuid7 when running on
Python 3.14+ and falls back to uuid6 otherwise: add an import of sys, set a
local _uuid7 = uuid.uuid7 if sys.version_info >= (3,14) else import uuid6 and
set _uuid7 = uuid6.uuid7, then use _uuid7 as the default_factory for the UUID
fields in the event classes (the constructors/fields where uuid6.uuid7 is
currently referenced) and remove the plain uuid6 import.
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 113-121: The __table_args__ in OutboxModelMixin is using
`@declared_attr` and a hardcoded constraint name which will conflict across
multiple concrete models; change the decorator to `@declared_attr.directive` to
match __tablename__ semantics and make the UniqueConstraint name per-class (or
omit the name) so it’s not global—e.g., inside the
__table_args__/OutboxModelMixin declared attr compute the name dynamically from
cls.__tablename__ (or use no explicit name) and return the
UniqueConstraint("event_id", "event_name", name=dynamic_name) so each inheriting
model gets a unique constraint name.
- Around line 172-181: The dict-based sqlalchemy.case usage in
status_sorting_case is deprecated for SQLAlchemy 2.0; replace the dict with the
positional list/tuple of (when, then) pairs while keeping value=cls.event_status
and else_=4. Update the sqlalchemy.case call inside status_sorting_case to pass
an ordered sequence of tuples for
repository.EventStatus.NEW/NOT_PRODUCED/PRODUCED mapping to 1/2/3 respectively,
preserving cls.event_status as the value argument and else_=4.
In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Line 28: The module currently calls dotenv.load_dotenv() at import time,
causing a side effect whenever this module is imported; remove that top-level
call and instead provide an explicit initialization entry point (e.g., a
function named init_sqlalchemy_storage or load_env_for_sqlalchemy) or invoke
dotenv.load_dotenv() inside the SQLAlchemySagaStorage constructor so callers
control when .env is parsed; update references to dotenv.load_dotenv() and
SQLAlchemySagaStorage to call the new initializer from application startup
rather than relying on import-time behavior.
- Around line 11-23: The ImportError raised in the guarded import block should
suppress exception chaining to avoid noisy tracebacks; update the except
ImportError block that re-raises the replacement ImportError (the block around
the sqlalchemy imports in src/cqrs/saga/storage/sqlalchemy.py) to raise the new
ImportError with "from None" (i.e., raise ImportError(...) from None) so the
original ImportError is not chained into the traceback.
- Line 25: Replace the old registry-based base with the SQLAlchemy 2.0
DeclarativeBase pattern: import DeclarativeBase from sqlalchemy.orm, replace the
single line Base = registry().generate_base() with a class definition class
Base(DeclarativeBase): pass, and ensure all ORM models that previously inherited
from Base continue to inherit from this new Base; update imports accordingly so
DeclarativeBase is used instead of registry().generate_base().
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)
279-290:⚠️ Potential issue | 🔴 CriticalRemove the dead
rebind_outbox_model()function — module cannot be imported due to undefinedwarningsandDeclarativeMeta.The
@warnings.deprecated()decorator (line 279) references an undefinedwarningsmodule, and the function signature references undefinedDeclarativeMeta(line 282). Sincefrom __future__ import annotationsis not imported, both will causeNameErrorat module import time, preventing the entire module from loading.Additionally, this function has zero callers in the codebase and is dead code. Simply remove it:
Fix
-@warnings.deprecated() -def rebind_outbox_model( - model: typing.Any, - new_base: DeclarativeMeta, - table_name: typing.Text | None = None, -): - model.__bases__ = (new_base,) - model.__table__.name = table_name or model.__table__.name - new_base.metadata._add_table( - model.__table__.name, - model.__table__.schema, - model.__table__, - )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 279 - 290, Remove the dead function rebind_outbox_model and its `@warnings.deprecated`() decorator from the module: delete the entire function definition (including the decorator and its body) that references DeclarativeMeta and warnings to prevent import-time NameError and because it has no callers; after removal, run tests/linters and remove any now-unused imports related to DeclarativeMeta or warnings if they exist.
🧹 Nitpick comments (2)
src/cqrs/outbox/sqlalchemy.py (2)
222-240: Nit:_process_eventsprocesses a single event — consider_process_event(singular).The method accepts one
modeland returns oneOutboxedEvent | None, but the name is plural, which is misleading when reading the loop inget_many.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 222 - 240, The method _process_events is misnamed since it handles a single OutboxModelMixin and returns one OutboxedEvent; rename it to _process_event and update all internal references (calls from get_many and anywhere else) to use _process_event, keeping the implementation unchanged (including use of map.OutboxedEventMap, self._compressor, orjson.loads, and constructing repository.OutboxedEvent); also update any tests, docstrings, and type hints that reference _process_events to the new name.
154-169:update_status_querysetsflush_counter = flush_counter(no-op) for non-NOT_PRODUCEDstatuses.When
status != NOT_PRODUCED, line 162 produces the SQL expressionSET flush_counter = flush_counter, which is a pointless write. Only includeflush_counterin the update when it actually changes.Suggested fix
`@classmethod` def update_status_query( cls, outboxed_event_id: int, status: repository.EventStatus, ) -> sqlalchemy.Update: - values = { - "event_status": status, - "flush_counter": cls.flush_counter, - } - if status == repository.EventStatus.NOT_PRODUCED: - values["flush_counter"] += 1 + values: dict = {"event_status": status} + if status == repository.EventStatus.NOT_PRODUCED: + values["flush_counter"] = cls.flush_counter + 1 return ( sqlalchemy.update(cls).where(cls.id == outboxed_event_id).values(**values) )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 154 - 169, The update_status_query currently always includes "flush_counter": cls.flush_counter which yields a no-op SET flush_counter = flush_counter for non-NOT_PRODUCED statuses; change the construction of values in update_status_query so that flush_counter is only added when status == repository.EventStatus.NOT_PRODUCED (e.g. set values["flush_counter"] = cls.flush_counter + 1 inside that branch) and omit flush_counter for all other statuses, leaving the sqlalchemy.update(...) return unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 279-290: Remove the dead function rebind_outbox_model and its
`@warnings.deprecated`() decorator from the module: delete the entire function
definition (including the decorator and its body) that references
DeclarativeMeta and warnings to prevent import-time NameError and because it has
no callers; after removal, run tests/linters and remove any now-unused imports
related to DeclarativeMeta or warnings if they exist.
---
Duplicate comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Line 6: The import uuid6 in src/cqrs/outbox/sqlalchemy.py is unused; remove
the line importing uuid6 so the module no longer includes an unreferenced
dependency (look for the top-level import statement "import uuid6" and delete
it), then run lint/tests to confirm no other references to uuid6 exist in this
file.
---
Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 222-240: The method _process_events is misnamed since it handles a
single OutboxModelMixin and returns one OutboxedEvent; rename it to
_process_event and update all internal references (calls from get_many and
anywhere else) to use _process_event, keeping the implementation unchanged
(including use of map.OutboxedEventMap, self._compressor, orjson.loads, and
constructing repository.OutboxedEvent); also update any tests, docstrings, and
type hints that reference _process_events to the new name.
- Around line 154-169: The update_status_query currently always includes
"flush_counter": cls.flush_counter which yields a no-op SET flush_counter =
flush_counter for non-NOT_PRODUCED statuses; change the construction of values
in update_status_query so that flush_counter is only added when status ==
repository.EventStatus.NOT_PRODUCED (e.g. set values["flush_counter"] =
cls.flush_counter + 1 inside that branch) and omit flush_counter for all other
statuses, leaving the sqlalchemy.update(...) return unchanged.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)
283-290:⚠️ Potential issue | 🔴 CriticalTwo
NameErrors inrebind_outbox_model: typo@depricatedand missingDeclarativeMetaimport.
- Line 283:
@depricatedis a misspelling — the imported name isdeprecated(line 12/14).NameError: name 'depricated' is not defined.- Line 286:
DeclarativeMetais used as a type annotation but is never imported. Python evaluates annotations eagerly withoutfrom __future__ import annotations, so this raisesNameError: name 'DeclarativeMeta' is not definedat function definition time.DeclarativeMetawould need to be imported fromsqlalchemy.orm(it still exists in SQLAlchemy 2.x as a legacy type).🐛 Proposed fix
-from sqlalchemy.orm import Mapped, mapped_column, declared_attr +from sqlalchemy.orm import Mapped, mapped_column, declared_attr, DeclarativeMeta-@depricated("This function is deprecated; use `OutboxModelMixin` instead") +@deprecated("This function is deprecated; use `OutboxModelMixin` instead") def rebind_outbox_model( model: typing.Any, new_base: DeclarativeMeta,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 283 - 290, Fix the two NameErrors in rebind_outbox_model by correcting the decorator name from `@depricated` to `@deprecated` and by importing DeclarativeMeta so the type annotation is valid at function definition time; update the top-level imports to include DeclarativeMeta from sqlalchemy.orm (or wrap the annotation in a string if you prefer forward references) and ensure the decorator used is the existing deprecated symbol referenced elsewhere in the file.
🧹 Nitpick comments (1)
src/cqrs/outbox/sqlalchemy.py (1)
117-118: Use@declared_attr.directivefor__table_args__for consistency with__tablename__.
__tablename__on line 66 already uses@declared_attr.directive, but__table_args__uses plain@declared_attr. The purpose ofdeclared_attr.directiveis strictly to support PEP 484 typing tools, allowing the decorated function to have a return type that is not using theMappedgeneric class; at runtime it returns thedeclared_attrclass unmodified. There is no runtime impact, but using the.directivevariant here maintains internal consistency and keeps type-checkers happy.♻️ Proposed fix
- `@declared_attr` + `@declared_attr.directive` def __table_args__(self):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 117 - 118, Replace the plain decorator on the class-level table args with the directive variant to match __tablename__ and satisfy type-checkers: change the decorator on the __table_args__ declared attribute from `@declared_attr` to `@declared_attr.directive` so the function signature can use PEP 484 typing consistently (keep the method name __table_args__ and its return type intact).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 1-14: The module references sys.version_info but never imports
sys, causing a NameError on import; add an import sys at the top of the file
(alongside the other top-level imports) so the conditional that chooses
typing_extensions.deprecated vs warnings.deprecated runs correctly; ensure the
import is placed before the sys.version_info check (the conditional using
sys.version_info in the sqlalchemy module).
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 51-52: Remove the dead/unreachable second return that references
the undefined saga_engine: keep the single correct return using
sqla_async.async_sessionmaker(engine, expire_on_commit=False) and delete the
extraneous return sqla_async.async_sessionmaker(saga_engine, ...); this
eliminates the unreachable code and the potential NameError tied to saga_engine.
---
Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 283-290: Fix the two NameErrors in rebind_outbox_model by
correcting the decorator name from `@depricated` to `@deprecated` and by importing
DeclarativeMeta so the type annotation is valid at function definition time;
update the top-level imports to include DeclarativeMeta from sqlalchemy.orm (or
wrap the annotation in a string if you prefer forward references) and ensure the
decorator used is the existing deprecated symbol referenced elsewhere in the
file.
---
Duplicate comments:
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 24-36: The session fixture opens a Connection and creates an
async_sessionmaker without join_transaction_mode, which causes Session.commit()
to affect the outer transaction; update the session_maker creation in the
fixture (the sqla_async.async_sessionmaker call used to produce session) to
include join_transaction_mode="create_savepoint" (keeping bind=connection and
expire_on_commit=False) so the Session creates SAVEPOINTs and will only
commit/rollback its own scope.
- Around line 10-14: The custom session-scoped event_loop fixture is a
deprecated override; remove this fixture definition (the event_loop function) so
pytest-asyncio's built-in event_loop is used, or if a session-scoped loop is
required rename it (e.g., session_event_loop) and update tests to request that
name; locate the event_loop fixture in conftest.py and delete/rename the
function to avoid overriding the plugin-provided event_loop.
---
Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 117-118: Replace the plain decorator on the class-level table args
with the directive variant to match __tablename__ and satisfy type-checkers:
change the decorator on the __table_args__ declared attribute from
`@declared_attr` to `@declared_attr.directive` so the function signature can use PEP
484 typing consistently (keep the method name __table_args__ and its return type
intact).
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)
283-295:⚠️ Potential issue | 🔴 Critical
@depricatedtypo and undefinedDeclarativeMetaprevent module import.Two critical errors:
- Line 284:
@depricatedshould be@deprecated(the correct spelling imported on lines 12–14).- Line 286:
DeclarativeMetais never imported. Since function annotations are evaluated at definition time in Python 3.10–3.11 (withoutfrom __future__ import annotations), this causes aNameErroron module load.Replace with
typing.Anyor drop the type hint, asDeclarativeMetawas removed in SQLAlchemy 2.0 and this function is deprecated anyway.Fix
-@depricated("This function is deprecated; use `OutboxModelMixin` instead") +@deprecated("This function is deprecated; use `OutboxModelMixin` instead") def rebind_outbox_model( model: typing.Any, - new_base: DeclarativeMeta, + new_base: typing.Any, table_name: typing.Text | None = None, ):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 283 - 295, The decorator name on the deprecated helper is misspelled and the type annotation uses an undefined symbol; change the decorator on rebind_outbox_model from `@depricated` to `@deprecated` and replace the DeclarativeMeta annotation on the new_base parameter with typing.Any (or remove the annotation) so the module no longer raises NameError at import; keep the rest of rebind_outbox_model unchanged.
🧹 Nitpick comments (3)
src/cqrs/outbox/sqlalchemy.py (3)
226-226: Method name_process_eventsis misleading — it processes a single event.The method accepts one
model: OutboxModelMixinand returns a singleOutboxedEvent | None, but the plural name suggests batch processing. Consider renaming to_process_eventfor clarity.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` at line 226, Rename the misleading plural method _process_events to _process_event and update its signature and return type accordingly (currently def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent | None) so it clearly indicates it handles a single OutboxModelMixin and returns one OutboxedEvent | None; then update every internal reference/call site to use _process_event (including any loops or helpers that called _process_events), adjust any tests or subclass overrides that reference the old name, and ensure imports/types (OutboxModelMixin, OutboxedEvent, repository) remain correct after the rename.
158-173:update_status_queryperforms a no-opSET flush_counter = flush_counterfor non-NOT_PRODUCEDstatuses.Lines 164-166 initialize
values["flush_counter"]tocls.flush_counter(the column itself). Whenstatus != NOT_PRODUCED, the generated SQL becomesSET flush_counter = flush_counter, which is a pointless write. Consider only includingflush_counterin the update values when actually incrementing:Proposed fix
`@classmethod` def update_status_query( cls, outboxed_event_id: int, status: repository.EventStatus, ) -> sqlalchemy.Update: - values = { - "event_status": status, - "flush_counter": cls.flush_counter, - } + values: dict[str, typing.Any] = { + "event_status": status, + } if status == repository.EventStatus.NOT_PRODUCED: - values["flush_counter"] += 1 + values["flush_counter"] = cls.flush_counter + 1 return ( sqlalchemy.update(cls).where(cls.id == outboxed_event_id).values(**values) )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 158 - 173, update_status_query currently always includes "flush_counter": cls.flush_counter which yields a no-op SET flush_counter = flush_counter for statuses != NOT_PRODUCED; change the logic in update_status_query so you only add the "flush_counter" key to values when status == repository.EventStatus.NOT_PRODUCED and, in that branch, set values["flush_counter"] to an expression that increments the column (e.g. cls.flush_counter + 1 or sqlalchemy expression) instead of mutating the column object; keep the rest of the update as-is so non-NOT_PRODUCED updates don't include flush_counter.
117-125: Use@declared_attr.directivefor__table_args__to match SQLAlchemy 2.0 best practices and consistency with__tablename__.Line 66 correctly uses
@declared_attr.directivefor__tablename__. Since__table_args__here returns a tuple ofUniqueConstraintobjects that should be instantiated per subclass, it should also use@declared_attr.directiverather than plain@declared_attr.Proposed fix
- `@declared_attr` + `@declared_attr.directive` def __table_args__(self):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 117 - 125, The __table_args__ declared as a class-level attribute should use `@declared_attr.directive` like __tablename__ to ensure SQLAlchemy 2.0 instantiates the UniqueConstraint per subclass; update the decorator on the __table_args__ method to `@declared_attr.directive` and leave the method body returning the tuple with sqlalchemy.UniqueConstraint("event_id", "event_name", name="event_id_unique_index") unchanged so subclasses receive their own constraint instances.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 283-295: The decorator name on the deprecated helper is misspelled
and the type annotation uses an undefined symbol; change the decorator on
rebind_outbox_model from `@depricated` to `@deprecated` and replace the
DeclarativeMeta annotation on the new_base parameter with typing.Any (or remove
the annotation) so the module no longer raises NameError at import; keep the
rest of rebind_outbox_model unchanged.
---
Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Line 226: Rename the misleading plural method _process_events to
_process_event and update its signature and return type accordingly (currently
def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent |
None) so it clearly indicates it handles a single OutboxModelMixin and returns
one OutboxedEvent | None; then update every internal reference/call site to use
_process_event (including any loops or helpers that called _process_events),
adjust any tests or subclass overrides that reference the old name, and ensure
imports/types (OutboxModelMixin, OutboxedEvent, repository) remain correct after
the rename.
- Around line 158-173: update_status_query currently always includes
"flush_counter": cls.flush_counter which yields a no-op SET flush_counter =
flush_counter for statuses != NOT_PRODUCED; change the logic in
update_status_query so you only add the "flush_counter" key to values when
status == repository.EventStatus.NOT_PRODUCED and, in that branch, set
values["flush_counter"] to an expression that increments the column (e.g.
cls.flush_counter + 1 or sqlalchemy expression) instead of mutating the column
object; keep the rest of the update as-is so non-NOT_PRODUCED updates don't
include flush_counter.
- Around line 117-125: The __table_args__ declared as a class-level attribute
should use `@declared_attr.directive` like __tablename__ to ensure SQLAlchemy 2.0
instantiates the UniqueConstraint per subclass; update the decorator on the
__table_args__ method to `@declared_attr.directive` and leave the method body
returning the tuple with sqlalchemy.UniqueConstraint("event_id", "event_name",
name="event_id_unique_index") unchanged so subclasses receive their own
constraint instances.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cqrs/outbox/sqlalchemy.py (1)
283-294:⚠️ Potential issue | 🟠 MajorRaise
NotImplementedErrorforrebind_outbox_modelinstead of silently corrupting state.This function is already marked
@deprecated, but the deprecation warning alone does not prevent problematic behavior. The technical issues are real:
- Modifying
__bases__after mapper initialization is broken: SQLAlchemy's mapper instrumentation happens at class definition time. Swapping__bases__afterward does not re-trigger__init_subclass__, mapper configuration, or column instrumentation, resulting in a class whose__table__and__bases__point to different mapper registries — silent corruption.- Private API usage: Calls
new_base.metadata._add_table(...), which is not guaranteed stable across minor SQLAlchemy releases.- Misleading type hint:
new_baseis typed asDeclarativeMeta(legacy SQLAlchemy), but SQLAlchemy 2.0 users creating bases viaclass Base(DeclarativeBase): ...would pass a plaintype, making the hint misleading.Since the function is already deprecated and has a clear migration path (
OutboxModelMixin), raiseNotImplementedErrorto prevent accidental corruption by external callers who have not yet migrated.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 283 - 294, The rebind_outbox_model function is unsafe and should raise NotImplementedError instead of mutating class internals; replace the current body of rebind_outbox_model (the function that currently assigns model.__bases__, mutates model.__table__.name and calls new_base.metadata._add_table) with a single raise NotImplementedError explaining the function is deprecated and to use OutboxModelMixin; also adjust the type hint/comments to avoid implying DeclarativeMeta is required (use typing.Any or a neutral note) and remove any uses of private metadata API (_add_table) to prevent silent state corruption.
🧹 Nitpick comments (3)
src/cqrs/outbox/sqlalchemy.py (3)
138-156:.select_from(cls)is redundant whenselect(cls)already establishes the FROM clause.
sqlalchemy.select(cls)already impliesFROM outboxfor the mapped entity. The chained.select_from(cls)is a no-op and adds cognitive noise.♻️ Proposed fix
return ( sqlalchemy.select(cls) - .select_from(cls) .where(🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 138 - 156, The query builder unnecessarily calls .select_from(cls) after sqlalchemy.select(cls), which is redundant and noisy; remove the chained .select_from(cls) call in the select construction (the expression starting with sqlalchemy.select(cls) and using cls.event_status, cls.flush_counter, cls.topic, cls.status_sorting_case(), cls.id, and .limit(size)) so the query simply uses sqlalchemy.select(cls).where(...).order_by(...).limit(size) instead.
117-125: Use@declared_attr.directivefor__table_args__to match__tablename__and SQLAlchemy 2.0 convention.
__tablename__(Line 66) already uses@declared_attr.directive, which is the recommended decorator in SQLAlchemy 2.0 for mixin class-level schema directives.__table_args__should follow suit for consistency and to suppress type-checker warnings.♻️ Proposed fix
- `@declared_attr` + `@declared_attr.directive` def __table_args__(self):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 117 - 125, The __table_args__ mixin currently uses `@declared_attr` but should use `@declared_attr.directive` to match the __tablename__ usage and SQLAlchemy 2.0 conventions; update the decorator on the __table_args__ method to declared_attr.directive while keeping the return value (the sqlalchemy.UniqueConstraint over "event_id" and "event_name" named "event_id_unique_index") unchanged so type-checkers and mixin-level schema directives remain consistent.
158-173:update_status_queryunconditionally adds a no-opflush_counter = flush_counterself-reference for non-NOT_PRODUCEDstatuses.
values["flush_counter"] = cls.flush_counter(the bareInstrumentedAttribute) generatesSET flush_counter = flush_counter, which is a database round-trip with no effect. The cleaner approach is to only includeflush_counterin the update when it actually needs to change.♻️ Proposed fix
`@classmethod` def update_status_query( cls, outboxed_event_id: int, status: repository.EventStatus, ) -> sqlalchemy.Update: - values = { - "event_status": status, - "flush_counter": cls.flush_counter, - } - if status == repository.EventStatus.NOT_PRODUCED: - values["flush_counter"] += 1 + values: dict = {"event_status": status} + if status == repository.EventStatus.NOT_PRODUCED: + values["flush_counter"] = cls.flush_counter + 1 return ( sqlalchemy.update(cls).where(cls.id == outboxed_event_id).values(**values) )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 158 - 173, The update_status_query currently always includes a no-op self-reference by setting values["flush_counter"] = cls.flush_counter; change it so flush_counter is only placed into the values dict when it must change: remove the unconditional assignment and instead, inside the branch for repository.EventStatus.NOT_PRODUCED, set values["flush_counter"] to an increment expression (e.g., cls.flush_counter + 1 or an equivalent SQL expression) so the UPDATE only touches flush_counter when status == NOT_PRODUCED and avoids emitting SET flush_counter = flush_counter for other statuses.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 46-53: The process_bind_param method currently lets non-uuid.UUID
values slip through for non-PostgreSQL dialects which breaks BINARY(16) storage;
update process_bind_param in the SQLAlchemy type to detect str inputs and
convert them to uuid.UUID (e.g., uuid.UUID(value)) before returning .bytes for
MySQL/SQLite, keep returning the original uuid.UUID for PostgreSQL, and raise a
clear TypeError for any other unexpected types so invalid values don't silently
corrupt the DB.
---
Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 283-294: The rebind_outbox_model function is unsafe and should
raise NotImplementedError instead of mutating class internals; replace the
current body of rebind_outbox_model (the function that currently assigns
model.__bases__, mutates model.__table__.name and calls
new_base.metadata._add_table) with a single raise NotImplementedError explaining
the function is deprecated and to use OutboxModelMixin; also adjust the type
hint/comments to avoid implying DeclarativeMeta is required (use typing.Any or a
neutral note) and remove any uses of private metadata API (_add_table) to
prevent silent state corruption.
---
Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 138-156: The query builder unnecessarily calls .select_from(cls)
after sqlalchemy.select(cls), which is redundant and noisy; remove the chained
.select_from(cls) call in the select construction (the expression starting with
sqlalchemy.select(cls) and using cls.event_status, cls.flush_counter, cls.topic,
cls.status_sorting_case(), cls.id, and .limit(size)) so the query simply uses
sqlalchemy.select(cls).where(...).order_by(...).limit(size) instead.
- Around line 117-125: The __table_args__ mixin currently uses `@declared_attr`
but should use `@declared_attr.directive` to match the __tablename__ usage and
SQLAlchemy 2.0 conventions; update the decorator on the __table_args__ method to
declared_attr.directive while keeping the return value (the
sqlalchemy.UniqueConstraint over "event_id" and "event_name" named
"event_id_unique_index") unchanged so type-checkers and mixin-level schema
directives remain consistent.
- Around line 158-173: The update_status_query currently always includes a no-op
self-reference by setting values["flush_counter"] = cls.flush_counter; change it
so flush_counter is only placed into the values dict when it must change: remove
the unconditional assignment and instead, inside the branch for
repository.EventStatus.NOT_PRODUCED, set values["flush_counter"] to an increment
expression (e.g., cls.flush_counter + 1 or an equivalent SQL expression) so the
UPDATE only touches flush_counter when status == NOT_PRODUCED and avoids
emitting SET flush_counter = flush_counter for other statuses.
| def process_bind_param(self, value, dialect): | ||
| if value is None: | ||
| return value | ||
| if dialect.name == "postgresql": | ||
| return value # asyncpg work with uuid.UUID | ||
| if isinstance(value, uuid.UUID): | ||
| return value.bytes # For MySQL return 16 bytes | ||
| return value |
There was a problem hiding this comment.
process_bind_param silently passes non-uuid.UUID values through for non-PostgreSQL dialects.
If a string UUID (e.g., "550e8400-e29b-...") is passed for MySQL/SQLite, the final return value falls through without conversion, producing a type error or silent data corruption when SQLAlchemy tries to store it in a BINARY(16) column.
🛡️ Proposed fix: add a string-to-UUID conversion guard
def process_bind_param(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg works with uuid.UUID
+ if isinstance(value, str):
+ value = uuid.UUID(value)
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/cqrs/outbox/sqlalchemy.py` around lines 46 - 53, The process_bind_param
method currently lets non-uuid.UUID values slip through for non-PostgreSQL
dialects which breaks BINARY(16) storage; update process_bind_param in the
SQLAlchemy type to detect str inputs and convert them to uuid.UUID (e.g.,
uuid.UUID(value)) before returning .bytes for MySQL/SQLite, keep returning the
original uuid.UUID for PostgreSQL, and raise a clear TypeError for any other
unexpected types so invalid values don't silently corrupt the DB.
Merging this PR will not alter performance
Comparing Footnotes
|
Summary by CodeRabbit