From 68449e7e601898cebe22b4becdb4b74ed86e743d Mon Sep 17 00:00:00 2001
From: HardMax71
Date: Wed, 18 Feb 2026 01:15:04 +0100
Subject: [PATCH 1/8] fix: tls+plaintext ports for zookeeper -> only tls (2281)

---
 backend/zookeeper/conf/zoo.cfg | 4 +-
 backend/zookeeper/generate-certs.sh | 65 ++++++++++++++++++++---------
 docker-compose.yaml | 52 +++++++++++++----------
 docs/operations/deployment.md | 2 +-
 4 files changed, 76 insertions(+), 47 deletions(-)

diff --git a/backend/zookeeper/conf/zoo.cfg b/backend/zookeeper/conf/zoo.cfg
index 496e14d3..32da9a10 100644
--- a/backend/zookeeper/conf/zoo.cfg
+++ b/backend/zookeeper/conf/zoo.cfg
@@ -2,7 +2,6 @@
 tickTime=2000
 dataDir=/var/lib/zookeeper/data
 dataLogDir=/var/lib/zookeeper/log
-clientPort=2181
 secureClientPort=2281
 serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
 authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider
@@ -11,7 +10,6 @@ ssl.keyStore.password=zookeeper_keystore_password
 ssl.trustStore.location=/etc/kafka/certs/zookeeper.truststore.jks
 ssl.trustStore.password=zookeeper_truststore_password
 ssl.clientAuth=need
-sslQuorum=true
 
 # Autopurge settings
 autopurge.snapRetainCount=3
@@ -26,4 +24,4 @@ admin.serverPort=0
 
 # Metrics
 metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
-metricsProvider.httpPort=7070
\ No newline at end of file
+metricsProvider.httpPort=7070
diff --git a/backend/zookeeper/generate-certs.sh b/backend/zookeeper/generate-certs.sh
index 8cd4294b..55f3f770 100644
--- a/backend/zookeeper/generate-certs.sh
+++ b/backend/zookeeper/generate-certs.sh
@@ -2,31 +2,37 @@
 set -e
 
 # Check if certificates already exist
-if [ -f /certs/zookeeper.keystore.jks ] && [ -f /certs/zookeeper.truststore.jks ]; then
+if [ -f /certs/zookeeper.keystore.jks ] && [ -f /certs/kafka-client.keystore.jks ]; then
   echo "Certificates already exist, skipping generation"
   exit 0
 fi
 
-# Generate private key
-openssl genrsa -out /certs/zookeeper.key 2048
+# 1. Generate CA key and self-signed CA certificate
+openssl genrsa -out /certs/ca.key 2048
+openssl req -new -x509 -days 365 -key /certs/ca.key -out /certs/ca.crt \
+  -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=Integr8sCode-CA"
 
-# Generate certificate signing request
+# 2. Generate Zookeeper server key and CA-signed certificate
+openssl genrsa -out /certs/zookeeper.key 2048
 openssl req -new -key /certs/zookeeper.key -out /certs/zookeeper.csr \
   -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=zookeeper"
-
-# Generate self-signed certificate
 openssl x509 -req -days 365 -in /certs/zookeeper.csr \
-  -signkey /certs/zookeeper.key -out /certs/zookeeper.crt
+  -CA /certs/ca.crt -CAkey /certs/ca.key -CAcreateserial \
+  -out /certs/zookeeper.crt
+
+# 3. Generate Kafka client key and CA-signed certificate
+openssl genrsa -out /certs/kafka-client.key 2048
+openssl req -new -key /certs/kafka-client.key -out /certs/kafka-client.csr \
+  -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=kafka"
+openssl x509 -req -days 365 -in /certs/kafka-client.csr \
+  -CA /certs/ca.crt -CAkey /certs/ca.key -CAcreateserial \
+  -out /certs/kafka-client.crt
 
-# Create PKCS12 keystore
+# 4. 
Build Zookeeper server keystore (JKS) +rm -f /certs/zookeeper.keystore.jks openssl pkcs12 -export -in /certs/zookeeper.crt -inkey /certs/zookeeper.key \ -out /certs/zookeeper.p12 -name zookeeper \ -password pass:zookeeper_keystore_password - -# Remove existing keystore if it exists -rm -f /certs/zookeeper.keystore.jks - -# Convert PKCS12 to JKS keytool -importkeystore \ -deststorepass zookeeper_keystore_password \ -destkeypass zookeeper_keystore_password \ @@ -37,16 +43,35 @@ keytool -importkeystore \ -alias zookeeper \ -noprompt -# Remove existing truststore if it exists +# 5. Build Zookeeper truststore (CA cert — trusts any client signed by the CA) rm -f /certs/zookeeper.truststore.jks - -# Create truststore -keytool -keystore /certs/zookeeper.truststore.jks -alias zookeeper \ - -import -file /certs/zookeeper.crt \ +keytool -keystore /certs/zookeeper.truststore.jks -alias ca \ + -import -file /certs/ca.crt \ -storepass zookeeper_truststore_password -noprompt +# 6. Build Kafka client keystore (JKS) +rm -f /certs/kafka-client.keystore.jks +openssl pkcs12 -export -in /certs/kafka-client.crt -inkey /certs/kafka-client.key \ + -out /certs/kafka-client.p12 -name kafka \ + -password pass:kafka_keystore_password +keytool -importkeystore \ + -deststorepass kafka_keystore_password \ + -destkeypass kafka_keystore_password \ + -destkeystore /certs/kafka-client.keystore.jks \ + -srckeystore /certs/kafka-client.p12 \ + -srcstoretype PKCS12 \ + -srcstorepass kafka_keystore_password \ + -alias kafka \ + -noprompt + +# 7. Build Kafka client truststore (CA cert — trusts Zookeeper's server cert) +rm -f /certs/kafka-client.truststore.jks +keytool -keystore /certs/kafka-client.truststore.jks -alias ca \ + -import -file /certs/ca.crt \ + -storepass kafka_truststore_password -noprompt + # Set permissions chmod 644 /certs/* -echo "Zookeeper certificates generated successfully" -ls -la /certs/ \ No newline at end of file +echo "Zookeeper and Kafka certificates generated successfully" +ls -la /certs/ diff --git a/docker-compose.yaml b/docker-compose.yaml index 1fdde620..9c265fb4 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -189,7 +189,7 @@ services: - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin123} # Kafka Infrastructure for Event-Driven Design - # Certificate generator for Zookeeper/Kafka SSL + # Certificate generator for Zookeeper/Kafka mTLS zookeeper-certgen: image: ghcr.io/hardmax71/integr8scode/zookeeper-certgen:${IMAGE_TAG:-latest} build: @@ -209,52 +209,50 @@ services: zookeeper-certgen: condition: service_completed_successfully environment: - # Basic configuration - ZOOKEEPER_CLIENT_PORT: 2181 - # Enable TLS-secure client port to match zoo.cfg + # TLS-only: no plaintext client port ZOOKEEPER_SECURE_CLIENT_PORT: 2281 ZOOKEEPER_TICK_TIME: 2000 - + # Production settings ZOOKEEPER_MAX_CLIENT_CNXNS: 60 ZOOKEEPER_INIT_LIMIT: 10 ZOOKEEPER_SYNC_LIMIT: 5 - - # Security settings (plaintext + SASL only) + + # Security settings (SASL + mTLS) ZOOKEEPER_AUTH_PROVIDER_1: org.apache.zookeeper.server.auth.DigestAuthenticationProvider ZOOKEEPER_SERVER_CNXN_FACTORY: org.apache.zookeeper.server.NettyServerCnxnFactory KAFKA_OPTS: '-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -Dzookeeper.4lw.commands.whitelist=* -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf' - # TLS settings for secure client port (keystore/truststore produced by certgen) + # TLS settings (keystore/truststore produced by certgen) ZOOKEEPER_SSL_KEYSTORE_LOCATION: 
/etc/kafka/certs/zookeeper.keystore.jks ZOOKEEPER_SSL_KEYSTORE_PASSWORD: zookeeper_keystore_password ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/zookeeper.truststore.jks ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: zookeeper_truststore_password ZOOKEEPER_SSL_CLIENT_AUTH: need - - # 4lw commands whitelist (minimal for security) + + # 4lw commands whitelist ZOOKEEPER_4LW_COMMANDS_WHITELIST: '*' - + # Autopurge settings ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3 ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24 - + # JVM settings KAFKA_HEAP_OPTS: '-Xms128m -Xmx256m' KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:+ParallelRefProcEnabled -Djute.maxbuffer=4194304' - # Logging + # Logging — suppress TLS handshake noise from internet scanners ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: 'WARN' - ZOOKEEPER_LOG4J_LOGGERS: 'org.apache.zookeeper.server.auth=INFO,org.apache.zookeeper.audit=INFO' - + ZOOKEEPER_LOG4J_LOGGERS: 'org.apache.zookeeper.server.auth=INFO,org.apache.zookeeper.audit=INFO,org.apache.zookeeper.server.NettyServerCnxnFactory=ERROR' + # Disable admin server for security ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' ZOOKEEPER_ADMIN_SERVER_PORT: 0 - + # Metrics ZOOKEEPER_METRICS_PROVIDER_CLASS_NAME: org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider ZOOKEEPER_METRICS_PROVIDER_HTTP_PORT: 7070 - + volumes: - ./backend/zookeeper/conf:/etc/kafka/conf:ro - ./backend/zookeeper/secrets:/etc/kafka/secrets:ro @@ -263,8 +261,7 @@ services: - zookeeper_log:/var/lib/zookeeper/log - zookeeper_logs:/var/log/zookeeper ports: - - "2181:2181" - - "2281:2281" # Secure TLS client port + - "2281:2281" - "7070:7070" # Metrics port networks: - app-network @@ -275,7 +272,7 @@ services: mem_limit: 512m restart: unless-stopped healthcheck: - test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"] + test: ["CMD-SHELL", "curl -sf http://localhost:7070/metrics >/dev/null"] interval: 3s timeout: 5s retries: 15 @@ -285,6 +282,8 @@ services: image: confluentinc/cp-kafka:7.8.2 container_name: kafka depends_on: + zookeeper-certgen: + condition: service_completed_successfully zookeeper: condition: service_healthy ports: @@ -292,7 +291,13 @@ services: - "29092:29092" environment: KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2281 + KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: 'true' + KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty + KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/certs/kafka-client.keystore.jks + KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: kafka_keystore_password + KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/kafka-client.truststore.jks + KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: kafka_truststore_password KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 @@ -301,8 +306,8 @@ services: KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' # Needed for initial setup KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_LOG_RETENTION_HOURS: 168 - - # Security settings for Zookeeper connection (SASL/Digest over plaintext) + + # Security settings for Zookeeper connection (SASL/Digest over TLS) KAFKA_OPTS: '-Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf' KAFKA_ZOOKEEPER_SET_ACL: 'false' # 
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' @@ -327,6 +332,7 @@ services: volumes: - ./backend/zookeeper/secrets/kafka_jaas.conf:/etc/kafka/secrets/kafka_jaas.conf:ro + - zookeeper_certs:/etc/kafka/certs:ro - kafka_data:/var/lib/kafka/data - kafka_logs:/var/log/kafka networks: diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md index 15203ff0..8e2cb677 100644 --- a/docs/operations/deployment.md +++ b/docs/operations/deployment.md @@ -129,7 +129,7 @@ services define healthchecks in `docker-compose.yaml`: | Redis | `redis-cli ping` | | Backend | `curl /api/v1/health/live` | | Kafka | `kafka-broker-api-versions` | -| Zookeeper | `echo ruok \| nc localhost 2181 \| grep imok` | +| Zookeeper | `curl -sf http://localhost:7070/metrics` (Prometheus metrics port; TLS-only, no plaintext client port) | Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their container is running. The test suite doesn't require worker containers since tests instantiate worker classes directly. From e00af67e4d505bd9ac4a796a78583fc8f8fa1329 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Wed, 18 Feb 2026 07:18:29 +0100 Subject: [PATCH 2/8] fix: health-check for zookeeper updated (http -> nc) --- docker-compose.yaml | 2 +- docs/operations/deployment.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 9c265fb4..293b6026 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -272,7 +272,7 @@ services: mem_limit: 512m restart: unless-stopped healthcheck: - test: ["CMD-SHELL", "curl -sf http://localhost:7070/metrics >/dev/null"] + test: ["CMD-SHELL", "nc -z localhost 7070"] interval: 3s timeout: 5s retries: 15 diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md index 8e2cb677..0b7f9d33 100644 --- a/docs/operations/deployment.md +++ b/docs/operations/deployment.md @@ -129,7 +129,7 @@ services define healthchecks in `docker-compose.yaml`: | Redis | `redis-cli ping` | | Backend | `curl /api/v1/health/live` | | Kafka | `kafka-broker-api-versions` | -| Zookeeper | `curl -sf http://localhost:7070/metrics` (Prometheus metrics port; TLS-only, no plaintext client port) | +| Zookeeper | `nc -z localhost 7070` (Prometheus metrics port; TLS-only, no plaintext client port) | Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their container is running. The test suite doesn't require worker containers since tests instantiate worker classes directly. 
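With the plaintext listener gone, the only ZooKeeper endpoint left is the mTLS port, so the `nc -z` healthcheck above can prove the socket is open but not that the certificate chain works. A handshake probe with Python's stdlib `ssl` module covers the rest. This is a sketch, not part of the series: it assumes it runs where the certgen volume is mounted and `zookeeper:2281` resolves (for example, inside the kafka container), using the files `generate-certs.sh` writes:

```python
import socket
import ssl

# Trust the certgen CA and present the Kafka client keypair; the server
# requires one (ssl.clientAuth=need in zoo.cfg).
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations("/etc/kafka/certs/ca.crt")
ctx.load_cert_chain("/etc/kafka/certs/kafka-client.crt",
                    "/etc/kafka/certs/kafka-client.key")
ctx.check_hostname = False  # certgen certs carry only a CN, no SAN

with socket.create_connection(("zookeeper", 2281), timeout=5) as raw:
    with ctx.wrap_socket(raw) as tls:
        # The server cert is still verified against the CA (CERT_REQUIRED).
        print("handshake ok:", tls.version(), tls.getpeercert()["subject"])
```

A failed chain or a missing client certificate surfaces here as an `ssl.SSLError` instead of the silent connection reset a port scan would show.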
From 7c18b8673da88e29c72c7ebd8916ae740ff4ea3c Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Wed, 18 Feb 2026 10:24:39 +0100 Subject: [PATCH 3/8] fix: closed ports, updated docs --- docker-compose.yaml | 50 +++++++++++++++-------------------- docs/operations/deployment.md | 2 +- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 293b6026..27590ac8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,7 +43,7 @@ services: image: mongo:8.0 command: ["mongod", "--wiredTigerCacheSizeGB", "0.4"] ports: - - "27017:27017" + - "127.0.0.1:27017:27017" environment: MONGO_INITDB_ROOT_USERNAME: ${MONGO_ROOT_USER:-root} MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ROOT_PASSWORD:-rootpassword} @@ -70,7 +70,7 @@ services: image: redis:7-alpine container_name: redis ports: - - "6379:6379" + - "127.0.0.1:6379:6379" command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru --save "" volumes: - redis_data:/data @@ -118,7 +118,7 @@ services: - ./backend/kubeconfig.yaml:/app/kubeconfig.yaml:ro ports: - "443:443" - - "9090:9090" # Metrics port (host:container) + - "127.0.0.1:9090:9090" # Metrics port (host:container) networks: - app-network container_name: backend @@ -173,7 +173,7 @@ services: image: grafana/grafana:12.3.1 user: "472" ports: - - "3000:3000" + - "127.0.0.1:3000:3000" volumes: - ./backend/grafana/grafana.ini:/etc/grafana/grafana.ini:ro - ./backend/grafana/provisioning:/etc/grafana/provisioning:ro @@ -249,20 +249,14 @@ services: ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' ZOOKEEPER_ADMIN_SERVER_PORT: 0 - # Metrics - ZOOKEEPER_METRICS_PROVIDER_CLASS_NAME: org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider - ZOOKEEPER_METRICS_PROVIDER_HTTP_PORT: 7070 - volumes: - - ./backend/zookeeper/conf:/etc/kafka/conf:ro - ./backend/zookeeper/secrets:/etc/kafka/secrets:ro - zookeeper_certs:/etc/kafka/certs:ro - zookeeper_data:/var/lib/zookeeper/data - zookeeper_log:/var/lib/zookeeper/log - zookeeper_logs:/var/log/zookeeper ports: - - "2281:2281" - - "7070:7070" # Metrics port + - "127.0.0.1:2281:2281" networks: - app-network ulimits: @@ -272,7 +266,7 @@ services: mem_limit: 512m restart: unless-stopped healthcheck: - test: ["CMD-SHELL", "nc -z localhost 7070"] + test: ["CMD-SHELL", "nc -z localhost 2281"] interval: 3s timeout: 5s retries: 15 @@ -287,8 +281,8 @@ services: zookeeper: condition: service_healthy ports: - - "9092:9092" - - "29092:29092" + - "127.0.0.1:9092:9092" + - "127.0.0.1:29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2281 @@ -357,7 +351,7 @@ services: depends_on: - kafka ports: - - "9000:9000" + - "127.0.0.1:9000:9000" environment: KAFKA_BROKERCONNECT: kafka:29092 JVM_OPTS: "-Xms256M -Xmx512M" @@ -514,14 +508,14 @@ services: profiles: ["observability"] mem_limit: 256m ports: - - "5775:5775/udp" # Zipkin/thrift compact - - "6831:6831/udp" # Thrift compact - - "6832:6832/udp" # Thrift binary - - "5778:5778" # HTTP config - - "16686:16686" # Jaeger UI - - "14268:14268" # HTTP collector - - "14250:14250" # gRPC collector - - "9411:9411" # Zipkin compatible endpoint + - "127.0.0.1:5775:5775/udp" # Zipkin/thrift compact + - "127.0.0.1:6831:6831/udp" # Thrift compact + - "127.0.0.1:6832:6832/udp" # Thrift binary + - "127.0.0.1:5778:5778" # HTTP config + - "127.0.0.1:16686:16686" # Jaeger UI + - "127.0.0.1:14268:14268" # HTTP collector + - "127.0.0.1:14250:14250" # gRPC collector + - "127.0.0.1:9411:9411" # Zipkin compatible endpoint environment: - COLLECTOR_ZIPKIN_HOST_PORT=:9411 - 
COLLECTOR_OTLP_ENABLED=true
@@ -578,7 +572,7 @@ services:
     profiles: ["observability"]
     mem_limit: 256m
     ports:
-      - "8428:8428"
+      - "127.0.0.1:8428:8428"
     volumes:
       - victoria_metrics_data:/victoria-metrics-data
     command:
@@ -607,7 +601,7 @@ services:
       - "--topic.filter=.*"
       - "--group.filter=.*"
     ports:
-      - "9308:9308"
+      - "127.0.0.1:9308:9308"
     networks:
       - app-network
     depends_on:
@@ -626,9 +620,9 @@ services:
       - ./backend/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
       - /var/run/docker.sock:/var/run/docker.sock:ro
     ports:
-      - "4317:4317"   # OTLP gRPC
-      - "4318:4318"   # OTLP HTTP
-      - "13133:13133" # Health check
+      - "127.0.0.1:4317:4317"   # OTLP gRPC
+      - "127.0.0.1:4318:4318"   # OTLP HTTP
+      - "127.0.0.1:13133:13133" # Health check
     networks:
       - app-network
     depends_on:
diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md
index 0b7f9d33..cf1af52e 100644
--- a/docs/operations/deployment.md
+++ b/docs/operations/deployment.md
@@ -129,7 +129,7 @@ services define healthchecks in `docker-compose.yaml`:
 | Redis | `redis-cli ping` |
 | Backend | `curl /api/v1/health/live` |
 | Kafka | `kafka-broker-api-versions` |
-| Zookeeper | `nc -z localhost 7070` (Prometheus metrics port; TLS-only, no plaintext client port) |
+| Zookeeper | `nc -z localhost 2281` (TLS-only, no plaintext client port) |
 
 Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their
 container is running. The test suite doesn't require worker containers since tests instantiate worker classes directly.

From c56c54158c99a4e52c719969a116e3ff4d408e1e Mon Sep 17 00:00:00 2001
From: HardMax71
Date: Wed, 18 Feb 2026 10:52:34 +0100
Subject: [PATCH 4/8] fix: saga flaky tests

---
 backend/tests/e2e/conftest.py | 8 +++++---
 backend/tests/e2e/test_saga_routes.py | 20 +++++++++-----------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py
index 57fbd95e..f2e7ff8e 100644
--- a/backend/tests/e2e/conftest.py
+++ b/backend/tests/e2e/conftest.py
@@ -10,7 +10,7 @@
 from aiokafka import AIOKafkaConsumer
 from app.db.docs.saga import SagaDocument
 from app.domain.enums import EventType, KafkaTopic, UserRole
-from app.domain.events import DomainEvent, DomainEventAdapter
+from app.domain.events import DomainEvent, DomainEventAdapter, SagaStartedEvent
 from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse
 from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse
 from app.schemas_pydantic.saga import SagaStatusResponse
@@ -110,15 +110,17 @@ async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0)
             timeout=timeout,
         )
 
-    async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
+    async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> SagaStartedEvent:
         """Wait for SAGA_STARTED — saga document is guaranteed in MongoDB after this."""
-        return await self.wait_for(
+        event = await self.wait_for(
             lambda e: (
                 e.event_type == EventType.SAGA_STARTED
                 and e.execution_id == execution_id
             ),
             timeout=timeout,
         )
+        assert isinstance(event, SagaStartedEvent)
+        return event
 
     async def wait_for_notification_created(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
         """Wait for NOTIFICATION_CREATED — notification is guaranteed in MongoDB after this."""
diff --git a/backend/tests/e2e/test_saga_routes.py b/backend/tests/e2e/test_saga_routes.py
index 6378520e..120fe9e7 100644
--- 
a/backend/tests/e2e/test_saga_routes.py +++ b/backend/tests/e2e/test_saga_routes.py @@ -201,25 +201,26 @@ async def test_cancel_saga( assert exec_response.status_code == 200 execution = ExecutionResponse.model_validate(exec_response.json()) + + # Get saga_id from SAGA_STARTED event (published after saga persisted) + started = await event_waiter.wait_for_saga_started(execution.execution_id) + # Wait for CREATE_POD_COMMAND — the orchestrator's last step. # After this the orchestrator is idle, so cancel won't race with # concurrent step-processing writes to the saga document. await event_waiter.wait_for_saga_command(execution.execution_id) - saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}") - saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0] - - response = await test_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel") + response = await test_user.post(f"/api/v1/sagas/{started.saga_id}/cancel") assert response.status_code == 200 result = SagaCancellationResponse.model_validate(response.json()) - assert result.saga_id == saga.saga_id + assert result.saga_id == started.saga_id assert result.success is True assert result.message is not None # cancel_saga sets state to CANCELLED synchronously in MongoDB # before returning the HTTP response (compensation also runs inline). - status_resp = await test_user.get(f"/api/v1/sagas/{saga.saga_id}") + status_resp = await test_user.get(f"/api/v1/sagas/{started.saga_id}") assert status_resp.status_code == 200 updated_saga = SagaStatusResponse.model_validate(status_resp.json()) assert updated_saga.state == SagaState.CANCELLED @@ -250,12 +251,9 @@ async def test_cancel_other_users_saga_forbidden( assert exec_response.status_code == 200 execution = ExecutionResponse.model_validate(exec_response.json()) - await event_waiter.wait_for_saga_started(execution.execution_id) - - saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}") - saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0] + started = await event_waiter.wait_for_saga_started(execution.execution_id) - response = await another_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel") + response = await another_user.post(f"/api/v1/sagas/{started.saga_id}/cancel") assert response.status_code == 403 From e20c7b2774fd67dd525b04f118b6a06597708a51 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Wed, 18 Feb 2026 11:09:38 +0100 Subject: [PATCH 5/8] fix: sse service flaky data conversion --- backend/app/services/sse/sse_service.py | 29 ++++++++++--------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py index c698ec0e..3b87ec96 100644 --- a/backend/app/services/sse/sse_service.py +++ b/backend/app/services/sse/sse_service.py @@ -9,7 +9,6 @@ from app.core.metrics import ConnectionMetrics from app.db.repositories import SSERepository from app.domain.enums import EventType, NotificationChannel, SSEControlEvent -from app.domain.events import ResourceUsageDomain from app.domain.execution.models import ExecutionResultDomain from app.domain.sse import ( DomainNotificationSSEPayload, @@ -128,7 +127,12 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn self.logger.info("SSE connection closed", execution_id=execution_id) async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMessage) -> SSEExecutionEventData: - """Build typed SSE event from Redis message.""" + """Build typed SSE event from 
Redis message.
+
+        Uses validate_python to coerce JSON primitives (str → enum, ISO str → datetime,
+        dict → dataclass) back to proper types after the Redis JSON round-trip.
+        Extra keys from the original domain event are ignored.
+        """
         result: ExecutionResultDomain | None = None
         if msg.event_type == EventType.RESULT_STORED:
             execution = await self.repository.get_execution(execution_id)
@@ -142,21 +146,12 @@ async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMess
             execution_id=execution_id,
         )
 
-        ru = msg.data.get("resource_usage")
-        return SSEExecutionEventData(
-            event_type=msg.data.get("event_type", msg.event_type),
-            execution_id=execution_id,
-            timestamp=msg.data.get("timestamp"),
-            event_id=msg.data.get("event_id"),
-            status=msg.data.get("status"),
-            stdout=msg.data.get("stdout"),
-            stderr=msg.data.get("stderr"),
-            exit_code=msg.data.get("exit_code"),
-            timeout_seconds=msg.data.get("timeout_seconds"),
-            message=msg.data.get("message"),
-            resource_usage=ResourceUsageDomain(**ru) if ru else None,
-            result=result,
-        )
+        return _sse_event_adapter.validate_python({
+            "event_type": msg.event_type,
+            **msg.data,
+            "execution_id": execution_id,
+            "result": result,
+        })
 
     async def create_notification_stream(self, user_id: str) -> AsyncGenerator[dict[str, Any], None]:
         subscription: SSERedisSubscription | None = None

From 127958b2cb6fdbcd6bfb230d40488eb96cb5a22f Mon Sep 17 00:00:00 2001
From: HardMax71
Date: Wed, 18 Feb 2026 11:31:47 +0100
Subject: [PATCH 6/8] fix: saga atomic cancel

---
 .../app/db/repositories/saga_repository.py | 39 ++++++++++++++++++-
 .../app/services/saga/saga_orchestrator.py | 26 +++++--------
 backend/app/services/sse/sse_service.py | 2 +-
 3 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py
index ac453232..b33951ec 100644
--- a/backend/app/db/repositories/saga_repository.py
+++ b/backend/app/db/repositories/saga_repository.py
@@ -6,12 +6,20 @@
 from beanie.odm.enums import SortDirection
 from beanie.odm.operators.find import BaseFindOperator
 from beanie.odm.queries.update import UpdateResponse
-from beanie.operators import GT, LT, NE, Eq, In
+from beanie.operators import GT, LT, NE, Eq, In, Set
 from monggregate import Pipeline, S
 
 from app.db.docs import ExecutionDocument, SagaDocument
 from app.domain.enums import SagaState
-from app.domain.saga import Saga, SagaConcurrencyError, SagaContextData, SagaFilter, SagaListResult, SagaNotFoundError
+from app.domain.saga import (
+    Saga,
+    SagaConcurrencyError,
+    SagaContextData,
+    SagaFilter,
+    SagaInvalidStateError,
+    SagaListResult,
+    SagaNotFoundError,
+)
 
 _saga_fields = set(Saga.__dataclass_fields__)
@@ -65,6 +73,33 @@ async def save_saga(self, saga_id: str, **updates: Any) -> Saga:
             raise SagaConcurrencyError(saga_id) from exc
         return self._to_domain(doc)
 
+    async def atomic_cancel_saga(
+        self, saga_id: str, error_message: str, completed_at: datetime,
+    ) -> Saga:
+        """Atomically cancel a saga using findOneAndUpdate.
+
+        Bypasses Beanie's revision check to eliminate TOCTOU races with
+        concurrent step saves from the orchestrator. 
+ """ + doc = await SagaDocument.find_one( + SagaDocument.saga_id == saga_id, + In(SagaDocument.state, [SagaState.RUNNING, SagaState.CREATED]), + ).update( + Set({ # type: ignore[no-untyped-call] + SagaDocument.state: SagaState.CANCELLED, + SagaDocument.error_message: error_message, + SagaDocument.completed_at: completed_at, + SagaDocument.updated_at: datetime.now(timezone.utc), + }), + response_type=UpdateResponse.NEW_DOCUMENT, + ) + if not doc: + existing = await SagaDocument.find_one(SagaDocument.saga_id == saga_id) + if not existing: + raise SagaNotFoundError(saga_id) + raise SagaInvalidStateError(saga_id, existing.state, "cancel") + return self._to_domain(doc) + async def get_or_create_saga(self, saga: Saga) -> tuple[Saga, bool]: """Atomically get or create a saga by (execution_id, saga_name). diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py index 1a796882..c27d23cd 100644 --- a/backend/app/services/saga/saga_orchestrator.py +++ b/backend/app/services/saga/saga_orchestrator.py @@ -23,8 +23,6 @@ SagaConcurrencyError, SagaConfig, SagaContextData, - SagaInvalidStateError, - SagaNotFoundError, ) from app.events.core import UnifiedProducer @@ -282,27 +280,23 @@ async def get_execution_sagas(self, execution_id: str) -> list[Saga]: async def cancel_saga(self, saga_id: str) -> None: """Cancel a running saga and trigger compensation. + Uses an atomic findOneAndUpdate to set state=CANCELLED, eliminating + revision-check races with the step-execution loop. + Raises SagaNotFoundError if saga doesn't exist. Raises SagaInvalidStateError if saga is not in a cancellable state. - Raises SagaConcurrencyError if saga was modified concurrently. """ - saga_instance = await self.get_saga_status(saga_id) - if not saga_instance: - raise SagaNotFoundError(saga_id) - - if saga_instance.state not in (SagaState.RUNNING, SagaState.CREATED): - raise SagaInvalidStateError(saga_id, saga_instance.state, "cancel") + saved = await self._repo.atomic_cancel_saga( + saga_id, + error_message="Saga cancelled by user request", + completed_at=datetime.now(UTC), + ) self.logger.info( "Saga cancellation initiated", saga_id=saga_id, - execution_id=saga_instance.execution_id, - user_id=saga_instance.context_data.user_id, - ) - - saved = await self._repo.save_saga( - saga_id, state=SagaState.CANCELLED, - error_message="Saga cancelled by user request", completed_at=datetime.now(UTC), + execution_id=saved.execution_id, + user_id=saved.context_data.user_id, ) if self._producer and self.config.store_events: diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py index 3b87ec96..6bac8e09 100644 --- a/backend/app/services/sse/sse_service.py +++ b/backend/app/services/sse/sse_service.py @@ -147,8 +147,8 @@ async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMess ) return _sse_event_adapter.validate_python({ - "event_type": msg.event_type, **msg.data, + "event_type": msg.event_type, "execution_id": execution_id, "result": result, }) From b7e3db75f8437f27e531b8b933a2310da0a608a8 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Wed, 18 Feb 2026 12:25:18 +0100 Subject: [PATCH 7/8] feat: zookeeper -> raft --- .github/actions/e2e-boot/action.yml | 2 +- .github/workflows/docker.yml | 4 +- .github/workflows/release-deploy.yml | 2 +- .github/workflows/stack-tests.yml | 19 +-- backend/zookeeper/Dockerfile.certgen | 12 -- backend/zookeeper/conf/log4j.properties | 21 ---- backend/zookeeper/conf/zoo.cfg | 27 ----- 
backend/zookeeper/generate-certs.sh | 77 ------------ backend/zookeeper/secrets/kafka_jaas.conf | 11 -- deploy.sh | 3 +- docker-compose.yaml | 135 ++++------------------ docs/architecture/overview.md | 2 +- docs/operations/cicd.md | 23 ++-- docs/operations/deployment.md | 8 +- 14 files changed, 42 insertions(+), 304 deletions(-) delete mode 100644 backend/zookeeper/Dockerfile.certgen delete mode 100644 backend/zookeeper/conf/log4j.properties delete mode 100644 backend/zookeeper/conf/zoo.cfg delete mode 100644 backend/zookeeper/generate-certs.sh delete mode 100644 backend/zookeeper/secrets/kafka_jaas.conf diff --git a/.github/actions/e2e-boot/action.yml b/.github/actions/e2e-boot/action.yml index 39d10df9..e6d49b1a 100644 --- a/.github/actions/e2e-boot/action.yml +++ b/.github/actions/e2e-boot/action.yml @@ -28,7 +28,7 @@ runs: IMAGE_TAG='"$IMAGE_TAG"' docker compose pull --quiet 2>&1 echo "--- pull done, starting infra ---" docker compose up -d --no-build \ - mongo redis shared-ca zookeeper-certgen zookeeper kafka 2>&1 + mongo redis shared-ca kafka 2>&1 echo $? > /tmp/infra-pull.exit ' > /tmp/infra-pull.log 2>&1 & echo $! > /tmp/infra-pull.pid diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 61b185e9..e40814e8 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -36,7 +36,6 @@ jobs: - backend - frontend - cert-generator - - zookeeper-certgen steps: - uses: actions/checkout@v6 @@ -110,7 +109,6 @@ jobs: crane copy "$REGISTRY/$PREFIX/backend:$TAG" "$REGISTRY/$PREFIX/backend:latest" crane copy "$REGISTRY/$PREFIX/frontend:$TAG" "$REGISTRY/$PREFIX/frontend:latest" crane copy "$REGISTRY/$PREFIX/cert-generator:$TAG" "$REGISTRY/$PREFIX/cert-generator:latest" - crane copy "$REGISTRY/$PREFIX/zookeeper-certgen:$TAG" "$REGISTRY/$PREFIX/zookeeper-certgen:latest" summary: name: Summary @@ -142,4 +140,4 @@ jobs: echo "| Frontend | \`docker pull $REGISTRY/$PREFIX/frontend:latest\` |" >> $GITHUB_STEP_SUMMARY echo "" >> $GITHUB_STEP_SUMMARY echo "### Security Scans" >> $GITHUB_STEP_SUMMARY - echo "All 5 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." >> $GITHUB_STEP_SUMMARY + echo "All 4 images scanned with Trivy (CRITICAL + HIGH, unfixed ignored)." 
>> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/release-deploy.yml b/.github/workflows/release-deploy.yml index 3bb636db..58596f5e 100644 --- a/.github/workflows/release-deploy.yml +++ b/.github/workflows/release-deploy.yml @@ -80,7 +80,7 @@ jobs: run: | PREFIX="${GITHUB_REPOSITORY_OWNER,,}/integr8scode" VERSION="${{ steps.calver.outputs.version }}" - for img in base backend frontend cert-generator zookeeper-certgen; do + for img in base backend frontend cert-generator; do crane copy "$REGISTRY/$PREFIX/$img:latest" "$REGISTRY/$PREFIX/$img:$VERSION" done diff --git a/.github/workflows/stack-tests.yml b/.github/workflows/stack-tests.yml index 9f695916..0ac76acd 100644 --- a/.github/workflows/stack-tests.yml +++ b/.github/workflows/stack-tests.yml @@ -29,7 +29,6 @@ env: MONGO_IMAGE: mongo:8.0 REDIS_IMAGE: redis:7-alpine KAFKA_IMAGE: confluentinc/cp-kafka:7.8.2 - ZOOKEEPER_IMAGE: confluentinc/cp-zookeeper:7.8.2 K3S_VERSION: v1.32.11+k3s1 K3S_INSTALL_SHA256: d75e014f2d2ab5d30a318efa5c326f3b0b7596f194afcff90fa7a7a91166d5f7 @@ -184,16 +183,6 @@ jobs: cache-from: type=gha,scope=cert-generator cache-to: type=gha,mode=max,scope=cert-generator - - name: Build zookeeper-certgen image - uses: docker/build-push-action@v6 - with: - context: ./backend/zookeeper - file: ./backend/zookeeper/Dockerfile.certgen - load: true - tags: integr8scode-zookeeper-certgen:latest - cache-from: type=gha,scope=zookeeper-certgen - cache-to: type=gha,mode=max,scope=zookeeper-certgen - # ── Frontend (nginx + SSL) ───────────────────────────────────── - name: Build frontend image uses: docker/build-push-action@v6 @@ -216,13 +205,11 @@ jobs: docker tag integr8scode-base:latest "$IMG/base:$TAG" docker tag integr8scode-backend:latest "$IMG/backend:$TAG" docker tag integr8scode-cert-generator:latest "$IMG/cert-generator:$TAG" - docker tag integr8scode-zookeeper-certgen:latest "$IMG/zookeeper-certgen:$TAG" docker tag integr8scode-frontend:latest "$IMG/frontend:$TAG" - # Push all 5 images in parallel, tracking each PID + # Push all 4 images in parallel, tracking each PID declare -A PIDS - for name in base backend cert-generator zookeeper-certgen \ - frontend; do + for name in base backend cert-generator frontend; do docker push "$IMG/$name:$TAG" & PIDS[$name]=$! 
done @@ -286,7 +273,7 @@ jobs: run: | mkdir -p logs docker compose logs --timestamps > logs/docker-compose.log 2>&1 - for svc in backend mongo redis kafka zookeeper \ + for svc in backend mongo redis kafka \ coordinator k8s-worker pod-monitor result-processor \ saga-orchestrator event-replay dlq-processor; do docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true diff --git a/backend/zookeeper/Dockerfile.certgen b/backend/zookeeper/Dockerfile.certgen deleted file mode 100644 index cb0c7756..00000000 --- a/backend/zookeeper/Dockerfile.certgen +++ /dev/null @@ -1,12 +0,0 @@ -FROM alpine:3.18 - -RUN apk add --no-cache openssl openjdk11-jre-headless - -# Create certificate directory -RUN mkdir -p /certs - -# Copy certificate generation script -COPY generate-certs.sh /generate-certs.sh -RUN chmod +x /generate-certs.sh - -CMD ["/generate-certs.sh"] \ No newline at end of file diff --git a/backend/zookeeper/conf/log4j.properties b/backend/zookeeper/conf/log4j.properties deleted file mode 100644 index 2b37b2a7..00000000 --- a/backend/zookeeper/conf/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Zookeeper logging configuration -log4j.rootLogger=WARN, CONSOLE, ROLLINGFILE - -# Console appender -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=WARN -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n - -# Rolling file appender -log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender -log4j.appender.ROLLINGFILE.Threshold=INFO -log4j.appender.ROLLINGFILE.File=/var/log/zookeeper/zookeeper.log -log4j.appender.ROLLINGFILE.MaxFileSize=10MB -log4j.appender.ROLLINGFILE.MaxBackupIndex=10 -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n - -# Set specific loggers -log4j.logger.org.apache.zookeeper.server.auth=INFO -log4j.logger.org.apache.zookeeper.audit=INFO \ No newline at end of file diff --git a/backend/zookeeper/conf/zoo.cfg b/backend/zookeeper/conf/zoo.cfg deleted file mode 100644 index 32da9a10..00000000 --- a/backend/zookeeper/conf/zoo.cfg +++ /dev/null @@ -1,27 +0,0 @@ -# Zookeeper configuration file -tickTime=2000 -dataDir=/var/lib/zookeeper/data -dataLogDir=/var/lib/zookeeper/log -secureClientPort=2281 -serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider -ssl.keyStore.location=/etc/kafka/certs/zookeeper.keystore.jks -ssl.keyStore.password=zookeeper_keystore_password -ssl.trustStore.location=/etc/kafka/certs/zookeeper.truststore.jks -ssl.trustStore.password=zookeeper_truststore_password -ssl.clientAuth=need - -# Autopurge settings -autopurge.snapRetainCount=3 -autopurge.purgeInterval=24 - -# Admin server settings -admin.enableServer=false -admin.serverPort=0 - -# 4lw commands -4lw.commands.whitelist=* - -# Metrics -metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider -metricsProvider.httpPort=7070 diff --git a/backend/zookeeper/generate-certs.sh b/backend/zookeeper/generate-certs.sh deleted file mode 100644 index 55f3f770..00000000 --- a/backend/zookeeper/generate-certs.sh +++ /dev/null @@ -1,77 +0,0 @@ -#!/bin/sh -set -e - -# Check if certificates already exist -if [ -f /certs/zookeeper.keystore.jks ] && [ -f /certs/kafka-client.keystore.jks ]; then - echo 
"Certificates already exist, skipping generation" - exit 0 -fi - -# 1. Generate CA key and self-signed CA certificate -openssl genrsa -out /certs/ca.key 2048 -openssl req -new -x509 -days 365 -key /certs/ca.key -out /certs/ca.crt \ - -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=Integr8sCode-CA" - -# 2. Generate Zookeeper server key and CA-signed certificate -openssl genrsa -out /certs/zookeeper.key 2048 -openssl req -new -key /certs/zookeeper.key -out /certs/zookeeper.csr \ - -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=zookeeper" -openssl x509 -req -days 365 -in /certs/zookeeper.csr \ - -CA /certs/ca.crt -CAkey /certs/ca.key -CAcreateserial \ - -out /certs/zookeeper.crt - -# 3. Generate Kafka client key and CA-signed certificate -openssl genrsa -out /certs/kafka-client.key 2048 -openssl req -new -key /certs/kafka-client.key -out /certs/kafka-client.csr \ - -subj "/C=US/ST=CA/L=SF/O=Integr8sCode/CN=kafka" -openssl x509 -req -days 365 -in /certs/kafka-client.csr \ - -CA /certs/ca.crt -CAkey /certs/ca.key -CAcreateserial \ - -out /certs/kafka-client.crt - -# 4. Build Zookeeper server keystore (JKS) -rm -f /certs/zookeeper.keystore.jks -openssl pkcs12 -export -in /certs/zookeeper.crt -inkey /certs/zookeeper.key \ - -out /certs/zookeeper.p12 -name zookeeper \ - -password pass:zookeeper_keystore_password -keytool -importkeystore \ - -deststorepass zookeeper_keystore_password \ - -destkeypass zookeeper_keystore_password \ - -destkeystore /certs/zookeeper.keystore.jks \ - -srckeystore /certs/zookeeper.p12 \ - -srcstoretype PKCS12 \ - -srcstorepass zookeeper_keystore_password \ - -alias zookeeper \ - -noprompt - -# 5. Build Zookeeper truststore (CA cert — trusts any client signed by the CA) -rm -f /certs/zookeeper.truststore.jks -keytool -keystore /certs/zookeeper.truststore.jks -alias ca \ - -import -file /certs/ca.crt \ - -storepass zookeeper_truststore_password -noprompt - -# 6. Build Kafka client keystore (JKS) -rm -f /certs/kafka-client.keystore.jks -openssl pkcs12 -export -in /certs/kafka-client.crt -inkey /certs/kafka-client.key \ - -out /certs/kafka-client.p12 -name kafka \ - -password pass:kafka_keystore_password -keytool -importkeystore \ - -deststorepass kafka_keystore_password \ - -destkeypass kafka_keystore_password \ - -destkeystore /certs/kafka-client.keystore.jks \ - -srckeystore /certs/kafka-client.p12 \ - -srcstoretype PKCS12 \ - -srcstorepass kafka_keystore_password \ - -alias kafka \ - -noprompt - -# 7. 
Build Kafka client truststore (CA cert — trusts Zookeeper's server cert) -rm -f /certs/kafka-client.truststore.jks -keytool -keystore /certs/kafka-client.truststore.jks -alias ca \ - -import -file /certs/ca.crt \ - -storepass kafka_truststore_password -noprompt - -# Set permissions -chmod 644 /certs/* - -echo "Zookeeper and Kafka certificates generated successfully" -ls -la /certs/ diff --git a/backend/zookeeper/secrets/kafka_jaas.conf b/backend/zookeeper/secrets/kafka_jaas.conf deleted file mode 100644 index 9bea890e..00000000 --- a/backend/zookeeper/secrets/kafka_jaas.conf +++ /dev/null @@ -1,11 +0,0 @@ -Server { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_super="admin-secret" - user_kafka="kafka-secret"; -}; - -Client { - org.apache.zookeeper.server.auth.DigestLoginModule required - username="kafka" - password="kafka-secret"; -}; diff --git a/deploy.sh b/deploy.sh index 3de68cfe..b2b1d904 100755 --- a/deploy.sh +++ b/deploy.sh @@ -178,8 +178,7 @@ cmd_infra() { fi # Start only infrastructure services (no app, no workers, no observability) - # zookeeper-certgen is needed for kafka to start - docker compose up -d zookeeper-certgen mongo redis zookeeper kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG + docker compose up -d mongo redis kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG print_success "Infrastructure services started" docker compose ps diff --git a/docker-compose.yaml b/docker-compose.yaml index 27590ac8..309eaeac 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -188,125 +188,37 @@ services: - GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin} - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin123} - # Kafka Infrastructure for Event-Driven Design - # Certificate generator for Zookeeper/Kafka mTLS - zookeeper-certgen: - image: ghcr.io/hardmax71/integr8scode/zookeeper-certgen:${IMAGE_TAG:-latest} - build: - context: ./backend/zookeeper - dockerfile: Dockerfile.certgen - container_name: zookeeper-certgen - volumes: - - zookeeper_certs:/certs - networks: - - app-network - restart: "no" - - zookeeper: - image: confluentinc/cp-zookeeper:7.8.2 - container_name: zookeeper - depends_on: - zookeeper-certgen: - condition: service_completed_successfully - environment: - # TLS-only: no plaintext client port - ZOOKEEPER_SECURE_CLIENT_PORT: 2281 - ZOOKEEPER_TICK_TIME: 2000 - - # Production settings - ZOOKEEPER_MAX_CLIENT_CNXNS: 60 - ZOOKEEPER_INIT_LIMIT: 10 - ZOOKEEPER_SYNC_LIMIT: 5 - - # Security settings (SASL + mTLS) - ZOOKEEPER_AUTH_PROVIDER_1: org.apache.zookeeper.server.auth.DigestAuthenticationProvider - ZOOKEEPER_SERVER_CNXN_FACTORY: org.apache.zookeeper.server.NettyServerCnxnFactory - KAFKA_OPTS: '-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -Dzookeeper.4lw.commands.whitelist=* -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf' - - # TLS settings (keystore/truststore produced by certgen) - ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/certs/zookeeper.keystore.jks - ZOOKEEPER_SSL_KEYSTORE_PASSWORD: zookeeper_keystore_password - ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/zookeeper.truststore.jks - ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: zookeeper_truststore_password - ZOOKEEPER_SSL_CLIENT_AUTH: need - - # 4lw commands whitelist - ZOOKEEPER_4LW_COMMANDS_WHITELIST: '*' - - # Autopurge settings - ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3 - ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24 - - # JVM settings - KAFKA_HEAP_OPTS: '-Xms128m -Xmx256m' - KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:+ParallelRefProcEnabled -Djute.maxbuffer=4194304' - - # Logging — suppress TLS handshake noise from internet scanners - ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: 'WARN' - ZOOKEEPER_LOG4J_LOGGERS: 'org.apache.zookeeper.server.auth=INFO,org.apache.zookeeper.audit=INFO,org.apache.zookeeper.server.NettyServerCnxnFactory=ERROR' - - # Disable admin server for security - ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' - ZOOKEEPER_ADMIN_SERVER_PORT: 0 - - volumes: - - ./backend/zookeeper/secrets:/etc/kafka/secrets:ro - - zookeeper_certs:/etc/kafka/certs:ro - - zookeeper_data:/var/lib/zookeeper/data - - zookeeper_log:/var/lib/zookeeper/log - - zookeeper_logs:/var/log/zookeeper - ports: - - "127.0.0.1:2281:2281" - networks: - - app-network - ulimits: - nofile: - soft: 65536 - hard: 65536 - mem_limit: 512m - restart: unless-stopped - healthcheck: - test: ["CMD-SHELL", "nc -z localhost 2281"] - interval: 3s - timeout: 5s - retries: 15 - start_period: 5s - + # Kafka (KRaft mode — no ZooKeeper) kafka: image: confluentinc/cp-kafka:7.8.2 container_name: kafka - depends_on: - zookeeper-certgen: - condition: service_completed_successfully - zookeeper: - condition: service_healthy ports: - "127.0.0.1:9092:9092" - "127.0.0.1:29092:29092" environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2281 - KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: 'true' - KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty - KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/certs/kafka-client.keystore.jks - KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: kafka_keystore_password - KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/kafka-client.truststore.jks - KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: kafka_truststore_password - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + # KRaft identity (combined broker + controller) + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: 'broker,controller' + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + + # Listeners + KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + + # Single-node replication KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' # Needed for initial setup + + # Topic management + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_LOG_RETENTION_HOURS: 168 - # Security settings for Zookeeper connection (SASL/Digest over TLS) - KAFKA_OPTS: '-Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf' - KAFKA_ZOOKEEPER_SET_ACL: 'false' - # KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' - # KAFKA_SUPER_USERS: 'User:admin;User:kafka' - # Production settings KAFKA_COMPRESSION_TYPE: 'gzip' KAFKA_NUM_NETWORK_THREADS: 8 @@ -314,19 +226,16 @@ services: KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 - + # Log settings KAFKA_LOG_SEGMENT_BYTES: 1073741824 KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000 KAFKA_LOG_CLEANUP_POLICY: 'delete' - + # JVM settings KAFKA_HEAP_OPTS: '-Xms256m -Xmx1g' KAFKA_JVM_PERFORMANCE_OPTS: '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true' - volumes: - - ./backend/zookeeper/secrets/kafka_jaas.conf:/etc/kafka/secrets/kafka_jaas.conf:ro - - zookeeper_certs:/etc/kafka/certs:ro - kafka_data:/var/lib/kafka/data - kafka_logs:/var/log/kafka networks: @@ -641,10 +550,6 @@ volumes: shared_ca: kafka_data: kafka_logs: - zookeeper_data: - zookeeper_log: - zookeeper_logs: - zookeeper_certs: # --8<-- [end:dev_volumes] networks: diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index 297a79e0..a29d2a4f 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -31,7 +31,7 @@ Integr8sCode lets users submit Python scripts through a Svelte SPA. The FastAPI ![System diagram](/assets/images/system_diagram.png) The SPA hits the frontend, which proxies to the API over HTTPS; the API -serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) with Zookeeper backing it; kafka- +serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) using KRaft for metadata consensus; kafka- init seeds topics. All workers are separate containers subscribed to Kafka; the k8s-worker talks to the Kubernetes API to run code, the pod-monitor watches pods, the result-processor writes results to Mongo and nudges Redis for SSE fanout, and the saga-orchestrator coordinates long flows with Mongo and Redis. diff --git a/docs/operations/cicd.md b/docs/operations/cicd.md index b22f10b9..cd715d7e 100644 --- a/docs/operations/cicd.md +++ b/docs/operations/cicd.md @@ -33,7 +33,7 @@ graph LR end subgraph "Docker Scan & Promote" - Scan["Trivy Scan (5 images)"] + Scan["Trivy Scan (4 images)"] Promote["Promote SHA → latest"] Scan --> Promote end @@ -92,7 +92,7 @@ no setup to overlap. ## Stack Tests (the main workflow) -This is the core testing workflow. It builds all 5 container images, pushes them to GHCR with immutable SHA-based +This is the core testing workflow. It builds all 4 container images, pushes them to GHCR with immutable SHA-based tags, then runs E2E tests on separate runners that pull images from the registry. ```mermaid @@ -131,29 +131,28 @@ the image build is skipped entirely. ### Phase 2: Build and push -All 5 images are built on a single runner and pushed to GHCR with an immutable `sha-<7chars>` tag: +All 4 images are built on a single runner and pushed to GHCR with an immutable `sha-<7chars>` tag: | Image | Source | |----------------------|---------------------------------------------| | `base` | `backend/Dockerfile.base` | | `backend` | `backend/Dockerfile` | | `cert-generator` | `cert-generator/Dockerfile` | -| `zookeeper-certgen` | `backend/zookeeper/Dockerfile.certgen` | | `frontend` | `frontend/Dockerfile` | Workers reuse the `backend` image with different `command:` overrides in docker-compose, so no separate worker images -are needed. All 5 images are scanned by Trivy and promoted to `latest` in the +are needed. All 4 images are scanned by Trivy and promoted to `latest` in the [Docker Scan & Promote](#docker-scan-promote) workflow. The base image is cached separately as a zstd-compressed tarball since its dependencies rarely change. 
The backend image depends on it via `--build-context base=docker-image://integr8scode-base:latest`. Utility and frontend images use GHA layer caching. -All 5 images are pushed to GHCR in parallel, with each push tracked by PID so individual failures are reported: +All 4 images are pushed to GHCR in parallel, with each push tracked by PID so individual failures are reported: ```yaml declare -A PIDS -for name in base backend cert-generator zookeeper-certgen ...; do +for name in base backend cert-generator frontend; do docker push "$IMG/$name:$TAG" & PIDS[$name]=$! done @@ -180,7 +179,7 @@ This action kicks off three slow tasks that can overlap: 1. **GHCR login** using `docker/login-action@v3` 2. **Background image pull + infra pre-warm** — pulls all compose images then starts infrastructure services - (mongo, redis, kafka, zookeeper) in a background `nohup` process. The exit status is persisted + (mongo, redis, kafka) in a background `nohup` process. The exit status is persisted to `/tmp/infra-pull.exit` so the next action can check for failures. 3. **k3s install** — downloads and installs a pinned k3s version with SHA256 checksum verification (see [supply-chain hardening](#supply-chain-hardening) below) @@ -238,7 +237,7 @@ sets it, and only after all tests pass. ```mermaid graph LR ST["Stack Tests
(main, success)"] -->|workflow_run trigger| Scan - Scan["Trivy Scan
(5 images in parallel)"] --> Promote["crane copy
sha-xxx → latest"] + Scan["Trivy Scan
(4 images in parallel)"] --> Promote["crane copy
sha-xxx → latest"] Promote --> Summary["Step Summary"] ``` @@ -249,7 +248,7 @@ Runs automatically when `Stack Tests` completes successfully on `main`. Can also ### Scan -Uses [Trivy](https://trivy.dev/) (pinned at `v0.68.2`) to scan all 5 deployed images in parallel via matrix strategy. +Uses [Trivy](https://trivy.dev/) (pinned at `v0.68.2`) to scan all 4 deployed images in parallel via matrix strategy. Scans for `CRITICAL` and `HIGH` severity vulnerabilities with unfixed issues ignored. Results are uploaded as SARIF files to GitHub's Security tab. @@ -285,7 +284,7 @@ Releases use [Calendar Versioning](https://calver.org/) with the format `YYYY.M. - `PATCH` — auto-incrementing counter within the month, starting at `0` Examples: `2026.2.0`, `2026.2.1`, `2026.3.0`. The workflow counts existing tags matching the current `YYYY.M.*` pattern -and increments the patch number. All 5 deployed GHCR images are tagged with the CalVer version using crane (same +and increments the patch number. All 4 deployed GHCR images are tagged with the CalVer version using crane (same registry-level manifest copy as the promote step). ### GitHub Release @@ -438,7 +437,7 @@ Playwright browsers are cached by `package-lock.json` hash. On cache hit, only s ### Parallel image push -All 5 images are pushed to GHCR concurrently using background processes with PID tracking. Each push failure is +All 4 images are pushed to GHCR concurrently using background processes with PID tracking. Each push failure is reported individually via `::error::` annotations. ## Running locally diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md index cf1af52e..2d42a0a9 100644 --- a/docs/operations/deployment.md +++ b/docs/operations/deployment.md @@ -41,7 +41,7 @@ with health checks and dependency ordering, so containers start in the correct s ./deploy.sh dev ``` -This brings up MongoDB, Redis, Kafka with Zookeeper, all seven workers, the backend API, and the +This brings up MongoDB, Redis, Kafka (KRaft mode), all seven workers, the backend API, and the frontend. Two initialization containers run automatically: `kafka-init` creates required Kafka topics, and `user-seed` populates the database with default user accounts. @@ -129,7 +129,6 @@ services define healthchecks in `docker-compose.yaml`: | Redis | `redis-cli ping` | | Backend | `curl /api/v1/health/live` | | Kafka | `kafka-broker-api-versions` | -| Zookeeper | `nc -z localhost 2281` (TLS-only, no plaintext client port) | Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their container is running. The test suite doesn't require worker containers since tests instantiate worker classes directly. 
@@ -142,8 +141,7 @@ Every long-running service has a `mem_limit` in `docker-compose.yaml` to prevent |---------|-------------|--------------|-------| | MongoDB | 1024 m | wiredTiger 0.4 GB | `--wiredTigerCacheSizeGB 0.4` prevents default 50 %-of-RAM behavior | | Redis | 300 m | 256 MB maxmemory | LRU eviction, persistence disabled | -| Kafka | 1280 m | JVM `-Xms256m -Xmx1g` | Single-broker, low throughput workload | -| Zookeeper | 512 m | JVM `-Xms128m -Xmx256m` | Metadata-only role | +| Kafka | 1280 m | JVM `-Xms256m -Xmx1g` | Single-broker KRaft mode, low throughput workload | | Backend API | 768 m | 2 gunicorn workers | Controlled by `WEB_CONCURRENCY` env var | | Frontend | 128 m | nginx serving static assets | | | Each worker (×7) | 160 m | Single-process Python | coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor | @@ -153,7 +151,7 @@ Every long-running service has a `mem_limit` in `docker-compose.yaml` to prevent | OTel Collector | 192 m | `limit_mib: 150` in memory_limiter processor | Observability profile | | Kafka Exporter | 96 m | | Observability profile | -All long-running services — core infrastructure (MongoDB, Redis, Kafka, Zookeeper, backend, frontend), all seven workers (coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor), and observability components (Grafana, kafka-exporter, victoria-metrics, otel-collector) — have `restart: unless-stopped` so they recover automatically after an OOM kill or crash. +All long-running services — core infrastructure (MongoDB, Redis, Kafka, backend, frontend), all seven workers (coordinator, k8s-worker, pod-monitor, result-processor, saga-orchestrator, event-replay, dlq-processor), and observability components (Grafana, kafka-exporter, victoria-metrics, otel-collector) — have `restart: unless-stopped` so they recover automatically after an OOM kill or crash. ## Monitoring From cb84794444062cff3f8fc947594a69a8ff574271 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Wed, 18 Feb 2026 12:32:47 +0100 Subject: [PATCH 8/8] fix: saga repo revision update --- backend/app/db/repositories/saga_repository.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py index b33951ec..c998288d 100644 --- a/backend/app/db/repositories/saga_repository.py +++ b/backend/app/db/repositories/saga_repository.py @@ -1,6 +1,7 @@ import dataclasses from datetime import datetime, timezone from typing import Any +from uuid import uuid4 from beanie.exceptions import RevisionIdWasChanged from beanie.odm.enums import SortDirection @@ -78,8 +79,8 @@ async def atomic_cancel_saga( ) -> Saga: """Atomically cancel a saga using findOneAndUpdate. - Bypasses Beanie's revision check to eliminate TOCTOU races with - concurrent step saves from the orchestrator. + Bumps revision_id so any concurrent save_changes() (which filters on + the old revision_id) will see a mismatch and raise RevisionIdWasChanged. """ doc = await SagaDocument.find_one( SagaDocument.saga_id == saga_id, @@ -90,6 +91,7 @@ async def atomic_cancel_saga( SagaDocument.error_message: error_message, SagaDocument.completed_at: completed_at, SagaDocument.updated_at: datetime.now(timezone.utc), + "revision_id": uuid4(), }), response_type=UpdateResponse.NEW_DOCUMENT, )
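A closing note on why the revision bump in this last patch matters: Beanie's `save_changes()` issues an update filtered on the `revision_id` it loaded, so rotating it inside the atomic cancel turns a would-be lost update into an explicit conflict. A condensed sketch of the interleaving (illustrative, not part of the patch; it assumes revision tracking is enabled on `SagaDocument`, which the `RevisionIdWasChanged` handling in `save_saga` implies):

```python
from uuid import uuid4

from beanie.exceptions import RevisionIdWasChanged
from beanie.operators import In, Set

from app.db.docs import SagaDocument
from app.domain.enums import SagaState


async def lost_update_demo(saga_id: str) -> None:
    # Writer A: the orchestrator loads the saga, then stalls mid-step.
    doc = await SagaDocument.find_one(SagaDocument.saga_id == saga_id)

    # Writer B: atomic_cancel_saga flips the state and rotates revision_id
    # in one findOneAndUpdate, with no read-then-write window.
    await SagaDocument.find_one(
        SagaDocument.saga_id == saga_id,
        In(SagaDocument.state, [SagaState.RUNNING, SagaState.CREATED]),
    ).update(Set({SagaDocument.state: SagaState.CANCELLED, "revision_id": uuid4()}))

    # Writer A resumes: save_changes() filters on the stale revision_id,
    # matches nothing, and raises instead of overwriting the cancellation.
    doc.error_message = "stale step result"
    try:
        await doc.save_changes()
    except RevisionIdWasChanged:
        pass  # save_saga maps this to SagaConcurrencyError
```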