diff --git a/.github/workflows/harness-image.yml b/.github/workflows/harness-image.yml new file mode 100644 index 00000000..6ed06bca --- /dev/null +++ b/.github/workflows/harness-image.yml @@ -0,0 +1,61 @@ +name: Harness Worker Image + +on: + push: + branches: [main, certification-worker] + paths: + - "Dockerfile" + - "harness/**" + - "src/**" + - "pyproject.toml" + - "poetry.lock" + - ".github/workflows/harness-image.yml" + release: + types: [published] + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Docker metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/conductor-oss/python-sdk/harness-worker + tags: | + type=raw,value=latest + type=raw,value=${{ github.event.release.tag_name }},enable=${{ github.event_name == 'release' }} + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . 
+ file: ./Dockerfile + target: harness + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ steps.meta.outputs.tags }} diff --git a/Dockerfile b/Dockerfile index ca535ea6..2d863b92 100644 --- a/Dockerfile +++ b/Dockerfile @@ -41,6 +41,19 @@ ENV CONDUCTOR_AUTH_SECRET ${CONDUCTOR_AUTH_SECRET} ENV CONDUCTOR_SERVER_URL ${CONDUCTOR_SERVER_URL} RUN python3 ./tests/integration/main.py +FROM python_test_base AS harness-build +COPY /harness /package/harness + +FROM python:3.12-alpine AS harness +RUN adduser -D -u 65532 nonroot +WORKDIR /app +COPY --from=harness-build /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=harness-build /package/src /app/src +COPY --from=harness-build /package/harness /app/harness +ENV PYTHONPATH=/app/src +USER nonroot +ENTRYPOINT ["python", "-u", "harness/main.py"] + FROM python:3.12-alpine AS publish RUN apk add --no-cache tk curl WORKDIR /package diff --git a/harness/README.md b/harness/README.md new file mode 100644 index 00000000..9cab07da --- /dev/null +++ b/harness/README.md @@ -0,0 +1,81 @@ +# Python SDK Docker Harness + +Two Docker targets built from the root `Dockerfile`: an **SDK build** and a **long-running worker harness**. + +## Worker Harness + +A self-feeding worker that runs indefinitely. On startup it registers five simulated tasks (`python_worker_0` through `python_worker_4`) and the `python_simulated_tasks_workflow`, then runs two background services: + +- **WorkflowGovernor** -- starts a configurable number of `python_simulated_tasks_workflow` instances per second (default 2), indefinitely. +- **SimulatedTaskWorkers** -- five task handlers, each with a codename and a default sleep duration. Each worker supports configurable delay types, failure simulation, and output generation via task input parameters. The workflow chains them in sequence: quickpulse (1s) → whisperlink (2s) → shadowfetch (3s) → ironforge (4s) → deepcrawl (5s). 
+ +### Building Locally + +```bash +docker build --target harness -t python-sdk-harness . +``` + +### Multiplatform Build and Push + +To build for both `linux/amd64` and `linux/arm64` and push to GHCR: + +```bash +# One-time: create a buildx builder if you don't have one +docker buildx create --name multiarch --use --bootstrap + +# Build and push +docker buildx build \ + --platform linux/amd64,linux/arm64 \ + --target harness \ + -t ghcr.io/conductor-oss/python-sdk/harness-worker:latest \ + --push . +``` + +> **Note:** Multi-platform builds require `docker buildx` and a builder that supports cross-compilation. On macOS this works out of the box with Docker Desktop. On Linux you may need to install QEMU user-space emulators: +> +> ```bash +> docker run --privileged --rm tonistiigi/binfmt --install all +> ``` + +### Running + +```bash +docker run -d \ + -e CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api \ + -e CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY \ + -e CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET \ + -e HARNESS_WORKFLOWS_PER_SEC=4 \ + python-sdk-harness +``` + +You can also run the harness locally without Docker (from the repo root): + +```bash +# Install the SDK in development mode (one-time) +pip3 install -e . + +export CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api +export CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY +export CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET + +python3 harness/main.py +``` + +Override defaults with environment variables as needed: + +```bash +HARNESS_WORKFLOWS_PER_SEC=4 HARNESS_BATCH_SIZE=10 python3 harness/main.py +``` + +All resource names use a `python_` prefix so multiple SDK harnesses (C#, JS, Go, Java, etc.) can coexist on the same cluster. 
"""Entry point for the Python SDK worker harness (harness/main.py).

On startup the harness registers the simulated task definitions and the
workflow that chains them, then runs the simulated task workers together
with a WorkflowGovernor that keeps starting new workflow instances until
the process receives SIGINT or SIGTERM.

Environment variables (as documented in harness/README.md):

| Variable | Required | Default | Description |
|---|---|---|---|
| `CONDUCTOR_SERVER_URL` | yes | -- | Conductor API base URL |
| `CONDUCTOR_AUTH_KEY` | no | -- | Orkes auth key |
| `CONDUCTOR_AUTH_SECRET` | no | -- | Orkes auth secret |
| `HARNESS_WORKFLOWS_PER_SEC` | no | 2 | Workflows to start per second |
| `HARNESS_BATCH_SIZE` | no | 20 | Number of tasks each worker polls per batch |
| `HARNESS_POLL_INTERVAL_MS` | no | 100 | Milliseconds between poll cycles |

Only the three ``HARNESS_*`` variables are read in this module; the
``CONDUCTOR_*`` ones are presumably consumed by the SDK's ``Configuration``
(not visible here).
"""
from __future__ import annotations

