diff --git a/.github/actions/e2e-boot/action.yml b/.github/actions/e2e-boot/action.yml
index 39d10df9..e6d49b1a 100644
--- a/.github/actions/e2e-boot/action.yml
+++ b/.github/actions/e2e-boot/action.yml
@@ -28,7 +28,7 @@ runs:
IMAGE_TAG='"$IMAGE_TAG"' docker compose pull --quiet 2>&1
echo "--- pull done, starting infra ---"
docker compose up -d --no-build \
- mongo redis shared-ca zookeeper-certgen zookeeper kafka 2>&1
+ mongo redis shared-ca kafka 2>&1
echo $? > /tmp/infra-pull.exit
' > /tmp/infra-pull.log 2>&1 &
echo $! > /tmp/infra-pull.pid
diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 61b185e9..e40814e8 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -36,7 +36,6 @@ jobs:
- backend
- frontend
- cert-generator
- - zookeeper-certgen
steps:
- uses: actions/checkout@v6
@@ -110,7 +109,6 @@ jobs:
crane copy "$REGISTRY/$PREFIX/backend:$TAG" "$REGISTRY/$PREFIX/backend:latest"
crane copy "$REGISTRY/$PREFIX/frontend:$TAG" "$REGISTRY/$PREFIX/frontend:latest"
crane copy "$REGISTRY/$PREFIX/cert-generator:$TAG" "$REGISTRY/$PREFIX/cert-generator:latest"
- crane copy "$REGISTRY/$PREFIX/zookeeper-certgen:$TAG" "$REGISTRY/$PREFIX/zookeeper-certgen:latest"
summary:
name: Summary
@@ -142,4 +140,4 @@ jobs:
echo "| Frontend | \`docker pull $REGISTRY/$PREFIX/frontend:latest\` |" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Security Scans" >> $GITHUB_STEP_SUMMARY
- echo "All 5 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY
+ echo "All 4 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY
diff --git a/.github/workflows/release-deploy.yml b/.github/workflows/release-deploy.yml
index 3bb636db..58596f5e 100644
--- a/.github/workflows/release-deploy.yml
+++ b/.github/workflows/release-deploy.yml
@@ -80,7 +80,7 @@ jobs:
run: |
PREFIX="${GITHUB_REPOSITORY_OWNER,,}/integr8scode"
VERSION="${{ steps.calver.outputs.version }}"
- for img in base backend frontend cert-generator zookeeper-certgen; do
+ for img in base backend frontend cert-generator; do
crane copy "$REGISTRY/$PREFIX/$img:latest" "$REGISTRY/$PREFIX/$img:$VERSION"
done
diff --git a/.github/workflows/stack-tests.yml b/.github/workflows/stack-tests.yml
index 9f695916..0ac76acd 100644
--- a/.github/workflows/stack-tests.yml
+++ b/.github/workflows/stack-tests.yml
@@ -29,7 +29,6 @@ env:
MONGO_IMAGE: mongo:8.0
REDIS_IMAGE: redis:7-alpine
KAFKA_IMAGE: confluentinc/cp-kafka:7.8.2
- ZOOKEEPER_IMAGE: confluentinc/cp-zookeeper:7.8.2
K3S_VERSION: v1.32.11+k3s1
K3S_INSTALL_SHA256: d75e014f2d2ab5d30a318efa5c326f3b0b7596f194afcff90fa7a7a91166d5f7
@@ -184,16 +183,6 @@ jobs:
cache-from: type=gha,scope=cert-generator
cache-to: type=gha,mode=max,scope=cert-generator
- - name: Build zookeeper-certgen image
- uses: docker/build-push-action@v6
- with:
- context: ./backend/zookeeper
- file: ./backend/zookeeper/Dockerfile.certgen
- load: true
- tags: integr8scode-zookeeper-certgen:latest
- cache-from: type=gha,scope=zookeeper-certgen
- cache-to: type=gha,mode=max,scope=zookeeper-certgen
-
# ── Frontend (nginx + SSL) ─────────────────────────────────────
- name: Build frontend image
uses: docker/build-push-action@v6
@@ -216,13 +205,11 @@ jobs:
docker tag integr8scode-base:latest "$IMG/base:$TAG"
docker tag integr8scode-backend:latest "$IMG/backend:$TAG"
docker tag integr8scode-cert-generator:latest "$IMG/cert-generator:$TAG"
- docker tag integr8scode-zookeeper-certgen:latest "$IMG/zookeeper-certgen:$TAG"
docker tag integr8scode-frontend:latest "$IMG/frontend:$TAG"
- # Push all 5 images in parallel, tracking each PID
+ # Push all 4 images in parallel, tracking each PID
declare -A PIDS
- for name in base backend cert-generator zookeeper-certgen \
- frontend; do
+ for name in base backend cert-generator frontend; do
docker push "$IMG/$name:$TAG" &
PIDS[$name]=$!
done
@@ -286,7 +273,7 @@ jobs:
run: |
mkdir -p logs
docker compose logs --timestamps > logs/docker-compose.log 2>&1
- for svc in backend mongo redis kafka zookeeper \
+ for svc in backend mongo redis kafka \
coordinator k8s-worker pod-monitor result-processor \
saga-orchestrator event-replay dlq-processor; do
docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true
diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py
index ac453232..c998288d 100644
--- a/backend/app/db/repositories/saga_repository.py
+++ b/backend/app/db/repositories/saga_repository.py
@@ -1,17 +1,26 @@
import dataclasses
from datetime import datetime, timezone
from typing import Any
+from uuid import uuid4
from beanie.exceptions import RevisionIdWasChanged
from beanie.odm.enums import SortDirection
from beanie.odm.operators.find import BaseFindOperator
from beanie.odm.queries.update import UpdateResponse
-from beanie.operators import GT, LT, NE, Eq, In
+from beanie.operators import GT, LT, NE, Eq, In, Set
from monggregate import Pipeline, S
from app.db.docs import ExecutionDocument, SagaDocument
from app.domain.enums import SagaState
-from app.domain.saga import Saga, SagaConcurrencyError, SagaContextData, SagaFilter, SagaListResult, SagaNotFoundError
+from app.domain.saga import (
+ Saga,
+ SagaConcurrencyError,
+ SagaContextData,
+ SagaFilter,
+ SagaInvalidStateError,
+ SagaListResult,
+ SagaNotFoundError,
+)
_saga_fields = set(Saga.__dataclass_fields__)
@@ -65,6 +74,34 @@ async def save_saga(self, saga_id: str, **updates: Any) -> Saga:
raise SagaConcurrencyError(saga_id) from exc
return self._to_domain(doc)
+ async def atomic_cancel_saga(
+ self, saga_id: str, error_message: str, completed_at: datetime,
+ ) -> Saga:
+ """Atomically cancel a saga using findOneAndUpdate.
+
+ Bumps revision_id so any concurrent save_changes() (which filters on
+ the old revision_id) will see a mismatch and raise RevisionIdWasChanged.
+ """
+ doc = await SagaDocument.find_one(
+ SagaDocument.saga_id == saga_id,
+ In(SagaDocument.state, [SagaState.RUNNING, SagaState.CREATED]),
+ ).update(
+ Set({ # type: ignore[no-untyped-call]
+ SagaDocument.state: SagaState.CANCELLED,
+ SagaDocument.error_message: error_message,
+ SagaDocument.completed_at: completed_at,
+ SagaDocument.updated_at: datetime.now(timezone.utc),
+ "revision_id": uuid4(),
+ }),
+ response_type=UpdateResponse.NEW_DOCUMENT,
+ )
+ if not doc:
+ existing = await SagaDocument.find_one(SagaDocument.saga_id == saga_id)
+ if not existing:
+ raise SagaNotFoundError(saga_id)
+ raise SagaInvalidStateError(saga_id, existing.state, "cancel")
+ return self._to_domain(doc)
+
async def get_or_create_saga(self, saga: Saga) -> tuple[Saga, bool]:
"""Atomically get or create a saga by (execution_id, saga_name).
diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py
index 1a796882..c27d23cd 100644
--- a/backend/app/services/saga/saga_orchestrator.py
+++ b/backend/app/services/saga/saga_orchestrator.py
@@ -23,8 +23,6 @@
SagaConcurrencyError,
SagaConfig,
SagaContextData,
- SagaInvalidStateError,
- SagaNotFoundError,
)
from app.events.core import UnifiedProducer
@@ -282,27 +280,23 @@ async def get_execution_sagas(self, execution_id: str) -> list[Saga]:
async def cancel_saga(self, saga_id: str) -> None:
"""Cancel a running saga and trigger compensation.
+ Uses an atomic findOneAndUpdate to set state=CANCELLED, eliminating
+ revision-check races with the step-execution loop.
+
Raises SagaNotFoundError if saga doesn't exist.
Raises SagaInvalidStateError if saga is not in a cancellable state.
- Raises SagaConcurrencyError if saga was modified concurrently.
"""
- saga_instance = await self.get_saga_status(saga_id)
- if not saga_instance:
- raise SagaNotFoundError(saga_id)
-
- if saga_instance.state not in (SagaState.RUNNING, SagaState.CREATED):
- raise SagaInvalidStateError(saga_id, saga_instance.state, "cancel")
+ saved = await self._repo.atomic_cancel_saga(
+ saga_id,
+ error_message="Saga cancelled by user request",
+ completed_at=datetime.now(UTC),
+ )
self.logger.info(
"Saga cancellation initiated",
saga_id=saga_id,
- execution_id=saga_instance.execution_id,
- user_id=saga_instance.context_data.user_id,
- )
-
- saved = await self._repo.save_saga(
- saga_id, state=SagaState.CANCELLED,
- error_message="Saga cancelled by user request", completed_at=datetime.now(UTC),
+ execution_id=saved.execution_id,
+ user_id=saved.context_data.user_id,
)
if self._producer and self.config.store_events:
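The two exceptions stay part of the public contract; a hypothetical route-level sketch (not the project's actual handler) of how a caller might map them to HTTP status codes:

```python
from fastapi import HTTPException

from app.domain.saga import SagaInvalidStateError, SagaNotFoundError


async def cancel_saga_endpoint(saga_id: str, orchestrator) -> dict[str, object]:
    """Hypothetical handler body: translate domain errors into HTTP responses.

    `orchestrator` is whatever object exposes cancel_saga(); injection is elided here.
    """
    try:
        await orchestrator.cancel_saga(saga_id)
    except SagaNotFoundError:
        raise HTTPException(status_code=404, detail="Saga not found")
    except SagaInvalidStateError:
        # Already completed, failed, or cancelled: nothing left to cancel.
        raise HTTPException(status_code=409, detail="Saga is not in a cancellable state")
    return {"saga_id": saga_id, "success": True}
```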
diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py
index c698ec0e..6bac8e09 100644
--- a/backend/app/services/sse/sse_service.py
+++ b/backend/app/services/sse/sse_service.py
@@ -9,7 +9,6 @@
from app.core.metrics import ConnectionMetrics
from app.db.repositories import SSERepository
from app.domain.enums import EventType, NotificationChannel, SSEControlEvent
-from app.domain.events import ResourceUsageDomain
from app.domain.execution.models import ExecutionResultDomain
from app.domain.sse import (
DomainNotificationSSEPayload,
@@ -128,7 +127,12 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn
self.logger.info("SSE connection closed", execution_id=execution_id)
async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMessage) -> SSEExecutionEventData:
- """Build typed SSE event from Redis message."""
+ """Build typed SSE event from Redis message.
+
+ Uses validate_python to coerce JSON primitives (str → enum, ISO str → datetime,
+ dict → dataclass) back to proper types after the Redis JSON round-trip.
+ Extra keys from the original domain event are ignored.
+ """
result: ExecutionResultDomain | None = None
if msg.event_type == EventType.RESULT_STORED:
execution = await self.repository.get_execution(execution_id)
@@ -142,21 +146,12 @@ async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMess
execution_id=execution_id,
)
- ru = msg.data.get("resource_usage")
- return SSEExecutionEventData(
- event_type=msg.data.get("event_type", msg.event_type),
- execution_id=execution_id,
- timestamp=msg.data.get("timestamp"),
- event_id=msg.data.get("event_id"),
- status=msg.data.get("status"),
- stdout=msg.data.get("stdout"),
- stderr=msg.data.get("stderr"),
- exit_code=msg.data.get("exit_code"),
- timeout_seconds=msg.data.get("timeout_seconds"),
- message=msg.data.get("message"),
- resource_usage=ResourceUsageDomain(**ru) if ru else None,
- result=result,
- )
+ return _sse_event_adapter.validate_python({
+ **msg.data,
+ "event_type": msg.event_type,
+ "execution_id": execution_id,
+ "result": result,
+ })
async def create_notification_stream(self, user_id: str) -> AsyncGenerator[dict[str, Any], None]:
subscription: SSERedisSubscription | None = None
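The `_sse_event_adapter` referenced above is presumably a module-level pydantic `TypeAdapter` over the SSE event type. A self-contained sketch with a stand-in model (not the real `SSEExecutionEventData`) illustrating the coercions the docstring describes:

```python
from datetime import datetime
from enum import Enum

from pydantic import BaseModel, ConfigDict, TypeAdapter


class Status(str, Enum):
    RUNNING = "running"
    COMPLETED = "completed"


class SSEEventSketch(BaseModel):
    """Stand-in for SSEExecutionEventData; the real type lives in app.domain.sse."""
    model_config = ConfigDict(extra="ignore")  # drop keys the model doesn't declare

    event_type: str
    execution_id: str
    timestamp: datetime | None = None
    status: Status | None = None


_adapter = TypeAdapter(SSEEventSketch)

event = _adapter.validate_python({
    "event_type": "execution_completed",
    "execution_id": "abc-123",
    "timestamp": "2025-01-01T12:00:00+00:00",  # ISO string coerced to datetime
    "status": "completed",                      # plain string coerced to the enum
    "stray_key": "dropped",                     # unknown keys are ignored
})
assert isinstance(event.timestamp, datetime)
assert event.status is Status.COMPLETED
```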
diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py
index 57fbd95e..f2e7ff8e 100644
--- a/backend/tests/e2e/conftest.py
+++ b/backend/tests/e2e/conftest.py
@@ -10,7 +10,7 @@
from aiokafka import AIOKafkaConsumer
from app.db.docs.saga import SagaDocument
from app.domain.enums import EventType, KafkaTopic, UserRole
-from app.domain.events import DomainEvent, DomainEventAdapter
+from app.domain.events import DomainEvent, DomainEventAdapter, SagaStartedEvent
from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse
from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse
from app.schemas_pydantic.saga import SagaStatusResponse
@@ -110,15 +110,17 @@ async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0)
timeout=timeout,
)
- async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
+ async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> SagaStartedEvent:
"""Wait for SAGA_STARTED — saga document is guaranteed in MongoDB after this."""
- return await self.wait_for(
+ event = await self.wait_for(
lambda e: (
e.event_type == EventType.SAGA_STARTED
and e.execution_id == execution_id
),
timeout=timeout,
)
+ assert isinstance(event, SagaStartedEvent)
+ return event
async def wait_for_notification_created(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
"""Wait for NOTIFICATION_CREATED — notification is guaranteed in MongoDB after this."""
diff --git a/backend/tests/e2e/test_saga_routes.py b/backend/tests/e2e/test_saga_routes.py
index 6378520e..120fe9e7 100644
--- a/backend/tests/e2e/test_saga_routes.py
+++ b/backend/tests/e2e/test_saga_routes.py
@@ -201,25 +201,26 @@ async def test_cancel_saga(
assert exec_response.status_code == 200
execution = ExecutionResponse.model_validate(exec_response.json())
+
+ # Get saga_id from the SAGA_STARTED event (published after the saga is persisted)
+ started = await event_waiter.wait_for_saga_started(execution.execution_id)
+
# Wait for CREATE_POD_COMMAND — the orchestrator's last step.
# After this the orchestrator is idle, so cancel won't race with
# concurrent step-processing writes to the saga document.
await event_waiter.wait_for_saga_command(execution.execution_id)
- saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
- saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]
-
- response = await test_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel")
+ response = await test_user.post(f"/api/v1/sagas/{started.saga_id}/cancel")
assert response.status_code == 200
result = SagaCancellationResponse.model_validate(response.json())
- assert result.saga_id == saga.saga_id
+ assert result.saga_id == started.saga_id
assert result.success is True
assert result.message is not None
# cancel_saga sets state to CANCELLED synchronously in MongoDB
# before returning the HTTP response (compensation also runs inline).
- status_resp = await test_user.get(f"/api/v1/sagas/{saga.saga_id}")
+ status_resp = await test_user.get(f"/api/v1/sagas/{started.saga_id}")
assert status_resp.status_code == 200
updated_saga = SagaStatusResponse.model_validate(status_resp.json())
assert updated_saga.state == SagaState.CANCELLED
@@ -250,12 +251,9 @@ async def test_cancel_other_users_saga_forbidden(
assert exec_response.status_code == 200
execution = ExecutionResponse.model_validate(exec_response.json())
- await event_waiter.wait_for_saga_started(execution.execution_id)
-
- saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
- saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]
+ started = await event_waiter.wait_for_saga_started(execution.execution_id)
- response = await another_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel")
+ response = await another_user.post(f"/api/v1/sagas/{started.saga_id}/cancel")
assert response.status_code == 403
diff --git a/backend/zookeeper/Dockerfile.certgen b/backend/zookeeper/Dockerfile.certgen
deleted file mode 100644
index cb0c7756..00000000
--- a/backend/zookeeper/Dockerfile.certgen
+++ /dev/null
@@ -1,12 +0,0 @@
-FROM alpine:3.18
-
-RUN apk add --no-cache openssl openjdk11-jre-headless
-
-# Create certificate directory
-RUN mkdir -p /certs
-
-# Copy certificate generation script
-COPY generate-certs.sh /generate-certs.sh
-RUN chmod +x /generate-certs.sh
-
-CMD ["/generate-certs.sh"]
\ No newline at end of file
diff --git a/backend/zookeeper/conf/log4j.properties b/backend/zookeeper/conf/log4j.properties
deleted file mode 100644
index 2b37b2a7..00000000
--- a/backend/zookeeper/conf/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Zookeeper logging configuration
-log4j.rootLogger=WARN, CONSOLE, ROLLINGFILE
-
-# Console appender
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=WARN
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
-
-# Rolling file appender
-log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
-log4j.appender.ROLLINGFILE.Threshold=INFO
-log4j.appender.ROLLINGFILE.File=/var/log/zookeeper/zookeeper.log
-log4j.appender.ROLLINGFILE.MaxFileSize=10MB
-log4j.appender.ROLLINGFILE.MaxBackupIndex=10
-log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
-
-# Set specific loggers
-log4j.logger.org.apache.zookeeper.server.auth=INFO
-log4j.logger.org.apache.zookeeper.audit=INFO
\ No newline at end of file
diff --git a/backend/zookeeper/conf/zoo.cfg b/backend/zookeeper/conf/zoo.cfg
deleted file mode 100644
index 496e14d3..00000000
--- a/backend/zookeeper/conf/zoo.cfg
+++ /dev/null
@@ -1,29 +0,0 @@
-# Zookeeper configuration file
-tickTime=2000
-dataDir=/var/lib/zookeeper/data
-dataLogDir=/var/lib/zookeeper/log
-clientPort=2181
-secureClientPort=2281
-serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
-authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider
-ssl.keyStore.location=/etc/kafka/certs/zookeeper.keystore.jks
-ssl.keyStore.password=zookeeper_keystore_password
-ssl.trustStore.location=/etc/kafka/certs/zookeeper.truststore.jks
-ssl.trustStore.password=zookeeper_truststore_password
-ssl.clientAuth=need
-sslQuorum=true
-
-# Autopurge settings
-autopurge.snapRetainCount=3
-autopurge.purgeInterval=24
-
-# Admin server settings
-admin.enableServer=false
-admin.serverPort=0
-
-# 4lw commands
-4lw.commands.whitelist=*
-
-# Metrics
-metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
-metricsProvider.httpPort=7070
\ No newline at end of file
diff --git a/backend/zookeeper/generate-certs.sh b/backend/zookeeper/generate-certs.sh
deleted file mode 100644
index 8cd4294b..00000000
--- a/backend/zookeeper/generate-certs.sh
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/bin/sh
-set -e
-
-# Check if certificates already exist
-if [ -f /certs/zookeeper.keystore.jks ] && [ -f /certs/zookeeper.truststore.jks ]; then
- echo "Certificates already exist, skipping generation"
- exit 0
-fi
-
-# Generate private key
-openssl genrsa -out /certs/zookeeper.key 2048
-
-# Generate certificate signing request
-openssl req -new -key /certs/zookeeper.key -out /certs/zookeeper.csr \
- -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=zookeeper"
-
-# Generate self-signed certificate
-openssl x509 -req -days 365 -in /certs/zookeeper.csr \
- -signkey /certs/zookeeper.key -out /certs/zookeeper.crt
-
-# Create PKCS12 keystore
-openssl pkcs12 -export -in /certs/zookeeper.crt -inkey /certs/zookeeper.key \
- -out /certs/zookeeper.p12 -name zookeeper \
- -password pass:zookeeper_keystore_password
-
-# Remove existing keystore if it exists
-rm -f /certs/zookeeper.keystore.jks
-
-# Convert PKCS12 to JKS
-keytool -importkeystore \
- -deststorepass zookeeper_keystore_password \
- -destkeypass zookeeper_keystore_password \
- -destkeystore /certs/zookeeper.keystore.jks \
- -srckeystore /certs/zookeeper.p12 \
- -srcstoretype PKCS12 \
- -srcstorepass zookeeper_keystore_password \
- -alias zookeeper \
- -noprompt
-
-# Remove existing truststore if it exists
-rm -f /certs/zookeeper.truststore.jks
-
-# Create truststore
-keytool -keystore /certs/zookeeper.truststore.jks -alias zookeeper \
- -import -file /certs/zookeeper.crt \
- -storepass zookeeper_truststore_password -noprompt
-
-# Set permissions
-chmod 644 /certs/*
-
-echo "Zookeeper certificates generated successfully"
-ls -la /certs/
\ No newline at end of file
diff --git a/backend/zookeeper/secrets/kafka_jaas.conf b/backend/zookeeper/secrets/kafka_jaas.conf
deleted file mode 100644
index 9bea890e..00000000
--- a/backend/zookeeper/secrets/kafka_jaas.conf
+++ /dev/null
@@ -1,11 +0,0 @@
-Server {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- user_super="admin-secret"
- user_kafka="kafka-secret";
-};
-
-Client {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- username="kafka"
- password="kafka-secret";
-};
diff --git a/deploy.sh b/deploy.sh
index 3de68cfe..b2b1d904 100755
--- a/deploy.sh
+++ b/deploy.sh
@@ -178,8 +178,7 @@ cmd_infra() {
fi
# Start only infrastructure services (no app, no workers, no observability)
- # zookeeper-certgen is needed for kafka to start
- docker compose up -d zookeeper-certgen mongo redis zookeeper kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG
+ docker compose up -d mongo redis kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG
print_success "Infrastructure services started"
docker compose ps
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 1fdde620..309eaeac 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -43,7 +43,7 @@ services:
image: mongo:8.0
command: ["mongod", "--wiredTigerCacheSizeGB", "0.4"]
ports:
- - "27017:27017"
+ - "127.0.0.1:27017:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: ${MONGO_ROOT_USER:-root}
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ROOT_PASSWORD:-rootpassword}
@@ -70,7 +70,7 @@ services:
image: redis:7-alpine
container_name: redis
ports:
- - "6379:6379"
+ - "127.0.0.1:6379:6379"
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru --save ""
volumes:
- redis_data:/data
@@ -118,7 +118,7 @@ services:
- ./backend/kubeconfig.yaml:/app/kubeconfig.yaml:ro
ports:
- "443:443"
- - "9090:9090" # Metrics port (host:container)
+ - "127.0.0.1:9090:9090" # Metrics port (host:container)
networks:
- app-network
container_name: backend
@@ -173,7 +173,7 @@ services:
image: grafana/grafana:12.3.1
user: "472"
ports:
- - "3000:3000"
+ - "127.0.0.1:3000:3000"
volumes:
- ./backend/grafana/grafana.ini:/etc/grafana/grafana.ini:ro
- ./backend/grafana/provisioning:/etc/grafana/provisioning:ro
@@ -188,126 +188,37 @@ services:
- GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin}
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin123}
- # Kafka Infrastructure for Event-Driven Design
- # Certificate generator for Zookeeper/Kafka SSL
- zookeeper-certgen:
- image: ghcr.io/hardmax71/integr8scode/zookeeper-certgen:${IMAGE_TAG:-latest}
- build:
- context: ./backend/zookeeper
- dockerfile: Dockerfile.certgen
- container_name: zookeeper-certgen
- volumes:
- - zookeeper_certs:/certs
- networks:
- - app-network
- restart: "no"
-
- zookeeper:
- image: confluentinc/cp-zookeeper:7.8.2
- container_name: zookeeper
- depends_on:
- zookeeper-certgen:
- condition: service_completed_successfully
- environment:
- # Basic configuration
- ZOOKEEPER_CLIENT_PORT: 2181
- # Enable TLS-secure client port to match zoo.cfg
- ZOOKEEPER_SECURE_CLIENT_PORT: 2281
- ZOOKEEPER_TICK_TIME: 2000
-
- # Production settings
- ZOOKEEPER_MAX_CLIENT_CNXNS: 60
- ZOOKEEPER_INIT_LIMIT: 10
- ZOOKEEPER_SYNC_LIMIT: 5
-
- # Security settings (plaintext + SASL only)
- ZOOKEEPER_AUTH_PROVIDER_1: org.apache.zookeeper.server.auth.DigestAuthenticationProvider
- ZOOKEEPER_SERVER_CNXN_FACTORY: org.apache.zookeeper.server.NettyServerCnxnFactory
- KAFKA_OPTS: '-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -Dzookeeper.4lw.commands.whitelist=* -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf'
-
- # TLS settings for secure client port (keystore/truststore produced by certgen)
- ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/certs/zookeeper.keystore.jks
- ZOOKEEPER_SSL_KEYSTORE_PASSWORD: zookeeper_keystore_password
- ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/zookeeper.truststore.jks
- ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: zookeeper_truststore_password
- ZOOKEEPER_SSL_CLIENT_AUTH: need
-
- # 4lw commands whitelist (minimal for security)
- ZOOKEEPER_4LW_COMMANDS_WHITELIST: '*'
-
- # Autopurge settings
- ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3
- ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24
-
- # JVM settings
- KAFKA_HEAP_OPTS: '-Xms128m -Xmx256m'
- KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:+ParallelRefProcEnabled -Djute.maxbuffer=4194304'
-
- # Logging
- ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: 'WARN'
- ZOOKEEPER_LOG4J_LOGGERS: 'org.apache.zookeeper.server.auth=INFO,org.apache.zookeeper.audit=INFO'
-
- # Disable admin server for security
- ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'
- ZOOKEEPER_ADMIN_SERVER_PORT: 0
-
- # Metrics
- ZOOKEEPER_METRICS_PROVIDER_CLASS_NAME: org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
- ZOOKEEPER_METRICS_PROVIDER_HTTP_PORT: 7070
-
- volumes:
- - ./backend/zookeeper/conf:/etc/kafka/conf:ro
- - ./backend/zookeeper/secrets:/etc/kafka/secrets:ro
- - zookeeper_certs:/etc/kafka/certs:ro
- - zookeeper_data:/var/lib/zookeeper/data
- - zookeeper_log:/var/lib/zookeeper/log
- - zookeeper_logs:/var/log/zookeeper
- ports:
- - "2181:2181"
- - "2281:2281" # Secure TLS client port
- - "7070:7070" # Metrics port
- networks:
- - app-network
- ulimits:
- nofile:
- soft: 65536
- hard: 65536
- mem_limit: 512m
- restart: unless-stopped
- healthcheck:
- test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"]
- interval: 3s
- timeout: 5s
- retries: 15
- start_period: 5s
-
+ # Kafka (KRaft mode — no ZooKeeper)
kafka:
image: confluentinc/cp-kafka:7.8.2
container_name: kafka
- depends_on:
- zookeeper:
- condition: service_healthy
ports:
- - "9092:9092"
- - "29092:29092"
+ - "127.0.0.1:9092:9092"
+ - "127.0.0.1:29092:29092"
environment:
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+ # KRaft identity (combined broker + controller)
+ KAFKA_NODE_ID: 1
+ KAFKA_PROCESS_ROLES: 'broker,controller'
+ CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
+ KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
+ KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+
+ # Listeners
+ KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+ KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+ KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+
+ # Single-node replication
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' # Needed for initial setup
+
+ # Topic management
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
-
- # Security settings for Zookeeper connection (SASL/Digest over plaintext)
- KAFKA_OPTS: '-Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf'
- KAFKA_ZOOKEEPER_SET_ACL: 'false'
- # KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
- # KAFKA_SUPER_USERS: 'User:admin;User:kafka'
-
+
# Production settings
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_NUM_NETWORK_THREADS: 8
@@ -315,18 +226,16 @@ services:
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
-
+
# Log settings
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
KAFKA_LOG_CLEANUP_POLICY: 'delete'
-
+
# JVM settings
KAFKA_HEAP_OPTS: '-Xms256m -Xmx1g'
KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true'
-
volumes:
- - ./backend/zookeeper/secrets/kafka_jaas.conf:/etc/kafka/secrets/kafka_jaas.conf:ro
- kafka_data:/var/lib/kafka/data
- kafka_logs:/var/log/kafka
networks:
@@ -351,7 +260,7 @@ services:
depends_on:
- kafka
ports:
- - "9000:9000"
+ - "127.0.0.1:9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka:29092
JVM_OPTS: "-Xms256M -Xmx512M"
@@ -508,14 +417,14 @@ services:
profiles: ["observability"]
mem_limit: 256m
ports:
- - "5775:5775/udp" # Zipkin/thrift compact
- - "6831:6831/udp" # Thrift compact
- - "6832:6832/udp" # Thrift binary
- - "5778:5778" # HTTP config
- - "16686:16686" # Jaeger UI
- - "14268:14268" # HTTP collector
- - "14250:14250" # gRPC collector
- - "9411:9411" # Zipkin compatible endpoint
+ - "127.0.0.1:5775:5775/udp" # Zipkin/thrift compact
+ - "127.0.0.1:6831:6831/udp" # Thrift compact
+ - "127.0.0.1:6832:6832/udp" # Thrift binary
+ - "127.0.0.1:5778:5778" # HTTP config
+ - "127.0.0.1:16686:16686" # Jaeger UI
+ - "127.0.0.1:14268:14268" # HTTP collector
+ - "127.0.0.1:14250:14250" # gRPC collector
+ - "127.0.0.1:9411:9411" # Zipkin compatible endpoint
environment:
- COLLECTOR_ZIPKIN_HOST_PORT=:9411
- COLLECTOR_OTLP_ENABLED=true
@@ -572,7 +481,7 @@ services:
profiles: ["observability"]
mem_limit: 256m
ports:
- - "8428:8428"
+ - "127.0.0.1:8428:8428"
volumes:
- victoria_metrics_data:/victoria-metrics-data
command:
@@ -601,7 +510,7 @@ services:
- "--topic.filter=.*"
- "--group.filter=.*"
ports:
- - "9308:9308"
+ - "127.0.0.1:9308:9308"
networks:
- app-network
depends_on:
@@ -620,9 +529,9 @@ services:
- ./backend/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
ports:
- - "4317:4317" # OTLP gRPC
- - "4318:4318" # OTLP HTTP
- - "13133:13133" # Health check
+ - "127.0.0.1:4317:4317" # OTLP gRPC
+ - "127.0.0.1:4318:4318" # OTLP HTTP
+ - "127.0.0.1:13133:13133" # Health check
networks:
- app-network
depends_on:
@@ -641,10 +550,6 @@ volumes:
shared_ca:
kafka_data:
kafka_logs:
- zookeeper_data:
- zookeeper_log:
- zookeeper_logs:
- zookeeper_certs:
# --8<-- [end:dev_volumes]
networks:
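With ZooKeeper removed and client ports bound to loopback, a local smoke test of the KRaft broker needs only the published `localhost:9092` listener. A sketch using aiokafka (already a dependency of the e2e tests), assuming the compose stack is up:

```python
import asyncio

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    # Talks to the PLAINTEXT_HOST listener published as 127.0.0.1:9092.
    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # Cluster metadata is now served by the combined KRaft broker/controller.
        topics = await consumer.topics()
        print(f"Broker reachable, {len(topics)} topics visible")
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```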
diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md
index 297a79e0..a29d2a4f 100644
--- a/docs/architecture/overview.md
+++ b/docs/architecture/overview.md
@@ -31,7 +31,7 @@ Integr8sCode lets users submit Python scripts through a Svelte SPA. The FastAPI

The SPA hits the frontend, which proxies to the API over HTTPS; the API
-serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) with Zookeeper backing it; kafka-
+serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) using KRaft for metadata consensus; kafka-
init seeds topics. All workers are separate containers subscribed to Kafka; the k8s-worker talks to the
Kubernetes API to run code, the pod-monitor watches pods, the result-processor writes results to Mongo
and nudges Redis for SSE fanout, and the saga-orchestrator coordinates long flows with Mongo and Redis.
diff --git a/docs/operations/cicd.md b/docs/operations/cicd.md
index b22f10b9..cd715d7e 100644
--- a/docs/operations/cicd.md
+++ b/docs/operations/cicd.md
@@ -33,7 +33,7 @@ graph LR
end
subgraph "Docker Scan & Promote"
- Scan["Trivy Scan (5 images)"]
+ Scan["Trivy Scan (4 images)"]
Promote["Promote SHA → latest"]
Scan --> Promote
end
@@ -92,7 +92,7 @@ no setup to overlap.
## Stack Tests (the main workflow)
-This is the core testing workflow. It builds all 5 container images, pushes them to GHCR with immutable SHA-based
+This is the core testing workflow. It builds all 4 container images, pushes them to GHCR with immutable SHA-based
tags, then runs E2E tests on separate runners that pull images from the registry.
```mermaid
@@ -131,29 +131,28 @@ the image build is skipped entirely.
### Phase 2: Build and push
-All 5 images are built on a single runner and pushed to GHCR with an immutable `sha-<7chars>` tag:
+All 4 images are built on a single runner and pushed to GHCR with an immutable `sha-<7chars>` tag:
| Image | Source |
|----------------------|---------------------------------------------|
| `base` | `backend/Dockerfile.base` |
| `backend` | `backend/Dockerfile` |
| `cert-generator` | `cert-generator/Dockerfile` |
-| `zookeeper-certgen` | `backend/zookeeper/Dockerfile.certgen` |
| `frontend` | `frontend/Dockerfile` |
Workers reuse the `backend` image with different `command:` overrides in docker-compose, so no separate worker images
-are needed. All 5 images are scanned by Trivy and promoted to `latest` in the
+are needed. All 4 images are scanned by Trivy and promoted to `latest` in the
[Docker Scan & Promote](#docker-scan-promote) workflow.
The base image is cached separately as a zstd-compressed tarball since its dependencies rarely change. The backend
image depends on it via `--build-context base=docker-image://integr8scode-base:latest`. Utility and frontend images
use GHA layer caching.
-All 5 images are pushed to GHCR in parallel, with each push tracked by PID so individual failures are reported:
+All 4 images are pushed to GHCR in parallel, with each push tracked by PID so individual failures are reported:
```yaml
declare -A PIDS
-for name in base backend cert-generator zookeeper-certgen ...; do
+for name in base backend cert-generator frontend; do
docker push "$IMG/$name:$TAG" &
PIDS[$name]=$!
done
@@ -180,7 +179,7 @@ This action kicks off three slow tasks that can overlap:
1. **GHCR login** using `docker/login-action@v3`
2. **Background image pull + infra pre-warm** — pulls all compose images then starts infrastructure services
- (mongo, redis, kafka, zookeeper) in a background `nohup` process. The exit status is persisted
+ (mongo, redis, kafka) in a background `nohup` process. The exit status is persisted
to `/tmp/infra-pull.exit` so the next action can check for failures.
3. **k3s install** — downloads and installs a pinned k3s version with SHA256 checksum verification (see
[supply-chain hardening](#supply-chain-hardening) below)
@@ -238,7 +237,7 @@ sets it, and only after all tests pass.
```mermaid
graph LR
ST["Stack Tests
(main, success)"] -->|workflow_run trigger| Scan
- Scan["Trivy Scan
(5 images in parallel)"] --> Promote["crane copy
sha-xxx → latest"]
+ Scan["Trivy Scan
(4 images in parallel)"] --> Promote["crane copy
sha-xxx → latest"]
Promote --> Summary["Step Summary"]
```
@@ -249,7 +248,7 @@ Runs automatically when `Stack Tests` completes successfully on `main`. Can also
### Scan
-Uses [Trivy](https://trivy.dev/) (pinned at `v0.68.2`) to scan all 5 deployed images in parallel via matrix strategy.
+Uses [Trivy](https://trivy.dev/) (pinned at `v0.68.2`) to scan all 4 deployed images in parallel via matrix strategy.
Scans for `CRITICAL` and `HIGH` severity vulnerabilities with unfixed issues ignored. Results are uploaded as SARIF
files to GitHub's Security tab.
@@ -285,7 +284,7 @@ Releases use [Calendar Versioning](https://calver.org/) with the format `YYYY.M.
- `PATCH` — auto-incrementing counter within the month, starting at `0`
Examples: `2026.2.0`, `2026.2.1`, `2026.3.0`. The workflow counts existing tags matching the current `YYYY.M.*` pattern
-and increments the patch number. All 5 deployed GHCR images are tagged with the CalVer version using crane (same
+and increments the patch number. All 4 deployed GHCR images are tagged with the CalVer version using crane (same
registry-level manifest copy as the promote step).
### GitHub Release
@@ -438,7 +437,7 @@ Playwright browsers are cached by `package-lock.json` hash. On cache hit, only s
### Parallel image push
-All 5 images are pushed to GHCR concurrently using background processes with PID tracking. Each push failure is
+All 4 images are pushed to GHCR concurrently using background processes with PID tracking. Each push failure is
reported individually via `::error::` annotations.
## Running locally
diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md
index 15203ff0..2d42a0a9 100644
--- a/docs/operations/deployment.md
+++ b/docs/operations/deployment.md
@@ -41,7 +41,7 @@ with health checks and dependency ordering, so containers start in the correct s
./deploy.sh dev
```
-This brings up MongoDB, Redis, Kafka with Zookeeper, all seven workers, the backend API, and the
+This brings up MongoDB, Redis, Kafka (KRaft mode), all seven workers, the backend API, and the
frontend. Two initialization containers run automatically: `kafka-init` creates required Kafka topics, and `user-seed`
populates the database with default user accounts.
@@ -129,7 +129,6 @@ services define healthchecks in `docker-compose.yaml`:
| Redis | `redis-cli ping` |
| Backend | `curl /api/v1/health/live` |
| Kafka | `kafka-broker-api-versions` |
-| Zookeeper | `echo ruok \| nc localhost 2181 \| grep imok` |
Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their container is
running. The test suite doesn't require worker containers since tests instantiate worker classes directly.
@@ -142,8 +141,7 @@ Every long-running service has a `mem_limit` in `docker-compose.yaml` to prevent
|---------|-------------|--------------|-------|
| MongoDB | 1024 m | wiredTiger 0.4 GB | `--wiredTigerCacheSizeGB 0.4` prevents default 50 %-of-RAM behavior |
| Redis | 300 m | 256 MB maxmemory | LRU eviction, persistence disabled |
-| Kafka | 1280 m | JVM `-Xms256m -Xmx1g` | Single-broker, low throughput workload |
-| Zookeeper | 512 m | JVM `-Xms128m -Xmx256m` | Metadata-only role |
+| Kafka | 1280 m | JVM `-Xms256m -Xmx1g` | Single-broker KRaft mode, low throughput workload |
| Backend API | 768 m | 2 gunicorn workers | Controlled by `WEB_CONCURRENCY` env var |
| Frontend | 128 m | nginx serving static assets | |
| Each worker (×7) | 160 m | Single-process Python | coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor |
@@ -153,7 +151,7 @@ Every long-running service has a `mem_limit` in `docker-compose.yaml` to prevent
| OTel Collector | 192 m | `limit_mib: 150` in memory_limiter processor | Observability profile |
| Kafka Exporter | 96 m | | Observability profile |
-All long-running services — core infrastructure (MongoDB, Redis, Kafka, Zookeeper, backend, frontend), all seven workers (coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor), and observability components (Grafana, kafka-exporter, victoria-metrics, otel-collector) — have `restart: unless-stopped` so they recover automatically after an OOM kill or crash.
+All long-running services — core infrastructure (MongoDB, Redis, Kafka, backend, frontend), all seven workers (coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor), and observability components (Grafana, kafka-exporter, victoria-metrics, otel-collector) — have `restart: unless-stopped` so they recover automatically after an OOM kill or crash.
## Monitoring