Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/e2e-boot/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ runs:
IMAGE_TAG='"$IMAGE_TAG"' docker compose pull --quiet 2>&1
echo "--- pull done, starting infra ---"
docker compose up -d --no-build \
mongo redis shared-ca zookeeper-certgen zookeeper kafka 2>&1
mongo redis shared-ca kafka 2>&1
echo $? > /tmp/infra-pull.exit
' > /tmp/infra-pull.log 2>&1 &
echo $! > /tmp/infra-pull.pid
Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ jobs:
- backend
- frontend
- cert-generator
- zookeeper-certgen
steps:
- uses: actions/checkout@v6

Expand Down Expand Up @@ -110,7 +109,6 @@ jobs:
crane copy "$REGISTRY/$PREFIX/backend:$TAG" "$REGISTRY/$PREFIX/backend:latest"
crane copy "$REGISTRY/$PREFIX/frontend:$TAG" "$REGISTRY/$PREFIX/frontend:latest"
crane copy "$REGISTRY/$PREFIX/cert-generator:$TAG" "$REGISTRY/$PREFIX/cert-generator:latest"
crane copy "$REGISTRY/$PREFIX/zookeeper-certgen:$TAG" "$REGISTRY/$PREFIX/zookeeper-certgen:latest"

summary:
name: Summary
Expand Down Expand Up @@ -142,4 +140,4 @@ jobs:
echo "| Frontend | \`docker pull $REGISTRY/$PREFIX/frontend:latest\` |" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Security Scans" >> $GITHUB_STEP_SUMMARY
echo "All 5 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY
echo "All 4 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY
2 changes: 1 addition & 1 deletion .github/workflows/release-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:
run: |
PREFIX="${GITHUB_REPOSITORY_OWNER,,}/integr8scode"
VERSION="${{ steps.calver.outputs.version }}"
for img in base backend frontend cert-generator zookeeper-certgen; do
for img in base backend frontend cert-generator; do
crane copy "$REGISTRY/$PREFIX/$img:latest" "$REGISTRY/$PREFIX/$img:$VERSION"
done

Expand Down
19 changes: 3 additions & 16 deletions .github/workflows/stack-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ env:
MONGO_IMAGE: mongo:8.0
REDIS_IMAGE: redis:7-alpine
KAFKA_IMAGE: confluentinc/cp-kafka:7.8.2
ZOOKEEPER_IMAGE: confluentinc/cp-zookeeper:7.8.2
K3S_VERSION: v1.32.11+k3s1
K3S_INSTALL_SHA256: d75e014f2d2ab5d30a318efa5c326f3b0b7596f194afcff90fa7a7a91166d5f7

Expand Down Expand Up @@ -184,16 +183,6 @@ jobs:
cache-from: type=gha,scope=cert-generator
cache-to: type=gha,mode=max,scope=cert-generator

- name: Build zookeeper-certgen image
uses: docker/build-push-action@v6
with:
context: ./backend/zookeeper
file: ./backend/zookeeper/Dockerfile.certgen
load: true
tags: integr8scode-zookeeper-certgen:latest
cache-from: type=gha,scope=zookeeper-certgen
cache-to: type=gha,mode=max,scope=zookeeper-certgen

# ── Frontend (nginx + SSL) ─────────────────────────────────────
- name: Build frontend image
uses: docker/build-push-action@v6
Expand All @@ -216,13 +205,11 @@ jobs:
docker tag integr8scode-base:latest "$IMG/base:$TAG"
docker tag integr8scode-backend:latest "$IMG/backend:$TAG"
docker tag integr8scode-cert-generator:latest "$IMG/cert-generator:$TAG"
docker tag integr8scode-zookeeper-certgen:latest "$IMG/zookeeper-certgen:$TAG"
docker tag integr8scode-frontend:latest "$IMG/frontend:$TAG"

# Push all 5 images in parallel, tracking each PID
# Push all 4 images in parallel, tracking each PID
declare -A PIDS
for name in base backend cert-generator zookeeper-certgen \
frontend; do
for name in base backend cert-generator frontend; do
docker push "$IMG/$name:$TAG" &
PIDS[$name]=$!
done
Expand Down Expand Up @@ -286,7 +273,7 @@ jobs:
run: |
mkdir -p logs
docker compose logs --timestamps > logs/docker-compose.log 2>&1
for svc in backend mongo redis kafka zookeeper \
for svc in backend mongo redis kafka \
coordinator k8s-worker pod-monitor result-processor \
saga-orchestrator event-replay dlq-processor; do
docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true
Expand Down
41 changes: 39 additions & 2 deletions backend/app/db/repositories/saga_repository.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
import dataclasses
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4

from beanie.exceptions import RevisionIdWasChanged
from beanie.odm.enums import SortDirection
from beanie.odm.operators.find import BaseFindOperator
from beanie.odm.queries.update import UpdateResponse
from beanie.operators import GT, LT, NE, Eq, In
from beanie.operators import GT, LT, NE, Eq, In, Set
from monggregate import Pipeline, S