import os
import signal
import sys

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models.task_def import TaskDef
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.simple_task import SimpleTask

from simulated_task_worker import SimulatedTaskWorker
from workflow_governor import WorkflowGovernor

WORKFLOW_NAME = "python_simulated_tasks_workflow"

# One entry per simulated worker:
# (task definition name, codename used as the task reference name,
#  default delay in seconds).
SIMULATED_WORKERS = [
    ("python_worker_0", "quickpulse", 1),
    ("python_worker_1", "whisperlink", 2),
    ("python_worker_2", "shadowfetch", 3),
    ("python_worker_3", "ironforge", 4),
    ("python_worker_4", "deepcrawl", 5),
]


def env_int_or_default(key: str, default: int) -> int:
    """Return environment variable *key* parsed as an int.

    Args:
        key: Name of the environment variable.
        default: Value returned when the variable is unset, empty, or not
            a valid integer.

    Returns:
        The parsed integer, or *default*.
    """
    raw = os.environ.get(key, "")
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        # Fall back as before, but say so: a typo in a tuning variable
        # should not silently run the harness with different settings.
        print(f"Ignoring invalid integer value {raw!r} for {key}; using default {default}")
        return default


def register_metadata(clients: OrkesClients) -> None:
    """Register the simulated task definitions and the harness workflow.

    One task definition is registered per entry in ``SIMULATED_WORKERS``;
    the workflow adds one simple task per entry, in list order, and is
    registered with ``overwrite=True``.

    Args:
        clients: Client factory providing the metadata client and the
            workflow executor.
    """
    metadata_client = clients.get_metadata_client()
    workflow_executor = clients.get_workflow_executor()

    for task_name, codename, sleep_seconds in SIMULATED_WORKERS:
        task_def = TaskDef(
            name=task_name,
            description=f"Python SDK harness simulated task ({codename}, default delay {sleep_seconds}s)",
            retry_count=1,
            timeout_seconds=300,
            response_timeout_seconds=300,
        )
        metadata_client.register_task_def(task_def)

    wf = ConductorWorkflow(
        executor=workflow_executor,
        name=WORKFLOW_NAME,
        version=1,
        description="Python SDK harness simulated task workflow",
    )
    wf.owner_email("python-sdk-harness@conductor.io")

    for task_name, codename, _ in SIMULATED_WORKERS:
        wf.add(SimpleTask(task_def_name=task_name, task_reference_name=codename))

    wf.register(overwrite=True)
    print(f"Registered workflow {WORKFLOW_NAME} with {len(SIMULATED_WORKERS)} tasks")


def main() -> None:
    """Register metadata, start the workers and the governor, and block.

    Returns only when ``task_handler.join_processes()`` returns; SIGINT and
    SIGTERM stop the governor and the worker processes and exit with
    status 0.
    """
    configuration = Configuration()
    clients = OrkesClients(configuration)

    register_metadata(clients)

    workflows_per_sec = env_int_or_default("HARNESS_WORKFLOWS_PER_SEC", 2)
    batch_size = env_int_or_default("HARNESS_BATCH_SIZE", 20)
    poll_interval_ms = env_int_or_default("HARNESS_POLL_INTERVAL_MS", 100)

    workers = [
        SimulatedTaskWorker(task_name, codename, sleep_seconds, batch_size, poll_interval_ms)
        for task_name, codename, sleep_seconds in SIMULATED_WORKERS
    ]

    task_handler = TaskHandler(
        workers=workers,
        configuration=configuration,
        scan_for_annotated_workers=False,
    )

    workflow_executor = clients.get_workflow_executor()
    governor = WorkflowGovernor(workflow_executor, WORKFLOW_NAME, workflows_per_sec)

    main_pid = os.getpid()
    shutting_down = False

    def shutdown(signum, frame):
        nonlocal shutting_down
        # The handler is installed before the worker processes are created,
        # so forked children inherit it; only the original process may run
        # the shutdown sequence, and only once.
        if os.getpid() != main_pid or shutting_down:
            return
        shutting_down = True
        print("Shutting down...")
        governor.stop()
        task_handler.stop_processes()
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    # Start the worker processes BEFORE the governor thread. TaskHandler
    # runs workers via multiprocessing; when the start method is fork,
    # forking after a thread is already running copies a multi-threaded
    # process (DeprecationWarning on Python 3.12+, and a lock held by the
    # governor thread at fork time could deadlock a child).
    task_handler.start_processes()
    governor.start()
    task_handler.join_processes()


