diff --git a/.github/actions/e2e-boot/action.yml b/.github/actions/e2e-boot/action.yml index 39d10df9..e6d49b1a 100644 --- a/.github/actions/e2e-boot/action.yml +++ b/.github/actions/e2e-boot/action.yml @@ -28,7 +28,7 @@ runs: IMAGE_TAG='"$IMAGE_TAG"' docker compose pull --quiet 2>&1 echo "--- pull done, starting infra ---" docker compose up -d --no-build \ - mongo redis shared-ca zookeeper-certgen zookeeper kafka 2>&1 + mongo redis shared-ca kafka 2>&1 echo $? > /tmp/infra-pull.exit ' > /tmp/infra-pull.log 2>&1 & echo $! > /tmp/infra-pull.pid diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 61b185e9..e40814e8 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -36,7 +36,6 @@ jobs: - backend - frontend - cert-generator - - zookeeper-certgen steps: - uses: actions/checkout@v6 @@ -110,7 +109,6 @@ jobs: crane copy "$REGISTRY/$PREFIX/backend:$TAG" "$REGISTRY/$PREFIX/backend:latest" crane copy "$REGISTRY/$PREFIX/frontend:$TAG" "$REGISTRY/$PREFIX/frontend:latest" crane copy "$REGISTRY/$PREFIX/cert-generator:$TAG" "$REGISTRY/$PREFIX/cert-generator:latest" - crane copy "$REGISTRY/$PREFIX/zookeeper-certgen:$TAG" "$REGISTRY/$PREFIX/zookeeper-certgen:latest" summary: name: Summary @@ -142,4 +140,4 @@ jobs: echo "| Frontend | \`docker pull $REGISTRY/$PREFIX/frontend:latest\` |" >> $GITHUB_STEP_SUMMARY echo "" >> $GITHUB_STEP_SUMMARY echo "### Security Scans" >> $GITHUB_STEP_SUMMARY - echo "All 5 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY + echo "All 4 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/release-deploy.yml b/.github/workflows/release-deploy.yml index 3bb636db..58596f5e 100644 --- a/.github/workflows/release-deploy.yml +++ b/.github/workflows/release-deploy.yml @@ -80,7 +80,7 @@ jobs: run: | PREFIX="${GITHUB_REPOSITORY_OWNER,,}/integr8scode" VERSION="${{ steps.calver.outputs.version }}" - for img in base backend frontend cert-generator zookeeper-certgen; do + for img in base backend frontend cert-generator; do crane copy "$REGISTRY/$PREFIX/$img:latest" "$REGISTRY/$PREFIX/$img:$VERSION" done diff --git a/.github/workflows/stack-tests.yml b/.github/workflows/stack-tests.yml index 9f695916..0ac76acd 100644 --- a/.github/workflows/stack-tests.yml +++ b/.github/workflows/stack-tests.yml @@ -29,7 +29,6 @@ env: MONGO_IMAGE: mongo:8.0 REDIS_IMAGE: redis:7-alpine KAFKA_IMAGE: confluentinc/cp-kafka:7.8.2 - ZOOKEEPER_IMAGE: confluentinc/cp-zookeeper:7.8.2 K3S_VERSION: v1.32.11+k3s1 K3S_INSTALL_SHA256: d75e014f2d2ab5d30a318efa5c326f3b0b7596f194afcff90fa7a7a91166d5f7 @@ -184,16 +183,6 @@ jobs: cache-from: type=gha,scope=cert-generator cache-to: type=gha,mode=max,scope=cert-generator - - name: Build zookeeper-certgen image - uses: docker/build-push-action@v6 - with: - context: ./backend/zookeeper - file: ./backend/zookeeper/Dockerfile.certgen - load: true - tags: integr8scode-zookeeper-certgen:latest - cache-from: type=gha,scope=zookeeper-certgen - cache-to: type=gha,mode=max,scope=zookeeper-certgen - # ── Frontend (nginx + SSL) ───────────────────────────────────── - name: Build frontend image uses: docker/build-push-action@v6 @@ -216,13 +205,11 @@ jobs: docker tag integr8scode-base:latest "$IMG/base:$TAG" docker tag integr8scode-backend:latest "$IMG/backend:$TAG" docker tag integr8scode-cert-generator:latest "$IMG/cert-generator:$TAG" - docker tag integr8scode-zookeeper-certgen:latest "$IMG/zookeeper-certgen:$TAG" docker tag 
integr8scode-frontend:latest "$IMG/frontend:$TAG" - # Push all 5 images in parallel, tracking each PID + # Push all 4 images in parallel, tracking each PID declare -A PIDS - for name in base backend cert-generator zookeeper-certgen \ - frontend; do + for name in base backend cert-generator frontend; do docker push "$IMG/$name:$TAG" & PIDS[$name]=$! done @@ -286,7 +273,7 @@ jobs: run: | mkdir -p logs docker compose logs --timestamps > logs/docker-compose.log 2>&1 - for svc in backend mongo redis kafka zookeeper \ + for svc in backend mongo redis kafka \ coordinator k8s-worker pod-monitor result-processor \ saga-orchestrator event-replay dlq-processor; do docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py index ac453232..c998288d 100644 --- a/backend/app/db/repositories/saga_repository.py +++ b/backend/app/db/repositories/saga_repository.py @@ -1,17 +1,26 @@ import dataclasses from datetime import datetime, timezone from typing import Any +from uuid import uuid4 from beanie.exceptions import RevisionIdWasChanged from beanie.odm.enums import SortDirection from beanie.odm.operators.find import BaseFindOperator from beanie.odm.queries.update import UpdateResponse -from beanie.operators import GT, LT, NE, Eq, In +from beanie.operators import GT, LT, NE, Eq, In, Set from monggregate import Pipeline, S from app.db.docs import ExecutionDocument, SagaDocument from app.domain.enums import SagaState -from app.domain.saga import Saga, SagaConcurrencyError, SagaContextData, SagaFilter, SagaListResult, SagaNotFoundError +from app.domain.saga import ( + Saga, + SagaConcurrencyError, + SagaContextData, + SagaFilter, + SagaInvalidStateError, + SagaListResult, + SagaNotFoundError, +) _saga_fields = set(Saga.__dataclass_fields__) @@ -65,6 +74,34 @@ async def save_saga(self, saga_id: str, **updates: Any) -> Saga: raise SagaConcurrencyError(saga_id) from exc return self._to_domain(doc) + async def atomic_cancel_saga( + self, saga_id: str, error_message: str, completed_at: datetime, + ) -> Saga: + """Atomically cancel a saga using findOneAndUpdate. + + Bumps revision_id so any concurrent save_changes() (which filters on + the old revision_id) will see a mismatch and raise RevisionIdWasChanged. + """ + doc = await SagaDocument.find_one( + SagaDocument.saga_id == saga_id, + In(SagaDocument.state, [SagaState.RUNNING, SagaState.CREATED]), + ).update( + Set({ # type: ignore[no-untyped-call] + SagaDocument.state: SagaState.CANCELLED, + SagaDocument.error_message: error_message, + SagaDocument.completed_at: completed_at, + SagaDocument.updated_at: datetime.now(timezone.utc), + "revision_id": uuid4(), + }), + response_type=UpdateResponse.NEW_DOCUMENT, + ) + if not doc: + existing = await SagaDocument.find_one(SagaDocument.saga_id == saga_id) + if not existing: + raise SagaNotFoundError(saga_id) + raise SagaInvalidStateError(saga_id, existing.state, "cancel") + return self._to_domain(doc) + async def get_or_create_saga(self, saga: Saga) -> tuple[Saga, bool]: """Atomically get or create a saga by (execution_id, saga_name). 
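A note on the new repository method above: the sketch below is illustrative only and not part of the patch — the `repo` fixture and its class name are assumed, while the method, exception, and enum names are taken from the diff. It shows the error surface `cancel_saga` now relies on: `SagaNotFoundError` when no document matches, `SagaInvalidStateError` when the saga is already terminal, and a returned `Saga` in the CANCELLED state on success.

```python
# Illustrative usage sketch, not part of the patch. `repo` is assumed to be the
# saga repository instance from saga_repository.py; names below come from the diff.
from datetime import datetime, timezone

from app.domain.enums import SagaState
from app.domain.saga import SagaInvalidStateError, SagaNotFoundError


async def try_cancel(repo, saga_id: str) -> bool:
    try:
        saga = await repo.atomic_cancel_saga(
            saga_id,
            error_message="Saga cancelled by user request",
            completed_at=datetime.now(timezone.utc),
        )
    except SagaNotFoundError:
        return False  # no saga document with this id
    except SagaInvalidStateError:
        return False  # already COMPLETED/FAILED/CANCELLED; nothing to cancel
    # On success the returned domain object already reflects the atomic update,
    # including the bumped revision_id that invalidates concurrent stale saves.
    return saga.state == SagaState.CANCELLED
```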
diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py index 1a796882..c27d23cd 100644 --- a/backend/app/services/saga/saga_orchestrator.py +++ b/backend/app/services/saga/saga_orchestrator.py @@ -23,8 +23,6 @@ SagaConcurrencyError, SagaConfig, SagaContextData, - SagaInvalidStateError, - SagaNotFoundError, ) from app.events.core import UnifiedProducer @@ -282,27 +280,23 @@ async def get_execution_sagas(self, execution_id: str) -> list[Saga]: async def cancel_saga(self, saga_id: str) -> None: """Cancel a running saga and trigger compensation. + Uses an atomic findOneAndUpdate to set state=CANCELLED, eliminating + revision-check races with the step-execution loop. + Raises SagaNotFoundError if saga doesn't exist. Raises SagaInvalidStateError if saga is not in a cancellable state. - Raises SagaConcurrencyError if saga was modified concurrently. """ - saga_instance = await self.get_saga_status(saga_id) - if not saga_instance: - raise SagaNotFoundError(saga_id) - - if saga_instance.state not in (SagaState.RUNNING, SagaState.CREATED): - raise SagaInvalidStateError(saga_id, saga_instance.state, "cancel") + saved = await self._repo.atomic_cancel_saga( + saga_id, + error_message="Saga cancelled by user request", + completed_at=datetime.now(UTC), + ) self.logger.info( "Saga cancellation initiated", saga_id=saga_id, - execution_id=saga_instance.execution_id, - user_id=saga_instance.context_data.user_id, - ) - - saved = await self._repo.save_saga( - saga_id, state=SagaState.CANCELLED, - error_message="Saga cancelled by user request", completed_at=datetime.now(UTC), + execution_id=saved.execution_id, + user_id=saved.context_data.user_id, ) if self._producer and self.config.store_events: diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py index c698ec0e..6bac8e09 100644 --- a/backend/app/services/sse/sse_service.py +++ b/backend/app/services/sse/sse_service.py @@ -9,7 +9,6 @@ from app.core.metrics import ConnectionMetrics from app.db.repositories import SSERepository from app.domain.enums import EventType, NotificationChannel, SSEControlEvent -from app.domain.events import ResourceUsageDomain from app.domain.execution.models import ExecutionResultDomain from app.domain.sse import ( DomainNotificationSSEPayload, @@ -128,7 +127,12 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn self.logger.info("SSE connection closed", execution_id=execution_id) async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMessage) -> SSEExecutionEventData: - """Build typed SSE event from Redis message.""" + """Build typed SSE event from Redis message. + + Uses validate_python to coerce JSON primitives (str → enum, ISO str → datetime, + dict → dataclass) back to proper types after the Redis JSON round-trip. + Extra keys from the original domain event are ignored. 
+ """ result: ExecutionResultDomain | None = None if msg.event_type == EventType.RESULT_STORED: execution = await self.repository.get_execution(execution_id) @@ -142,21 +146,12 @@ async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMess execution_id=execution_id, ) - ru = msg.data.get("resource_usage") - return SSEExecutionEventData( - event_type=msg.data.get("event_type", msg.event_type), - execution_id=execution_id, - timestamp=msg.data.get("timestamp"), - event_id=msg.data.get("event_id"), - status=msg.data.get("status"), - stdout=msg.data.get("stdout"), - stderr=msg.data.get("stderr"), - exit_code=msg.data.get("exit_code"), - timeout_seconds=msg.data.get("timeout_seconds"), - message=msg.data.get("message"), - resource_usage=ResourceUsageDomain(**ru) if ru else None, - result=result, - ) + return _sse_event_adapter.validate_python({ + **msg.data, + "event_type": msg.event_type, + "execution_id": execution_id, + "result": result, + }) async def create_notification_stream(self, user_id: str) -> AsyncGenerator[dict[str, Any], None]: subscription: SSERedisSubscription | None = None diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py index 57fbd95e..f2e7ff8e 100644 --- a/backend/tests/e2e/conftest.py +++ b/backend/tests/e2e/conftest.py @@ -10,7 +10,7 @@ from aiokafka import AIOKafkaConsumer from app.db.docs.saga import SagaDocument from app.domain.enums import EventType, KafkaTopic, UserRole -from app.domain.events import DomainEvent, DomainEventAdapter +from app.domain.events import DomainEvent, DomainEventAdapter, SagaStartedEvent from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse from app.schemas_pydantic.saga import SagaStatusResponse @@ -110,15 +110,17 @@ async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0) timeout=timeout, ) - async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> DomainEvent: + async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> SagaStartedEvent: """Wait for SAGA_STARTED — saga document is guaranteed in MongoDB after this.""" - return await self.wait_for( + event = await self.wait_for( lambda e: ( e.event_type == EventType.SAGA_STARTED and e.execution_id == execution_id ), timeout=timeout, ) + assert isinstance(event, SagaStartedEvent) + return event async def wait_for_notification_created(self, execution_id: str, timeout: float = 15.0) -> DomainEvent: """Wait for NOTIFICATION_CREATED — notification is guaranteed in MongoDB after this.""" diff --git a/backend/tests/e2e/test_saga_routes.py b/backend/tests/e2e/test_saga_routes.py index 6378520e..120fe9e7 100644 --- a/backend/tests/e2e/test_saga_routes.py +++ b/backend/tests/e2e/test_saga_routes.py @@ -201,25 +201,26 @@ async def test_cancel_saga( assert exec_response.status_code == 200 execution = ExecutionResponse.model_validate(exec_response.json()) + + # Get saga_id from SAGA_STARTED event (published after saga persisted) + started = await event_waiter.wait_for_saga_started(execution.execution_id) + # Wait for CREATE_POD_COMMAND — the orchestrator's last step. # After this the orchestrator is idle, so cancel won't race with # concurrent step-processing writes to the saga document. 
await event_waiter.wait_for_saga_command(execution.execution_id) - saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}") - saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0] - - response = await test_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel") + response = await test_user.post(f"/api/v1/sagas/{started.saga_id}/cancel") assert response.status_code == 200 result = SagaCancellationResponse.model_validate(response.json()) - assert result.saga_id == saga.saga_id + assert result.saga_id == started.saga_id assert result.success is True assert result.message is not None # cancel_saga sets state to CANCELLED synchronously in MongoDB # before returning the HTTP response (compensation also runs inline). - status_resp = await test_user.get(f"/api/v1/sagas/{saga.saga_id}") + status_resp = await test_user.get(f"/api/v1/sagas/{started.saga_id}") assert status_resp.status_code == 200 updated_saga = SagaStatusResponse.model_validate(status_resp.json()) assert updated_saga.state == SagaState.CANCELLED @@ -250,12 +251,9 @@ async def test_cancel_other_users_saga_forbidden( assert exec_response.status_code == 200 execution = ExecutionResponse.model_validate(exec_response.json()) - await event_waiter.wait_for_saga_started(execution.execution_id) - - saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}") - saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0] + started = await event_waiter.wait_for_saga_started(execution.execution_id) - response = await another_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel") + response = await another_user.post(f"/api/v1/sagas/{started.saga_id}/cancel") assert response.status_code == 403 diff --git a/backend/zookeeper/Dockerfile.certgen b/backend/zookeeper/Dockerfile.certgen deleted file mode 100644 index cb0c7756..00000000 --- a/backend/zookeeper/Dockerfile.certgen +++ /dev/null @@ -1,12 +0,0 @@ -FROM alpine:3.18 - -RUN apk add --no-cache openssl openjdk11-jre-headless - -# Create certificate directory -RUN mkdir -p /certs - -# Copy certificate generation script -COPY generate-certs.sh /generate-certs.sh -RUN chmod +x /generate-certs.sh - -CMD ["/generate-certs.sh"] \ No newline at end of file diff --git a/backend/zookeeper/conf/log4j.properties b/backend/zookeeper/conf/log4j.properties deleted file mode 100644 index 2b37b2a7..00000000 --- a/backend/zookeeper/conf/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Zookeeper logging configuration -log4j.rootLogger=WARN, CONSOLE, ROLLINGFILE - -# Console appender -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=WARN -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n - -# Rolling file appender -log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender -log4j.appender.ROLLINGFILE.Threshold=INFO -log4j.appender.ROLLINGFILE.File=/var/log/zookeeper/zookeeper.log -log4j.appender.ROLLINGFILE.MaxFileSize=10MB -log4j.appender.ROLLINGFILE.MaxBackupIndex=10 -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n - -# Set specific loggers -log4j.logger.org.apache.zookeeper.server.auth=INFO -log4j.logger.org.apache.zookeeper.audit=INFO \ No newline at end of file diff --git a/backend/zookeeper/conf/zoo.cfg b/backend/zookeeper/conf/zoo.cfg deleted file mode 100644 index 
496e14d3..00000000 --- a/backend/zookeeper/conf/zoo.cfg +++ /dev/null @@ -1,29 +0,0 @@ -# Zookeeper configuration file -tickTime=2000 -dataDir=/var/lib/zookeeper/data -dataLogDir=/var/lib/zookeeper/log -clientPort=2181 -secureClientPort=2281 -serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider -ssl.keyStore.location=/etc/kafka/certs/zookeeper.keystore.jks -ssl.keyStore.password=zookeeper_keystore_password -ssl.trustStore.location=/etc/kafka/certs/zookeeper.truststore.jks -ssl.trustStore.password=zookeeper_truststore_password -ssl.clientAuth=need -sslQuorum=true - -# Autopurge settings -autopurge.snapRetainCount=3 -autopurge.purgeInterval=24 - -# Admin server settings -admin.enableServer=false -admin.serverPort=0 - -# 4lw commands -4lw.commands.whitelist=* - -# Metrics -metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider -metricsProvider.httpPort=7070 \ No newline at end of file diff --git a/backend/zookeeper/generate-certs.sh b/backend/zookeeper/generate-certs.sh deleted file mode 100644 index 8cd4294b..00000000 --- a/backend/zookeeper/generate-certs.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/bin/sh -set -e - -# Check if certificates already exist -if [ -f /certs/zookeeper.keystore.jks ] && [ -f /certs/zookeeper.truststore.jks ]; then - echo "Certificates already exist, skipping generation" - exit 0 -fi - -# Generate private key -openssl genrsa -out /certs/zookeeper.key 2048 - -# Generate certificate signing request -openssl req -new -key /certs/zookeeper.key -out /certs/zookeeper.csr \ - -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=zookeeper" - -# Generate self-signed certificate -openssl x509 -req -days 365 -in /certs/zookeeper.csr \ - -signkey /certs/zookeeper.key -out /certs/zookeeper.crt - -# Create PKCS12 keystore -openssl pkcs12 -export -in /certs/zookeeper.crt -inkey /certs/zookeeper.key \ - -out /certs/zookeeper.p12 -name zookeeper \ - -password pass:zookeeper_keystore_password - -# Remove existing keystore if it exists -rm -f /certs/zookeeper.keystore.jks - -# Convert PKCS12 to JKS -keytool -importkeystore \ - -deststorepass zookeeper_keystore_password \ - -destkeypass zookeeper_keystore_password \ - -destkeystore /certs/zookeeper.keystore.jks \ - -srckeystore /certs/zookeeper.p12 \ - -srcstoretype PKCS12 \ - -srcstorepass zookeeper_keystore_password \ - -alias zookeeper \ - -noprompt - -# Remove existing truststore if it exists -rm -f /certs/zookeeper.truststore.jks - -# Create truststore -keytool -keystore /certs/zookeeper.truststore.jks -alias zookeeper \ - -import -file /certs/zookeeper.crt \ - -storepass zookeeper_truststore_password -noprompt - -# Set permissions -chmod 644 /certs/* - -echo "Zookeeper certificates generated successfully" -ls -la /certs/ \ No newline at end of file diff --git a/backend/zookeeper/secrets/kafka_jaas.conf b/backend/zookeeper/secrets/kafka_jaas.conf deleted file mode 100644 index 9bea890e..00000000 --- a/backend/zookeeper/secrets/kafka_jaas.conf +++ /dev/null @@ -1,11 +0,0 @@ -Server { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_super="admin-secret" - user_kafka="kafka-secret"; -}; - -Client { - org.apache.zookeeper.server.auth.DigestLoginModule required - username="kafka" - password="kafka-secret"; -}; diff --git a/deploy.sh b/deploy.sh index 3de68cfe..b2b1d904 100755 --- a/deploy.sh +++ b/deploy.sh @@ -178,8 +178,7 @@ cmd_infra() { fi # Start only infrastructure services (no app, no workers, no 
observability) - # zookeeper-certgen is needed for kafka to start - docker compose up -d zookeeper-certgen mongo redis zookeeper kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG + docker compose up -d mongo redis kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG print_success "Infrastructure services started" docker compose ps diff --git a/docker-compose.yaml b/docker-compose.yaml index 1fdde620..309eaeac 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,7 +43,7 @@ services: image: mongo:8.0 command: ["mongod", "--wiredTigerCacheSizeGB", "0.4"] ports: - - "27017:27017" + - "127.0.0.1:27017:27017" environment: MONGO_INITDB_ROOT_USERNAME: ${MONGO_ROOT_USER:-root} MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ROOT_PASSWORD:-rootpassword} @@ -70,7 +70,7 @@ services: image: redis:7-alpine container_name: redis ports: - - "6379:6379" + - "127.0.0.1:6379:6379" command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru --save "" volumes: - redis_data:/data @@ -118,7 +118,7 @@ services: - ./backend/kubeconfig.yaml:/app/kubeconfig.yaml:ro ports: - "443:443" - - "9090:9090" # Metrics port (host:container) + - "127.0.0.1:9090:9090" # Metrics port (host:container) networks: - app-network container_name: backend @@ -173,7 +173,7 @@ services: image: grafana/grafana:12.3.1 user: "472" ports: - - "3000:3000" + - "127.0.0.1:3000:3000" volumes: - ./backend/grafana/grafana.ini:/etc/grafana/grafana.ini:ro - ./backend/grafana/provisioning:/etc/grafana/provisioning:ro @@ -188,126 +188,37 @@ services: - GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin} - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin123} - # Kafka Infrastructure for Event-Driven Design - # Certificate generator for Zookeeper/Kafka SSL - zookeeper-certgen: - image: ghcr.io/hardmax71/integr8scode/zookeeper-certgen:${IMAGE_TAG:-latest} - build: - context: ./backend/zookeeper - dockerfile: Dockerfile.certgen - container_name: zookeeper-certgen - volumes: - - zookeeper_certs:/certs - networks: - - app-network - restart: "no" - - zookeeper: - image: confluentinc/cp-zookeeper:7.8.2 - container_name: zookeeper - depends_on: - zookeeper-certgen: - condition: service_completed_successfully - environment: - # Basic configuration - ZOOKEEPER_CLIENT_PORT: 2181 - # Enable TLS-secure client port to match zoo.cfg - ZOOKEEPER_SECURE_CLIENT_PORT: 2281 - ZOOKEEPER_TICK_TIME: 2000 - - # Production settings - ZOOKEEPER_MAX_CLIENT_CNXNS: 60 - ZOOKEEPER_INIT_LIMIT: 10 - ZOOKEEPER_SYNC_LIMIT: 5 - - # Security settings (plaintext + SASL only) - ZOOKEEPER_AUTH_PROVIDER_1: org.apache.zookeeper.server.auth.DigestAuthenticationProvider - ZOOKEEPER_SERVER_CNXN_FACTORY: org.apache.zookeeper.server.NettyServerCnxnFactory - KAFKA_OPTS: '-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -Dzookeeper.4lw.commands.whitelist=* -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf' - - # TLS settings for secure client port (keystore/truststore produced by certgen) - ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/certs/zookeeper.keystore.jks - ZOOKEEPER_SSL_KEYSTORE_PASSWORD: zookeeper_keystore_password - ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/zookeeper.truststore.jks - ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: zookeeper_truststore_password - ZOOKEEPER_SSL_CLIENT_AUTH: need - - # 4lw commands whitelist (minimal for security) - ZOOKEEPER_4LW_COMMANDS_WHITELIST: '*' - - # Autopurge settings - ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3 - ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24 - - # JVM settings - KAFKA_HEAP_OPTS: '-Xms128m -Xmx256m' - 
KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:+ParallelRefProcEnabled -Djute.maxbuffer=4194304' - - # Logging - ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: 'WARN' - ZOOKEEPER_LOG4J_LOGGERS: 'org.apache.zookeeper.server.auth=INFO,org.apache.zookeeper.audit=INFO' - - # Disable admin server for security - ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' - ZOOKEEPER_ADMIN_SERVER_PORT: 0 - - # Metrics - ZOOKEEPER_METRICS_PROVIDER_CLASS_NAME: org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider - ZOOKEEPER_METRICS_PROVIDER_HTTP_PORT: 7070 - - volumes: - - ./backend/zookeeper/conf:/etc/kafka/conf:ro - - ./backend/zookeeper/secrets:/etc/kafka/secrets:ro - - zookeeper_certs:/etc/kafka/certs:ro - - zookeeper_data:/var/lib/zookeeper/data - - zookeeper_log:/var/lib/zookeeper/log - - zookeeper_logs:/var/log/zookeeper - ports: - - "2181:2181" - - "2281:2281" # Secure TLS client port - - "7070:7070" # Metrics port - networks: - - app-network - ulimits: - nofile: - soft: 65536 - hard: 65536 - mem_limit: 512m - restart: unless-stopped - healthcheck: - test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"] - interval: 3s - timeout: 5s - retries: 15 - start_period: 5s - + # Kafka (KRaft mode — no ZooKeeper) kafka: image: confluentinc/cp-kafka:7.8.2 container_name: kafka - depends_on: - zookeeper: - condition: service_healthy ports: - - "9092:9092" - - "29092:29092" + - "127.0.0.1:9092:9092" + - "127.0.0.1:29092:29092" environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + # KRaft identity (combined broker + controller) + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: 'broker,controller' + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + + # Listeners + KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + + # Single-node replication KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' # Needed for initial setup + + # Topic management + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_LOG_RETENTION_HOURS: 168 - - # Security settings for Zookeeper connection (SASL/Digest over plaintext) - KAFKA_OPTS: '-Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf' - KAFKA_ZOOKEEPER_SET_ACL: 'false' - # KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' - # KAFKA_SUPER_USERS: 'User:admin;User:kafka' - + # Production settings KAFKA_COMPRESSION_TYPE: 'gzip' KAFKA_NUM_NETWORK_THREADS: 8 @@ -315,18 +226,16 @@ services: KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 - + # Log settings KAFKA_LOG_SEGMENT_BYTES: 1073741824 KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000 KAFKA_LOG_CLEANUP_POLICY: 'delete' - + # JVM settings 
KAFKA_HEAP_OPTS: '-Xms256m -Xmx1g' KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true' - volumes: - - ./backend/zookeeper/secrets/kafka_jaas.conf:/etc/kafka/secrets/kafka_jaas.conf:ro - kafka_data:/var/lib/kafka/data - kafka_logs:/var/log/kafka networks: @@ -351,7 +260,7 @@ services: depends_on: - kafka ports: - - "9000:9000" + - "127.0.0.1:9000:9000" environment: KAFKA_BROKERCONNECT: kafka:29092 JVM_OPTS: "-Xms256M -Xmx512M" @@ -508,14 +417,14 @@ services: profiles: ["observability"] mem_limit: 256m ports: - - "5775:5775/udp" # Zipkin/thrift compact - - "6831:6831/udp" # Thrift compact - - "6832:6832/udp" # Thrift binary - - "5778:5778" # HTTP config - - "16686:16686" # Jaeger UI - - "14268:14268" # HTTP collector - - "14250:14250" # gRPC collector - - "9411:9411" # Zipkin compatible endpoint + - "127.0.0.1:5775:5775/udp" # Zipkin/thrift compact + - "127.0.0.1:6831:6831/udp" # Thrift compact + - "127.0.0.1:6832:6832/udp" # Thrift binary + - "127.0.0.1:5778:5778" # HTTP config + - "127.0.0.1:16686:16686" # Jaeger UI + - "127.0.0.1:14268:14268" # HTTP collector + - "127.0.0.1:14250:14250" # gRPC collector + - "127.0.0.1:9411:9411" # Zipkin compatible endpoint environment: - COLLECTOR_ZIPKIN_HOST_PORT=:9411 - COLLECTOR_OTLP_ENABLED=true @@ -572,7 +481,7 @@ services: profiles: ["observability"] mem_limit: 256m ports: - - "8428:8428" + - "127.0.0.1:8428:8428" volumes: - victoria_metrics_data:/victoria-metrics-data command: @@ -601,7 +510,7 @@ services: - "--topic.filter=.*" - "--group.filter=.*" ports: - - "9308:9308" + - "127.0.0.1:9308:9308" networks: - app-network depends_on: @@ -620,9 +529,9 @@ services: - ./backend/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro - /var/run/docker.sock:/var/run/docker.sock:ro ports: - - "4317:4317" # OTLP gRPC - - "4318:4318" # OTLP HTTP - - "13133:13133" # Health check + - "127.0.0.1:4317:4317" # OTLP gRPC + - "127.0.0.1:4318:4318" # OTLP HTTP + - "127.0.0.1:13133:13133" # Health check networks: - app-network depends_on: @@ -641,10 +550,6 @@ volumes: shared_ca: kafka_data: kafka_logs: - zookeeper_data: - zookeeper_log: - zookeeper_logs: - zookeeper_certs: # --8<-- [end:dev_volumes] networks: diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index 297a79e0..a29d2a4f 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -31,7 +31,7 @@ Integr8sCode lets users submit Python scripts through a Svelte SPA. The FastAPI ![System diagram](/assets/images/system_diagram.png) The SPA hits the frontend, which proxies to the API over HTTPS; the API -serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) with Zookeeper backing it; kafka- +serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) using KRaft for metadata consensus; kafka- init seeds topics. All workers are separate containers subscribed to Kafka; the k8s-worker talks to the Kubernetes API to run code, the pod-monitor watches pods, the result-processor writes results to Mongo and nudges Redis for SSE fanout, and the saga-orchestrator coordinates long flows with Mongo and Redis. 
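As a quick sanity check of the ZooKeeper-less broker, the sketch below (illustrative, not part of the patch) connects through the host-side `PLAINTEXT_HOST` listener defined in the compose file and lists visible topics; it assumes the stack above is running on `127.0.0.1:9092` and uses aiokafka, which the e2e tests already depend on.

```python
# Illustrative smoke check, not part of the patch. Assumes the compose stack is
# up and 127.0.0.1:9092 (PLAINTEXT_HOST listener) is reachable from the host.
import asyncio

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # If the KRaft broker came up correctly, metadata requests succeed and
        # the topics seeded by kafka-init are visible.
        print(sorted(await consumer.topics()))
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```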
diff --git a/docs/operations/cicd.md b/docs/operations/cicd.md index b22f10b9..cd715d7e 100644 --- a/docs/operations/cicd.md +++ b/docs/operations/cicd.md @@ -33,7 +33,7 @@ graph LR end subgraph "Docker Scan & Promote" - Scan["Trivy Scan (5 images)"] + Scan["Trivy Scan (4 images)"] Promote["Promote SHA → latest"] Scan --> Promote end @@ -92,7 +92,7 @@ no setup to overlap. ## Stack Tests (the main workflow) -This is the core testing workflow. It builds all 5 container images, pushes them to GHCR with immutable SHA-based +This is the core testing workflow. It builds all 4 container images, pushes them to GHCR with immutable SHA-based tags, then runs E2E tests on separate runners that pull images from the registry. ```mermaid @@ -131,29 +131,28 @@ the image build is skipped entirely. ### Phase 2: Build and push -All 5 images are built on a single runner and pushed to GHCR with an immutable `sha-<7chars>` tag: +All 4 images are built on a single runner and pushed to GHCR with an immutable `sha-<7chars>` tag: | Image | Source | |----------------------|---------------------------------------------| | `base` | `backend/Dockerfile.base` | | `backend` | `backend/Dockerfile` | | `cert-generator` | `cert-generator/Dockerfile` | -| `zookeeper-certgen` | `backend/zookeeper/Dockerfile.certgen` | | `frontend` | `frontend/Dockerfile` | Workers reuse the `backend` image with different `command:` overrides in docker-compose, so no separate worker images -are needed. All 5 images are scanned by Trivy and promoted to `latest` in the +are needed. All 4 images are scanned by Trivy and promoted to `latest` in the [Docker Scan & Promote](#docker-scan-promote) workflow. The base image is cached separately as a zstd-compressed tarball since its dependencies rarely change. The backend image depends on it via `--build-context base=docker-image://integr8scode-base:latest`. Utility and frontend images use GHA layer caching. -All 5 images are pushed to GHCR in parallel, with each push tracked by PID so individual failures are reported: +All 4 images are pushed to GHCR in parallel, with each push tracked by PID so individual failures are reported: ```yaml declare -A PIDS -for name in base backend cert-generator zookeeper-certgen ...; do +for name in base backend cert-generator frontend; do docker push "$IMG/$name:$TAG" & PIDS[$name]=$! done @@ -180,7 +179,7 @@ This action kicks off three slow tasks that can overlap: 1. **GHCR login** using `docker/login-action@v3` 2. **Background image pull + infra pre-warm** — pulls all compose images then starts infrastructure services - (mongo, redis, kafka, zookeeper) in a background `nohup` process. The exit status is persisted + (mongo, redis, kafka) in a background `nohup` process. The exit status is persisted to `/tmp/infra-pull.exit` so the next action can check for failures. 3. **k3s install** — downloads and installs a pinned k3s version with SHA256 checksum verification (see [supply-chain hardening](#supply-chain-hardening) below) @@ -238,7 +237,7 @@ sets it, and only after all tests pass. ```mermaid graph LR ST["Stack Tests
(main, success)"] -->|workflow_run trigger| Scan - Scan["Trivy Scan
(5 images in parallel)"] --> Promote["crane copy
sha-xxx → latest"] + Scan["Trivy Scan
(4 images in parallel)"] --> Promote["crane copy
sha-xxx → latest"] Promote --> Summary["Step Summary"] ``` @@ -249,7 +248,7 @@ Runs automatically when `Stack Tests` completes successfully on `main`. Can also ### Scan -Uses [Trivy](https://trivy.dev/) (pinned at `v0.68.2`) to scan all 5 deployed images in parallel via matrix strategy. +Uses [Trivy](https://trivy.dev/) (pinned at `v0.68.2`) to scan all 4 deployed images in parallel via matrix strategy. Scans for `CRITICAL` and `HIGH` severity vulnerabilities with unfixed issues ignored. Results are uploaded as SARIF files to GitHub's Security tab. @@ -285,7 +284,7 @@ Releases use [Calendar Versioning](https://calver.org/) with the format `YYYY.M. - `PATCH` — auto-incrementing counter within the month, starting at `0` Examples: `2026.2.0`, `2026.2.1`, `2026.3.0`. The workflow counts existing tags matching the current `YYYY.M.*` pattern -and increments the patch number. All 5 deployed GHCR images are tagged with the CalVer version using crane (same +and increments the patch number. All 4 deployed GHCR images are tagged with the CalVer version using crane (same registry-level manifest copy as the promote step). ### GitHub Release @@ -438,7 +437,7 @@ Playwright browsers are cached by `package-lock.json` hash. On cache hit, only s ### Parallel image push -All 5 images are pushed to GHCR concurrently using background processes with PID tracking. Each push failure is +All 4 images are pushed to GHCR concurrently using background processes with PID tracking. Each push failure is reported individually via `::error::` annotations. ## Running locally diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md index 15203ff0..2d42a0a9 100644 --- a/docs/operations/deployment.md +++ b/docs/operations/deployment.md @@ -41,7 +41,7 @@ with health checks and dependency ordering, so containers start in the correct s ./deploy.sh dev ``` -This brings up MongoDB, Redis, Kafka with Zookeeper, all seven workers, the backend API, and the +This brings up MongoDB, Redis, Kafka (KRaft mode), all seven workers, the backend API, and the frontend. Two initialization containers run automatically: `kafka-init` creates required Kafka topics, and `user-seed` populates the database with default user accounts. @@ -129,7 +129,6 @@ services define healthchecks in `docker-compose.yaml`: | Redis | `redis-cli ping` | | Backend | `curl /api/v1/health/live` | | Kafka | `kafka-broker-api-versions` | -| Zookeeper | `echo ruok \| nc localhost 2181 \| grep imok` | Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their container is running. The test suite doesn't require worker containers since tests instantiate worker classes directly. 
@@ -142,8 +141,7 @@ Every long-running service has a `mem_limit` in `docker-compose.yaml` to prevent |---------|-------------|--------------|-------| | MongoDB | 1024 m | wiredTiger 0.4 GB | `--wiredTigerCacheSizeGB 0.4` prevents default 50 %-of-RAM behavior | | Redis | 300 m | 256 MB maxmemory | LRU eviction, persistence disabled | -| Kafka | 1280 m | JVM `-Xms256m -Xmx1g` | Single-broker, low throughput workload | -| Zookeeper | 512 m | JVM `-Xms128m -Xmx256m` | Metadata-only role | +| Kafka | 1280 m | JVM `-Xms256m -Xmx1g` | Single-broker KRaft mode, low throughput workload | | Backend API | 768 m | 2 gunicorn workers | Controlled by `WEB_CONCURRENCY` env var | | Frontend | 128 m | nginx serving static assets | | | Each worker (×7) | 160 m | Single-process Python | coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor | @@ -153,7 +151,7 @@ Every long-running service has a `mem_limit` in `docker-compose.yaml` to prevent | OTel Collector | 192 m | `limit_mib: 150` in memory_limiter processor | Observability profile | | Kafka Exporter | 96 m | | Observability profile | -All long-running services — core infrastructure (MongoDB, Redis, Kafka, Zookeeper, backend, frontend), all seven workers (coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor), and observability components (Grafana, kafka-exporter, victoria-metrics, otel-collector) — have `restart: unless-stopped` so they recover automatically after an OOM kill or crash. +All long-running services — core infrastructure (MongoDB, Redis, Kafka, backend, frontend), all seven workers (coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor), and observability components (Grafana, kafka-exporter, victoria-metrics, otel-collector) — have `restart: unless-stopped` so they recover automatically after an OOM kill or crash. ## Monitoring