from app.db.docs import ExecutionDocument, SagaDocument
from app.domain.enums import SagaState
from app.domain.saga import Saga, SagaConcurrencyError, SagaContextData, SagaFilter, SagaListResult, SagaNotFoundError
from app.domain.saga import (
Saga,
SagaConcurrencyError,
SagaContextData,
SagaFilter,
SagaInvalidStateError,
SagaListResult,
SagaNotFoundError,
)

_saga_fields = set(Saga.__dataclass_fields__)

Expand Down Expand Up @@ -65,6 +74,34 @@ async def save_saga(self, saga_id: str, **updates: Any) -> Saga:
raise SagaConcurrencyError(saga_id) from exc
return self._to_domain(doc)

async def atomic_cancel_saga(
    self, saga_id: str, error_message: str, completed_at: datetime,
) -> Saga:
    """Atomically cancel a saga using findOneAndUpdate.

    The update filter only matches sagas still in a cancellable state
    (RUNNING or CREATED), so the transition to CANCELLED happens in a
    single server-side operation and cannot race with another writer
    flipping the state first.

    Bumps revision_id so any concurrent save_changes() (which filters on
    the old revision_id) will see a mismatch and raise RevisionIdWasChanged.

    Args:
        saga_id: Identifier of the saga to cancel.
        error_message: Reason stored on the cancelled saga document.
        completed_at: Timestamp recorded as the saga's completion time.

    Returns:
        The cancelled saga as a domain object, built from the post-update
        document state.

    Raises:
        SagaNotFoundError: If no saga with ``saga_id`` exists at all.
        SagaInvalidStateError: If the saga exists but is not in a
            cancellable state (i.e. not RUNNING or CREATED).
    """
    doc = await SagaDocument.find_one(
        SagaDocument.saga_id == saga_id,
        In(SagaDocument.state, [SagaState.RUNNING, SagaState.CREATED]),
    ).update(
        Set({  # type: ignore[no-untyped-call]
            SagaDocument.state: SagaState.CANCELLED,
            SagaDocument.error_message: error_message,
            SagaDocument.completed_at: completed_at,
            SagaDocument.updated_at: datetime.now(timezone.utc),
            # Manually bump the optimistic-lock token: a raw Set() bypasses
            # Beanie's automatic revision handling, so we rotate it here to
            # invalidate any in-flight save_changes() on the old revision.
            "revision_id": uuid4(),
        }),
        # NEW_DOCUMENT returns the post-update document; None means the
        # filter matched nothing (saga missing OR not in a cancellable state).
        response_type=UpdateResponse.NEW_DOCUMENT,
    )
    if not doc:
        # Disambiguate the two "no match" cases with a second lookup so the
        # caller gets a precise error.
        existing = await SagaDocument.find_one(SagaDocument.saga_id == saga_id)
        if not existing:
            raise SagaNotFoundError(saga_id)
        raise SagaInvalidStateError(saga_id, existing.state, "cancel")
    return self._to_domain(doc)

async def get_or_create_saga(self, saga: Saga) -> tuple[Saga, bool]:
"""Atomically get or create a saga by (execution_id, saga_name).

Expand Down
26 changes: 10 additions & 16 deletions backend/app/services/saga/saga_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
SagaConcurrencyError,
SagaConfig,
SagaContextData,
SagaInvalidStateError,
SagaNotFoundError,
)
from app.events.core import UnifiedProducer

Expand Down Expand Up @@ -282,27 +280,23 @@ async def get_execution_sagas(self, execution_id: str) -> list[Saga]:
async def cancel_saga(self, saga_id: str) -> None:
"""Cancel a running saga and trigger compensation.

Uses an atomic findOneAndUpdate to set state=CANCELLED, eliminating
revision-check races with the step-execution loop.

Raises SagaNotFoundError if saga doesn't exist.
Raises SagaInvalidStateError if saga is not in a cancellable state.
Raises SagaConcurrencyError if saga was modified concurrently.
"""
saga_instance = await self.get_saga_status(saga_id)
if not saga_instance:
raise SagaNotFoundError(saga_id)

if saga_instance.state not in (SagaState.RUNNING, SagaState.CREATED):
raise SagaInvalidStateError(saga_id, saga_instance.state, "cancel")
saved = await self._repo.atomic_cancel_saga(
saga_id,
error_message="Saga cancelled by user request",
completed_at=datetime.now(UTC),
)

self.logger.info(
"Saga cancellation initiated",
saga_id=saga_id,
execution_id=saga_instance.execution_id,
user_id=saga_instance.context_data.user_id,
)

saved = await self._repo.save_saga(
saga_id, state=SagaState.CANCELLED,
error_message="Saga cancelled by user request", completed_at=datetime.now(UTC),
execution_id=saved.execution_id,
user_id=saved.context_data.user_id,
)

if self._producer and self.config.store_events:
Expand Down
29 changes: 12 additions & 17 deletions backend/app/services/sse/sse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from app.core.metrics import ConnectionMetrics
from app.db.repositories import SSERepository
from app.domain.enums import EventType, NotificationChannel, SSEControlEvent
from app.domain.events import ResourceUsageDomain
from app.domain.execution.models import ExecutionResultDomain
from app.domain.sse import (
DomainNotificationSSEPayload,
Expand Down Expand Up @@ -128,7 +127,12 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn
self.logger.info("SSE connection closed", execution_id=execution_id)

async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMessage) -> SSEExecutionEventData:
"""Build typed SSE event from Redis message."""
"""Build typed SSE event from Redis message.