if __name__ == "__main__":
    main()
b/harness/manifests/README.md new file mode 100644 index 00000000..4e0780af --- /dev/null +++ b/harness/manifests/README.md @@ -0,0 +1,132 @@ +# Kubernetes Manifests + +This directory contains Kubernetes manifests for deploying the Python SDK harness worker to the certification clusters. + +## Prerequisites + +**Set your namespace environment variable:** +```bash +export NS=your-namespace-here +``` + +All kubectl commands below use `-n $NS` to specify the namespace. The manifests intentionally do not include hardcoded namespaces. + +**Note:** The harness worker images are published as public packages on GHCR and do not require authentication to pull. No image pull secrets are needed. + +## Files + +| File | Description | +|---|---| +| `deployment.yaml` | Deployment (single file, works on all clusters) | +| `configmap-aws.yaml` | Conductor URL + auth key for certification-aws | +| `configmap-azure.yaml` | Conductor URL + auth key for certification-az | +| `configmap-gcp.yaml` | Conductor URL + auth key for certification-gcp | +| `secret-conductor.yaml` | Conductor auth secret (placeholder template) | + +## Quick Start + +### 1. Create the Conductor Auth Secret + +The `CONDUCTOR_AUTH_SECRET` must be created as a Kubernetes secret before deploying. + +```bash +kubectl create secret generic conductor-credentials \ + --from-literal=auth-secret=YOUR_AUTH_SECRET \ + -n $NS +``` + +If the `conductor-credentials` secret already exists in the namespace (e.g. from the e2e-testrunner-worker), it can be reused as-is. + +See `secret-conductor.yaml` for more details. + +### 2. Apply the ConfigMap for Your Cluster + +```bash +# AWS +kubectl apply -f manifests/configmap-aws.yaml -n $NS + +# Azure +kubectl apply -f manifests/configmap-azure.yaml -n $NS + +# GCP +kubectl apply -f manifests/configmap-gcp.yaml -n $NS +``` + +### 3. Deploy + +```bash +kubectl apply -f manifests/deployment.yaml -n $NS +``` + +### 4. 
Verify + +```bash +# Check pod status +kubectl get pods -n $NS -l app=python-sdk-harness-worker + +# Watch logs +kubectl logs -n $NS -l app=python-sdk-harness-worker -f +``` + +## Building and Pushing the Image + +From the repository root: + +```bash +# Build the harness target and push to GHCR +docker buildx build \ + --platform linux/amd64,linux/arm64 \ + --target harness \ + -t ghcr.io/conductor-oss/python-sdk/harness-worker:latest \ + --push . +``` + +After pushing a new image with the same tag, restart the deployment to pull it: + +```bash +kubectl rollout restart deployment/python-sdk-harness-worker -n $NS +kubectl rollout status deployment/python-sdk-harness-worker -n $NS +``` + +## Tuning + +The harness worker accepts these optional environment variables (set in `deployment.yaml`): + +| Variable | Default | Description | +|---|---|---| +| `HARNESS_WORKFLOWS_PER_SEC` | 2 | Workflows to start per second | +| `HARNESS_BATCH_SIZE` | 20 | Tasks each worker polls per batch | +| `HARNESS_POLL_INTERVAL_MS` | 100 | Milliseconds between poll cycles | + +Edit `deployment.yaml` to change these, then re-apply: + +```bash +kubectl apply -f manifests/deployment.yaml -n $NS +``` + +## Troubleshooting + +### Pod not starting + +```bash +kubectl describe pod -n $NS -l app=python-sdk-harness-worker +kubectl logs -n $NS -l app=python-sdk-harness-worker --tail=100 +``` + +### Secret not found + +```bash +kubectl get secret conductor-credentials -n $NS +``` + +## Resource Limits + +Default resource allocation: +- **Memory**: 256Mi (request) / 512Mi (limit) +- **CPU**: 100m (request) / 500m (limit) + +Adjust in `deployment.yaml` based on workload. Higher `HARNESS_WORKFLOWS_PER_SEC` values may need more CPU/memory. + +## Service + +The harness worker does **not** need a Service or Ingress. It connects to Conductor via outbound HTTP polling. All communication is outbound. 
diff --git a/harness/manifests/configmap-aws.yaml b/harness/manifests/configmap-aws.yaml new file mode 100644 index 00000000..1af1edf7 --- /dev/null +++ b/harness/manifests/configmap-aws.yaml @@ -0,0 +1,13 @@ +--- +# ConfigMap for certification-aws cluster +# Contains Conductor connection details specific to this cluster +apiVersion: v1 +kind: ConfigMap +metadata: + name: python-sdk-harness-config + # namespace: xxxxx # supply this in kubectl command + labels: + app: python-sdk-harness-worker +data: + CONDUCTOR_SERVER_URL: "https://certification-aws.orkesconductor.io/api" + CONDUCTOR_AUTH_KEY: "7ba9d0ec-247b-11f1-8d42-ea3efeda41b2" diff --git a/harness/manifests/configmap-azure.yaml b/harness/manifests/configmap-azure.yaml new file mode 100644 index 00000000..2cb355bf --- /dev/null +++ b/harness/manifests/configmap-azure.yaml @@ -0,0 +1,13 @@ +--- +# ConfigMap for certification-az cluster +# Contains Conductor connection details specific to this cluster +apiVersion: v1 +kind: ConfigMap +metadata: + name: python-sdk-harness-config + # namespace: xxxxx # supply this in kubectl command + labels: + app: python-sdk-harness-worker +data: + CONDUCTOR_SERVER_URL: "https://certification-az.orkesconductor.io/api" + CONDUCTOR_AUTH_KEY: "bf170d61-2797-11f1-833e-4ae04d100a03" diff --git a/harness/manifests/configmap-gcp.yaml b/harness/manifests/configmap-gcp.yaml new file mode 100644 index 00000000..949b38b3 --- /dev/null +++ b/harness/manifests/configmap-gcp.yaml @@ -0,0 +1,13 @@ +--- +# ConfigMap for certification-gcp cluster +# Contains Conductor connection details specific to this cluster +apiVersion: v1 +kind: ConfigMap +metadata: + name: python-sdk-harness-config + # namespace: xxxxx # supply this in kubectl command + labels: + app: python-sdk-harness-worker +data: + CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.com/api" + CONDUCTOR_AUTH_KEY: "e6c1ac61-286b-11f1-be01-c682b5750c3a" diff --git a/harness/manifests/deployment.yaml 
b/harness/manifests/deployment.yaml new file mode 100644 index 00000000..39133906 --- /dev/null +++ b/harness/manifests/deployment.yaml @@ -0,0 +1,84 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: python-sdk-harness-worker + # namespace: xxxxx # supply this in kubectl command + labels: + app: python-sdk-harness-worker +spec: + replicas: 2 + selector: + matchLabels: + app: python-sdk-harness-worker + template: + metadata: + labels: + app: python-sdk-harness-worker + spec: + # note: imagePullSecrets is not needed for public images + containers: + - name: harness + image: ghcr.io/conductor-oss/python-sdk/harness-worker:latest + imagePullPolicy: Always + env: + # === CONDUCTOR CONNECTION (from per-cloud ConfigMap) === + + - name: CONDUCTOR_SERVER_URL + valueFrom: + configMapKeyRef: + name: python-sdk-harness-config + key: CONDUCTOR_SERVER_URL + + - name: CONDUCTOR_AUTH_KEY + valueFrom: + configMapKeyRef: + name: python-sdk-harness-config + key: CONDUCTOR_AUTH_KEY + + - name: CONDUCTOR_AUTH_SECRET + valueFrom: + secretKeyRef: + name: conductor-credentials + key: auth-secret + + # === HARNESS TUNING (main.py defaults: 2 workflows/sec, batch size 20, 100 ms poll interval; workflows/sec is deliberately lowered to 1 here) === + - name: HARNESS_WORKFLOWS_PER_SEC + value: "1" + + - name: HARNESS_BATCH_SIZE + value: "20" + + - name: HARNESS_POLL_INTERVAL_MS + value: "100" + + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + + livenessProbe: + exec: + command: + - /bin/sh + - -c + - test -e /proc/1/cmdline + initialDelaySeconds: 30 + periodSeconds: 30 + timeoutSeconds: 5 + failureThreshold: 3 + + readinessProbe: + exec: + command: + - /bin/sh + - -c + - test -e /proc/1/cmdline + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + + restartPolicy: Always diff --git a/harness/manifests/secret-conductor.yaml b/harness/manifests/secret-conductor.yaml new file mode 100644 index 00000000..0a6420e0 --- /dev/null +++ b/harness/manifests/secret-conductor.yaml @@ -0,0 +1,35 @@ +# 
Conductor API Secret +# This secret contains the Conductor AUTH SECRET for authentication. +# The auth key (key ID) is NOT secret and lives in the per-cloud ConfigMap. +# Create this secret before deploying the worker. + +apiVersion: v1 +kind: Secret +metadata: + name: conductor-credentials + # namespace: xxxxx # supply this in kubectl command + labels: + app: python-sdk-harness-worker +type: Opaque +stringData: + # TODO: Replace with your actual Conductor AUTH SECRET (not the key ID) + auth-secret: "YOUR_CONDUCTOR_AUTH_SECRET_HERE" + +--- +# Instructions for creating this secret: +# +# Option 1 - kubectl imperative command (recommended): +# kubectl create secret generic conductor-credentials \ +# --from-literal=auth-secret=YOUR_AUTH_SECRET \ +# -n $NS +# +# Option 2 - edit this file and apply: +# 1. Replace YOUR_CONDUCTOR_AUTH_SECRET_HERE with the real secret value +# 2. kubectl apply -f manifests/secret-conductor.yaml -n $NS +# +# Note: The auth key (key ID) is not secret and is stored in the per-cloud +# ConfigMap (configmap-aws.yaml, configmap-azure.yaml, configmap-gcp.yaml). +# +# Note: If the e2e-testrunner-worker already runs in the same namespace, the +# conductor-credentials secret may already exist and can be reused as-is +# (same credential, same secret name). 
"""Simulated task worker for the Python SDK harness (harness/simulated_task_worker.py).

A ``SimulatedTaskWorker`` sleeps for a configurable delay, optionally fails,
and returns a generated output payload. All behaviour is driven by keys in
the task's input data; see ``SimulatedTaskWorker`` for the recognised keys.
"""
from __future__ import annotations

