Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
echo "image=${{ env.REGISTRY }}/$PREFIX/${{ matrix.image }}:$TAG" >> $GITHUB_OUTPUT

- name: Run Trivy vulnerability scanner
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.34.0
with:
image-ref: ${{ steps.ref.outputs.image }}
format: 'sarif'
Expand Down
1 change: 1 addition & 0 deletions backend/app/db/docs/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class SagaDocument(Document):
class Settings:
name = "sagas"
use_state_management = True
use_revision = True
bson_encoders = {SagaContextData: dataclasses.asdict}
indexes = [
IndexModel([("state", ASCENDING)], name="idx_saga_state"),
Expand Down
32 changes: 23 additions & 9 deletions backend/app/db/repositories/saga_repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import dataclasses
from datetime import datetime
from datetime import datetime, timezone
from typing import Any

from beanie.exceptions import RevisionIdWasChanged
from beanie.odm.enums import SortDirection
from beanie.odm.operators.find import BaseFindOperator
from beanie.odm.queries.update import UpdateResponse
Expand All @@ -10,7 +11,7 @@

from app.db.docs import ExecutionDocument, SagaDocument
from app.domain.enums import SagaState
from app.domain.saga import Saga, SagaContextData, SagaFilter, SagaListResult
from app.domain.saga import Saga, SagaConcurrencyError, SagaContextData, SagaFilter, SagaListResult, SagaNotFoundError

_saga_fields = set(Saga.__dataclass_fields__)

Expand Down Expand Up @@ -43,13 +44,26 @@ def _filter_conditions(self, saga_filter: SagaFilter) -> list[BaseFindOperator]:
conditions.append(Eq(SagaDocument.error_message, None))
return conditions

async def upsert_saga(self, saga: Saga) -> bool:
existing = await SagaDocument.find_one(SagaDocument.saga_id == saga.saga_id)
doc = SagaDocument(**dataclasses.asdict(saga))
if existing:
doc.id = existing.id
await doc.save()
return existing is not None
async def save_saga(self, saga_id: str, **updates: Any) -> Saga:
"""Load document, apply partial updates, and persist via save_changes().

Only the provided fields are written (Beanie diffs against the loaded
snapshot), so concurrent changes to *other* fields are never overwritten.

Raises SagaConcurrencyError if the document was modified between load
and save (revision mismatch).
"""
doc = await SagaDocument.find_one(SagaDocument.saga_id == saga_id)
if not doc:
raise SagaNotFoundError(saga_id)
for field, value in updates.items():
setattr(doc, field, value)
doc.updated_at = datetime.now(timezone.utc)
try:
await doc.save_changes()
except RevisionIdWasChanged as exc:
raise SagaConcurrencyError(saga_id) from exc
return self._to_domain(doc)

async def get_or_create_saga(self, saga: Saga) -> tuple[Saga, bool]:
"""Atomically get or create a saga by (execution_id, saga_name).
Expand Down
4 changes: 2 additions & 2 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ async def create_notification(
# Save to database
notification = await self.repository.create_notification(create_data)

await self._publish_notification_created_event(notification)

# Deliver immediately if not scheduled; scheduled notifications are
# picked up by the NotificationScheduler worker.
if scheduled_for is None:
await self._deliver_notification(notification)

await self._publish_notification_created_event(notification)

return notification

async def _publish_notification_created_event(self, notification: DomainNotification) -> None:
Expand Down
Loading
Loading