Uses validate_python to coerce JSON primitives (str → enum, ISO str → datetime,
dict → dataclass) back to proper types after the Redis JSON round-trip.
Extra keys from the original domain event are ignored.
"""
result: ExecutionResultDomain | None = None
if msg.event_type == EventType.RESULT_STORED:
execution = await self.repository.get_execution(execution_id)
Expand All @@ -142,21 +146,12 @@ async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMess
execution_id=execution_id,
)

ru = msg.data.get("resource_usage")
return SSEExecutionEventData(
event_type=msg.data.get("event_type", msg.event_type),
execution_id=execution_id,
timestamp=msg.data.get("timestamp"),
event_id=msg.data.get("event_id"),
status=msg.data.get("status"),
stdout=msg.data.get("stdout"),
stderr=msg.data.get("stderr"),
exit_code=msg.data.get("exit_code"),
timeout_seconds=msg.data.get("timeout_seconds"),
message=msg.data.get("message"),
resource_usage=ResourceUsageDomain(**ru) if ru else None,
result=result,
)
return _sse_event_adapter.validate_python({
**msg.data,
"event_type": msg.event_type,
"execution_id": execution_id,
"result": result,
})

async def create_notification_stream(self, user_id: str) -> AsyncGenerator[dict[str, Any], None]:
subscription: SSERedisSubscription | None = None
Expand Down
8 changes: 5 additions & 3 deletions backend/tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from aiokafka import AIOKafkaConsumer
from app.db.docs.saga import SagaDocument
from app.domain.enums import EventType, KafkaTopic, UserRole
from app.domain.events import DomainEvent, DomainEventAdapter
from app.domain.events import DomainEvent, DomainEventAdapter, SagaStartedEvent
from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse
from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse
from app.schemas_pydantic.saga import SagaStatusResponse
Expand Down Expand Up @@ -110,15 +110,17 @@ async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0)
timeout=timeout,
)

async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> SagaStartedEvent:
"""Wait for SAGA_STARTED — saga document is guaranteed in MongoDB after this."""
return await self.wait_for(
event = await self.wait_for(
lambda e: (
e.event_type == EventType.SAGA_STARTED
and e.execution_id == execution_id
),
timeout=timeout,
)
assert isinstance(event, SagaStartedEvent)
return event

async def wait_for_notification_created(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
"""Wait for NOTIFICATION_CREATED — notification is guaranteed in MongoDB after this."""
Expand Down
20 changes: 9 additions & 11 deletions backend/tests/e2e/test_saga_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,26 @@ async def test_cancel_saga(
assert exec_response.status_code == 200

execution = ExecutionResponse.model_validate(exec_response.json())

# Get saga_id from SAGA_STARTED event (published after saga persisted)
started = await event_waiter.wait_for_saga_started(execution.execution_id)

# Wait for CREATE_POD_COMMAND — the orchestrator's last step.
# After this the orchestrator is idle, so cancel won't race with
# concurrent step-processing writes to the saga document.
await event_waiter.wait_for_saga_command(execution.execution_id)

saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]

response = await test_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel")
response = await test_user.post(f"/api/v1/sagas/{started.saga_id}/cancel")

assert response.status_code == 200
result = SagaCancellationResponse.model_validate(response.json())
assert result.saga_id == saga.saga_id
assert result.saga_id == started.saga_id
assert result.success is True
assert result.message is not None

# cancel_saga sets state to CANCELLED synchronously in MongoDB
# before returning the HTTP response (compensation also runs inline).
status_resp = await test_user.get(f"/api/v1/sagas/{saga.saga_id}")
status_resp = await test_user.get(f"/api/v1/sagas/{started.saga_id}")
assert status_resp.status_code == 200
updated_saga = SagaStatusResponse.model_validate(status_resp.json())
assert updated_saga.state == SagaState.CANCELLED
Expand Down Expand Up @@ -250,12 +251,9 @@ async def test_cancel_other_users_saga_forbidden(
assert exec_response.status_code == 200

execution = ExecutionResponse.model_validate(exec_response.json())
await event_waiter.wait_for_saga_started(execution.execution_id)

saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]
started = await event_waiter.wait_for_saga_started(execution.execution_id)

response = await another_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel")
response = await another_user.post(f"/api/v1/sagas/{started.saga_id}/cancel")

assert response.status_code == 403

Expand Down
12 changes: 0 additions & 12 deletions backend/zookeeper/Dockerfile.certgen

This file was deleted.

21 changes: 0 additions & 21 deletions backend/zookeeper/conf/log4j.properties

This file was deleted.

29 changes: 0 additions & 29 deletions backend/zookeeper/conf/zoo.cfg

This file was deleted.

Loading
Loading