import math
import os
import random
import string
import time
from typing import Any, Dict, Optional, Tuple

from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface

ALPHANUMERIC_CHARS = string.ascii_letters + string.digits

# Suffix used in worker ids: the HOSTNAME environment variable when set and
# non-empty, otherwise a random 8-character hex string.
_instance_id: str = os.environ.get("HOSTNAME") or os.urandom(4).hex()


class SimulatedTaskWorker(WorkerInterface):
    """Worker that simulates task execution with delay, failure and output.

    Recognised task input keys (all optional):

    * ``taskIndex`` -- int echoed in logs/output and used by the
      ``conditional`` failure mode (default -1).
    * ``delayType`` -- ``fixed`` (default), ``random``, ``normal``,
      ``exponential`` or ``wait`` (no delay). Unknown values behave like
      ``fixed``.
    * ``minDelay`` / ``maxDelay`` / ``meanDelay`` / ``stdDeviation`` --
      delay parameters in milliseconds. ``minDelay`` defaults to the
      worker's default delay, ``maxDelay`` to ``minDelay + 100``,
      ``meanDelay`` to their midpoint, ``stdDeviation`` to 30.
    * ``successRate`` -- probability of success for random outcomes
      (default 1.0).
    * ``failureMode`` -- ``random`` (default), ``conditional`` or
      ``sequential``. Unknown values behave like ``random``.
    * ``forceSuccess`` / ``forceFail`` -- boolean-like overrides, checked in
      that order before ``failureMode``.
    * ``failIndexes`` / ``failEvery`` -- used by ``conditional`` mode.
    * ``attempt`` / ``failUntilAttempt`` -- used by ``sequential`` mode
      (defaults 1 and 2).
    * ``outputSize`` -- length of the random ``data`` string (default 1024).
    * ``includeInput`` / ``previousTaskOutput`` / ``outputTemplate`` --
      control extra output fields.
    """

    def __init__(
        self,
        task_name: str,
        codename: str,
        sleep_seconds: int,
        batch_size: int = 5,
        poll_interval_ms: int = 1000,
    ) -> None:
        """Create a worker for *task_name*.

        Args:
            task_name: Task definition name passed to ``WorkerInterface``.
            codename: Label included in log lines and task output.
            sleep_seconds: Default delay in seconds, used when the task
                input does not set ``minDelay``.
            batch_size: Stored as ``self.thread_count``.
            poll_interval_ms: Stored as ``self.poll_interval``.
        """
        super().__init__(task_definition_name=task_name)
        self._task_name = task_name
        self._codename = codename
        self._default_delay_ms = sleep_seconds * 1000
        # NOTE(review): thread_count / poll_interval are presumably read by
        # the SDK's task runner -- confirm against WorkerInterface.
        self.thread_count = batch_size
        self.poll_interval = poll_interval_ms
        self._worker_id = f"{task_name}-{_instance_id}"
        self._rng = random.Random()

        print(
            f"[{self._task_name}] Initialized worker "
            f"[workerId={self._worker_id}, codename={self._codename}, "
            f"batchSize={batch_size}, pollInterval={poll_interval_ms}ms]"
        )

    def get_identity(self) -> str:
        """Return this worker's id, ``"<task_name>-<instance id>"``."""
        return self._worker_id

    def execute(self, task: Task) -> TaskResult:
        """Sleep for the configured delay, then complete or fail the task.

        Args:
            task: Task whose ``input_data`` carries the simulation settings.

        Returns:
            A ``TaskResult`` with status ``FAILED`` (and a reason) when the
            failure simulation says so, otherwise ``COMPLETED`` with the
            generated output data.
        """
        inp: Dict[str, Any] = task.input_data or {}
        task_id = task.task_id or ""
        task_index = _get_int(inp, "taskIndex", -1)

        print(
            f"[{self._task_name}] Starting simulated task "
            f"[id={task_id}, index={task_index}, codename={self._codename}]"
        )

        start_time = time.monotonic()

        delay_type = _get_str(inp, "delayType", "fixed")
        min_delay = _get_int(inp, "minDelay", self._default_delay_ms)
        max_delay = _get_int(inp, "maxDelay", min_delay + 100)
        mean_delay = _get_int(inp, "meanDelay", (min_delay + max_delay) // 2)
        std_deviation = _get_int(inp, "stdDeviation", 30)
        success_rate = _get_float(inp, "successRate", 1.0)
        failure_mode = _get_str(inp, "failureMode", "random")
        output_size = _get_int(inp, "outputSize", 1024)

        delay_ms = 0
        if delay_type.lower() != "wait":
            # Clamp to zero: time.sleep() raises ValueError for a negative
            # argument, which a negative minDelay/maxDelay in the task
            # input would otherwise produce.
            delay_ms = max(
                0,
                self._calculate_delay(delay_type, min_delay, max_delay, mean_delay, std_deviation),
            )

        print(
            f"[{self._task_name}] Simulated task "
            f"[id={task_id}, index={task_index}] sleeping for {delay_ms} ms"
        )
        time.sleep(delay_ms / 1000.0)

        if not self._should_task_succeed(success_rate, failure_mode, inp):
            print(
                f"[{self._task_name}] Simulated task "
                f"[id={task_id}, index={task_index}] failed as configured"
            )
            result = self.get_task_result_from_task(task)
            result.status = TaskResultStatus.FAILED
            result.reason_for_incompletion = "Simulated task failure based on configuration"
            return result

        elapsed_ms = int((time.monotonic() - start_time) * 1000)
        output = self._generate_output(inp, task_id, task_index, delay_ms, elapsed_ms, output_size)

        result = self.get_task_result_from_task(task)
        result.status = TaskResultStatus.COMPLETED
        result.output_data = output
        return result

    # ── Delay calculation ─────────────────────────────────────────

    def _calculate_delay(
        self, delay_type: str, min_delay: int, max_delay: int, mean_delay: int, std_deviation: int,
    ) -> int:
        """Return a delay in milliseconds for *delay_type* (case-insensitive).

        * ``fixed`` / unknown: ``min_delay``.
        * ``random``: uniform integer in ``[min_delay, max_delay]``
          (``min_delay`` when the range is empty).
        * ``normal``: Gaussian around ``mean_delay`` with ``std_deviation``,
          at least 1.
        * ``exponential``: exponential with mean ``mean_delay``, clamped
          into ``[min_delay, max_delay]``.

        The result may be negative when the inputs are; the caller clamps.
        """
        dt = delay_type.lower()
        if dt == "fixed":
            return min_delay
        if dt == "random":
            spread = max(1, max_delay - min_delay + 1)
            return min_delay + self._rng.randint(0, spread - 1)
        if dt == "normal":
            gaussian = self._next_gaussian()
            delay = round(mean_delay + gaussian * std_deviation)
            return max(1, delay)
        if dt == "exponential":
            # random() is in [0, 1), so 1 - random() is in (0, 1] and the
            # logarithm is always defined.
            exp = -mean_delay * math.log(1 - self._rng.random())
            return max(min_delay, min(max_delay, int(exp)))
        return min_delay

    def _next_gaussian(self) -> float:
        """Return a standard-normal sample (Box-Muller transform)."""
        # 1 - random() keeps u1 strictly positive so log(u1) is defined.
        u1 = 1.0 - self._rng.random()
        u2 = self._rng.random()
        return math.sqrt(-2.0 * math.log(u1)) * math.sin(2.0 * math.pi * u2)

    # ── Failure simulation ────────────────────────────────────────

    def _should_task_succeed(self, success_rate: float, failure_mode: str, inp: Dict[str, Any]) -> bool:
        """Decide whether the simulated task succeeds.

        ``forceSuccess`` and then ``forceFail`` take precedence when present
        and boolean-like; otherwise the outcome follows *failure_mode*
        (``random``, ``conditional``, ``sequential``; anything else is
        treated as ``random``).
        """
        force_success = inp.get("forceSuccess")
        if force_success is not None:
            b, ok = _to_bool(force_success)
            if ok:
                return b
        force_fail = inp.get("forceFail")
        if force_fail is not None:
            b, ok = _to_bool(force_fail)
            if ok:
                return not b

        fm = failure_mode.lower()
        if fm == "random":
            return self._rng.random() < success_rate
        if fm == "conditional":
            return self._should_conditional_succeed(success_rate, inp)
        if fm == "sequential":
            attempt = _get_int(inp, "attempt", 1)
            fail_until_attempt = _get_int(inp, "failUntilAttempt", 2)
            return attempt >= fail_until_attempt
        return self._rng.random() < success_rate

    def _should_conditional_succeed(self, success_rate: float, inp: Dict[str, Any]) -> bool:
        """Apply the ``conditional`` failure mode.

        For a non-negative ``taskIndex``, fail when it appears in
        ``failIndexes`` or is a multiple of a positive ``failEvery``
        (index 0 included); otherwise succeed with probability
        *success_rate*.
        """
        task_index = _get_int(inp, "taskIndex", -1)
        if task_index >= 0:
            fail_indexes = inp.get("failIndexes")
            if isinstance(fail_indexes, list):
                for idx in fail_indexes:
                    if _to_int(idx) == task_index:
                        return False
            fail_every = _get_int(inp, "failEvery", 0)
            if fail_every > 0 and task_index % fail_every == 0:
                return False
        return self._rng.random() < success_rate

    # ── Output generation ─────────────────────────────────────────

    def _generate_output(
        self,
        inp: Dict[str, Any],
        task_id: str,
        task_index: int,
        delay_ms: int,
        elapsed_time_ms: int,
        output_size: int,
    ) -> Dict[str, Any]:
        """Build the output payload for a completed task.

        Always contains the task id/index, codename, status, the configured
        and measured durations, and two random choice fields. Optional
        fields: ``input`` (when ``includeInput`` is true),
        ``previousTaskData`` (when ``previousTaskOutput`` is present) and
        ``data`` (random string of ``output_size`` characters). Keys from a
        dict ``outputTemplate`` are merged last and may overwrite any of the
        above.
        """
        output: Dict[str, Any] = {
            "taskId": task_id,
            "taskIndex": task_index,
            "codename": self._codename,
            "status": "completed",
            "configuredDelayMs": delay_ms,
            "actualExecutionTimeMs": elapsed_time_ms,
            "a_or_b": "a" if self._rng.randint(0, 99) > 20 else "b",
            "c_or_d": "c" if self._rng.randint(0, 99) > 33 else "d",
        }

        if _get_bool(inp, "includeInput", False):
            output["input"] = inp

        prev = inp.get("previousTaskOutput")
        if prev is not None:
            output["previousTaskData"] = prev

        if output_size > 0:
            output["data"] = _generate_random_data(self._rng, output_size)

        output_template = inp.get("outputTemplate")
        if isinstance(output_template, dict):
            output.update(output_template)

        return output


# ── Helpers ───────────────────────────────────────────────────────

def _to_int(v: Any) -> int:
    """Coerce *v* to an int; return 0 when it cannot be converted.

    Floats are truncated; non-finite floats (NaN, +/-inf) yield 0 because
    ``int()`` raises on them. Strings are parsed with ``int()``.
    """
    if isinstance(v, int):
        return v
    if isinstance(v, float):
        return int(v) if math.isfinite(v) else 0
    if isinstance(v, str):
        try:
            return int(v)
        except ValueError:
            return 0
    return 0


def _to_float(v: Any) -> float:
    """Coerce *v* to a float; return 0.0 when it cannot be converted."""
    if isinstance(v, (int, float)):
        return float(v)
    if isinstance(v, str):
        try:
            return float(v)
        except ValueError:
            return 0.0
    return 0.0


def _to_bool(v: Any) -> Tuple[bool, bool]:
    """Coerce *v* to a bool.

    Returns:
        ``(value, ok)``; ``ok`` is False when *v* is not boolean-like, in
        which case ``value`` is False. Accepts bools, the strings
        ``true``/``1``/``false``/``0`` (case-insensitive) and numbers
        (non-zero is True).
    """
    if isinstance(v, bool):
        return v, True
    if isinstance(v, str):
        low = v.lower()
        if low in ("true", "1"):
            return True, True
        if low in ("false", "0"):
            return False, True
    if isinstance(v, (int, float)):
        return v != 0, True
    return False, False


def _get_int(inp: Dict[str, Any], key: str, default: int) -> int:
    """Return ``inp[key]`` as an int, or *default* when missing or None."""
    v = inp.get(key)
    if v is None:
        return default
    return _to_int(v)


def _get_float(inp: Dict[str, Any], key: str, default: float) -> float:
    """Return ``inp[key]`` as a float, or *default* when missing or None."""
    v = inp.get(key)
    if v is None:
        return default
    return _to_float(v)


def _get_str(inp: Dict[str, Any], key: str, default: str) -> str:
    """Return ``inp[key]`` when it is a string, otherwise *default*."""
    v = inp.get(key)
    if v is None:
        return default
    if isinstance(v, str):
        return v
    return default


def _get_bool(inp: Dict[str, Any], key: str, default: bool) -> bool:
    """Return ``inp[key]`` as a bool, or *default* when missing or not boolean-like."""
    v = inp.get(key)
    if v is None:
        return default
    b, ok = _to_bool(v)
    return b if ok else default


def _generate_random_data(rng: random.Random, size: int) -> str:
    """Return *size* random alphanumeric characters ("" when size <= 0)."""
    if size <= 0:
        return ""
    return "".join(rng.choices(ALPHANUMERIC_CHARS, k=size))
"""Background workflow starter for the Python SDK harness (harness/workflow_governor.py)."""
from __future__ import annotations

import threading
import time

from conductor.client.http.models import StartWorkflowRequest
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor


class WorkflowGovernor:
    """Starts a fixed number of workflow instances once per second.

    The work runs on a daemon thread created by ``start()`` and ends when
    ``stop()`` is called.
    """

    def __init__(
        self,
        workflow_executor: WorkflowExecutor,
        workflow_name: str,
        workflows_per_second: int,
    ) -> None:
        """Store the settings; no thread is started until ``start()``.

        Args:
            workflow_executor: Executor whose ``start_workflow`` is called
                for every new instance.
            workflow_name: Name of the workflow to start (version 1).
            workflows_per_second: Number of instances started per tick; a
                value of zero or less starts none.
        """
        self._workflow_executor = workflow_executor
        self._workflow_name = workflow_name
        self._workflows_per_second = workflows_per_second
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        """Start the background thread; a no-op if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            # A second start() would otherwise double the configured rate.
            return
        print(
            f"WorkflowGovernor started: workflow={self._workflow_name}, "
            f"rate={self._workflows_per_second}/sec"
        )
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, name="WorkflowGovernor", daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Signal the thread to stop and wait up to five seconds for it."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=5.0)
            if self._thread.is_alive():
                # join() timed out (e.g. a start_workflow call is still
                # blocked); the daemon thread ends with the process.
                print("WorkflowGovernor did not stop within 5.0s")
                return
        print("WorkflowGovernor stopped")

    def _run(self) -> None:
        """Thread body: start one batch per second until stopped.

        Ticks are scheduled against a monotonic deadline so the time spent
        starting a batch does not lower the effective rate.
        """
        next_tick = time.monotonic()
        while not self._stop_event.is_set():
            self._start_batch()
            next_tick += 1.0
            remaining = next_tick - time.monotonic()
            if remaining <= 0:
                # The batch overran its one-second slot: restart the
                # schedule from now rather than bursting to catch up.
                next_tick = time.monotonic()
                continue
            self._stop_event.wait(timeout=remaining)

    def _start_batch(self) -> None:
        """Start up to ``workflows_per_second`` instances.

        The first error aborts the rest of the batch (best effort: the next
        tick tries again) and is reported on stdout.
        """
        started = 0
        try:
            for _ in range(self._workflows_per_second):
                if self._stop_event.is_set():
                    break
                request = StartWorkflowRequest(name=self._workflow_name, version=1)
                self._workflow_executor.start_workflow(request)
                started += 1
        except Exception as e:
            print(f"Governor: error starting workflows: {e}")
            return
        # Report what was actually started, not the configured rate.
        print(f"Governor: started {started} workflow(s)")
+import signal import threading import time from multiprocessing import Process, freeze_support, Queue, set_start_method @@ -52,6 +53,7 @@ def _run_sync_worker_process( event_listeners: Optional[List[Any]], ) -> None: """Process target: construct TaskRunner after fork/spawn and run forever.""" + signal.signal(signal.SIGINT, signal.SIG_IGN) task_runner = TaskRunner(worker, configuration, metrics_settings, event_listeners) task_runner.run() @@ -63,6 +65,7 @@ def _run_async_worker_process( event_listeners: Optional[List[Any]], ) -> None: """Process target: construct AsyncTaskRunner after fork/spawn and run forever.""" + signal.signal(signal.SIGINT, signal.SIG_IGN) async_task_runner = AsyncTaskRunner(worker, configuration, metrics_settings, event_listeners) asyncio.run(async_task_runner.run()) @@ -607,6 +610,7 @@ def _setup_logging_queue(configuration: Configuration): # This process performs the centralized logging def __logger_process(queue, log_level, logger_format=None): + signal.signal(signal.SIGINT, signal.SIG_IGN) c_logger = logging.getLogger( Configuration.get_logging_formatted_name( __name__