From a8a13f508eb7849a00faebcfdfcf0042f24f7b49 Mon Sep 17 00:00:00 2001 From: Alexander Piskun Date: Thu, 21 May 2026 06:55:06 +0000 Subject: [PATCH 1/2] feat(tracking): PostHog dual-send + canonical lifecycle events Add posthog-python as a parallel telemetry provider alongside Mixpanel. Every PostHog event carries the canonical CLI properties from the analytics PRD: environment/surface/source/trigger_source = "cli", cli_version, tracing_id. - `comfy run` now emits execution_start/success/error. The Mixpanel pipe keeps the legacy "run" event via a mixpanel_name="run" alias so the ~219K/wk stream stays continuous through the parity window. - `comfy generate ` emits a separate generate:start/success/ error/submitted lifecycle, plus generate:list/schema/refresh/upload/ resume sub-action events. Partner-node CLI calls use the generate:* namespace because they aren't ComfyUI workflow executions; completion of async submissions is observed server-side via the partner-node proxy's partner_node:api_call_* events. Wired behind a TelemetryProvider Protocol so the eventual Mixpanel cutover is a one-line edit. atexit hook flushes posthog-python's async batches; `comfy tracking enable|disable` opt-out gates both pipes. POSTHOG_TOKEN points at the production project via https://t.comfy.org; $POSTHOG_API_KEY env var overrides for local testing. Signed-off-by: Alexander Piskun --- comfy_cli/cmdline.py | 81 +++-- comfy_cli/command/generate/app.py | 335 +++++++++++------- comfy_cli/tracking.py | 134 ++++++- pyproject.toml | 1 + .../command/generate/test_app_lifecycle.py | 332 +++++++++++++++++ tests/comfy_cli/command/test_run_json.py | 5 +- .../comfy_cli/test_run_execution_lifecycle.py | 147 ++++++++ tests/comfy_cli/test_tracking.py | 96 +++-- tests/comfy_cli/test_tracking_providers.py | 250 +++++++++++++ 9 files changed, 1174 insertions(+), 207 deletions(-) create mode 100644 tests/comfy_cli/command/generate/test_app_lifecycle.py create mode 100644 tests/comfy_cli/test_run_execution_lifecycle.py create mode 100644 tests/comfy_cli/test_tracking_providers.py diff --git a/comfy_cli/cmdline.py b/comfy_cli/cmdline.py index 659440d9..e59c0450 100644 --- a/comfy_cli/cmdline.py +++ b/comfy_cli/cmdline.py @@ -430,7 +430,6 @@ def update( "UI workflows are converted to API format client-side via /object_info." ) ) -@tracking.track_command() def run( workflow: Annotated[ str, @@ -514,40 +513,62 @@ def run( ), ] = False, ): - if api_key: - api_key = api_key.strip() or None + # Snapshot kwargs before the body mutates api_key/host/port — analytics should record what user actually supplied. + _track_props = tracking.filter_command_kwargs(dict(locals())) + tracking.track_event("execution_start", _track_props, mixpanel_name="run") + + try: + if api_key: + api_key = api_key.strip() or None - config = ConfigManager() + config = ConfigManager() - if host: - s = host.split(":") - host = s[0] - if not port and len(s) == 2: - port = int(s[1]) + if host: + s = host.split(":") + host = s[0] + if not port and len(s) == 2: + port = int(s[1]) + + if config.background: + bg_host, bg_port = config.background[0], config.background[1] + if not host: + host = bg_host + if not port: + port = bg_port - if config.background: - bg_host, bg_port = config.background[0], config.background[1] if not host: - host = bg_host + host = "127.0.0.1" if not port: - port = bg_port - - if not host: - host = "127.0.0.1" - if not port: - port = 8188 - - run_inner.execute( - workflow, - host, - port, - wait, - verbose, - timeout, - api_key=api_key, - json_mode=json_output, - print_prompt=print_prompt, - ) + port = 8188 + + run_inner.execute( + workflow, + host, + port, + wait, + verbose, + timeout, + api_key=api_key, + json_mode=json_output, + print_prompt=print_prompt, + ) + except typer.Exit as e: + if (e.exit_code or 0) == 0: + tracking.track_event("execution_success", _track_props) + else: + tracking.track_event( + "execution_error", + {**_track_props, "error_type": type(e).__name__, "exit_code": e.exit_code}, + ) + raise + except Exception as e: + tracking.track_event( + "execution_error", + {**_track_props, "error_type": type(e).__name__}, + ) + raise + else: + tracking.track_event("execution_success", _track_props) def validate_comfyui(_env_checker): diff --git a/comfy_cli/command/generate/app.py b/comfy_cli/command/generate/app.py index a417e480..35c66f99 100644 --- a/comfy_cli/command/generate/app.py +++ b/comfy_cli/command/generate/app.py @@ -43,7 +43,6 @@ def register_with(parent: typer.Typer) -> None: subcommand name and error.""" @parent.command(name="generate", help=_HELP, context_settings=_CONTEXT_SETTINGS) - @tracking.track_command() def _generate_entry( ctx: typer.Context, target: Annotated[ @@ -57,17 +56,29 @@ def _generate_entry( if target is None or target in {"-h", "--help"}: _print_top_help() raise typer.Exit(code=0) + extra = list(ctx.args) if target == "list": - return _list_models(list(ctx.args)) + tracking.track_event("generate:list") + return _list_models(extra) if target == "schema": - return _schema(list(ctx.args)) + model_arg = extra[0] if extra and not extra[0].startswith("-") else None + tracking.track_event("generate:schema", {"model": model_arg}) + return _schema(extra) if target == "refresh": + tracking.track_event("generate:refresh") return _refresh() if target == "upload": - return _upload(list(ctx.args)) + tracking.track_event("generate:upload") + return _upload(extra) if target == "resume": - return _resume(list(ctx.args)) - _generate(target, list(ctx.args)) + resume_model = extra[0] if extra and not extra[0].startswith("-") else None + resume_job_id = extra[1] if len(extra) >= 2 and not extra[1].startswith("-") else None + tracking.track_event( + "generate:resume", + {"model": resume_model, "job_id": resume_job_id}, + ) + return _resume(extra) + _generate(target, extra) def _separate_meta_flags(extra_args: list[str]) -> tuple[list[str], dict[str, str | bool]]: @@ -146,130 +157,214 @@ def _emit_result(result: poll.PollResult, *, request_id: str, download: str | No def _generate(model: str, extra_args: list[str]) -> None: - try: - ep = spec.get_endpoint(model) - except spec.SpecError as e: - rprint(f"[bold red]{e}[/bold red]") - raise typer.Exit(code=1) - - if any(a in {"--help", "-h"} for a in extra_args): - _show_schema_help(ep) - raise typer.Exit(code=0) - - try: - remaining, meta = _separate_meta_flags(extra_args) - except schema.SchemaError as e: - rprint(f"[bold red]{e}[/bold red]") - raise typer.Exit(code=1) - - flags = schema.flags_for(ep) - try: - values = schema.parse_args(flags, remaining) - except schema.SchemaError as e: - rprint(f"[bold red]{e}[/bold red]") - name = spec.preferred_alias(ep.id) or ep.id - rprint(f"[dim]Run `comfy generate schema {name}` for the full parameter list.[/dim]") - raise typer.Exit(code=1) - - try: - api_key = client.resolve_api_key(meta.get("api-key") if isinstance(meta.get("api-key"), str) else None) - except client.ApiError as e: - rprint(f"[bold red]{e}[/bold red]") - raise typer.Exit(code=1) - - timeout_raw = meta.get("timeout", "300") - try: - timeout = float(timeout_raw) if isinstance(timeout_raw, str) else 300.0 - except ValueError: - rprint(f"[bold red]--timeout: expected number, got {timeout_raw!r}[/bold red]") - raise typer.Exit(code=1) - - do_async = bool(meta.get("async", False)) - download = meta.get("download") if isinstance(meta.get("download"), str) else None - as_json = bool(meta.get("json", False)) - - try: - _apply_upload_transforms(values, flags, ep, api_key) - except (client.ApiError, httpx.HTTPError) as e: - rprint(f"[bold red]Upload failed: {e}[/bold red]") - raise typer.Exit(code=1) + # --help short-circuits before tracking — it's a help-display action, not an execution attempt. + # If the model is unknown, fall through so the tracking path records the schema error. + asks_help = any(a in {"--help", "-h"} for a in extra_args) + if asks_help: + try: + help_ep = spec.get_endpoint(model) + except spec.SpecError: + help_ep = None + if help_ep is not None: + _show_schema_help(help_ep) + raise typer.Exit(code=0) - request_id = str(uuid.uuid4())[:8] - try: - resp = client.send_request(ep, values, flags, api_key, timeout=timeout) - except httpx.HTTPError as e: - rprint(f"[bold red]Network error contacting {spec.base_url()}: {e}[/bold red]") - raise typer.Exit(code=1) from e + # generate:start fires at entry so every invocation has a paired start/end lifecycle. + # Props are filled in progressively as model_alias / partner / async / has_download become known. + gen_props: dict[str, object | None] = { + "model": model, + "model_alias": None, + "async": None, + "has_download": None, + "partner": None, + } + tracking.track_event("generate:start", gen_props) + + def _track_error(error_kind: str, exc: BaseException) -> None: + tracking.track_event( + "generate:error", + {**gen_props, "error_type": type(exc).__name__, "error_kind": error_kind}, + ) try: - client.raise_for_status(resp) - except client.ApiError as e: - rprint(f"[bold red]API error {e.status}[/bold red]\n{e.body}") - raise typer.Exit(code=1) from e - - if resp.headers.get("content-type", "").startswith("image/"): - if download: - saved = output.save_binary_response(resp, download, request_id) - output.print_saved([saved]) - else: - rprint("[yellow]Binary image response; nothing saved. Pass --download to write it to disk.[/yellow]") - return + try: + ep = spec.get_endpoint(model) + except spec.SpecError as e: + rprint(f"[bold red]{e}[/bold red]") + _track_error("schema", e) + raise typer.Exit(code=1) + + gen_props["model_alias"] = spec.preferred_alias(ep.id) + gen_props["partner"] = getattr(ep, "partner", None) + + try: + remaining, meta = _separate_meta_flags(extra_args) + except schema.SchemaError as e: + rprint(f"[bold red]{e}[/bold red]") + _track_error("schema", e) + raise typer.Exit(code=1) + + do_async = bool(meta.get("async", False)) + download = meta.get("download") if isinstance(meta.get("download"), str) else None + as_json = bool(meta.get("json", False)) + gen_props["async"] = do_async + gen_props["has_download"] = bool(download) + + flags = schema.flags_for(ep) + try: + values = schema.parse_args(flags, remaining) + except schema.SchemaError as e: + rprint(f"[bold red]{e}[/bold red]") + name = gen_props["model_alias"] or ep.id + rprint(f"[dim]Run `comfy generate schema {name}` for the full parameter list.[/dim]") + _track_error("schema", e) + raise typer.Exit(code=1) + + try: + api_key = client.resolve_api_key(meta.get("api-key") if isinstance(meta.get("api-key"), str) else None) + except client.ApiError as e: + rprint(f"[bold red]{e}[/bold red]") + _track_error("api", e) + raise typer.Exit(code=1) + + timeout_raw = meta.get("timeout", "300") + try: + timeout = float(timeout_raw) if isinstance(timeout_raw, str) else 300.0 + except ValueError as e: + rprint(f"[bold red]--timeout: expected number, got {timeout_raw!r}[/bold red]") + _track_error("schema", e) + raise typer.Exit(code=1) + + try: + _apply_upload_transforms(values, flags, ep, api_key) + except (client.ApiError, httpx.HTTPError) as e: + rprint(f"[bold red]Upload failed: {e}[/bold red]") + _track_error("upload", e) + raise typer.Exit(code=1) + + request_id = str(uuid.uuid4())[:8] + try: + resp = client.send_request(ep, values, flags, api_key, timeout=timeout) + except httpx.HTTPError as e: + rprint(f"[bold red]Network error contacting {spec.base_url()}: {e}[/bold red]") + _track_error("network", e) + raise typer.Exit(code=1) from e + + try: + client.raise_for_status(resp) + except client.ApiError as e: + rprint(f"[bold red]API error {e.status}[/bold red]\n{e.body}") + _track_error("api", e) + raise typer.Exit(code=1) from e + + if resp.headers.get("content-type", "").startswith("image/"): + if download: + saved = output.save_binary_response(resp, download, request_id) + output.print_saved([saved]) + else: + rprint( + "[yellow]Binary image response; nothing saved. Pass --download to write it to disk.[/yellow]" + ) + tracking.track_event("generate:success", gen_props) + return - try: - body = resp.json() - except ValueError: - rprint("[bold red]Unexpected non-JSON response.[/bold red]") - rprint(resp.text[:500]) - raise typer.Exit(code=1) + try: + body = resp.json() + except ValueError as e: + rprint("[bold red]Unexpected non-JSON response.[/bold red]") + rprint(resp.text[:500]) + _track_error("non_json_response", e) + raise typer.Exit(code=1) + + if ep.polling: + job_id = poll.extract_job_id(ep.polling, body) or request_id + name = gen_props["model_alias"] or ep.id + if do_async: + if as_json: + output.print_json(body) + else: + rprint(f"[bold green]Submitted:[/bold green] {name}") + rprint(f" job id: {job_id}") + rprint(f" resume: comfy generate resume {name} {job_id}") + # Submitted, not succeeded — the workflow runs on the partner side and completion is + # observed server-side via partner_node:api_call_*. No generate:success pair here. + tracking.track_event( + "generate:submitted", + { + "model": model, + "model_alias": gen_props["model_alias"], + "job_id": job_id, + "partner": gen_props["partner"], + }, + ) + return + + poller = poll.get_poller(ep.polling) + with _spinner() as prog: + task = prog.add_task(f"Generating with {name} (job {job_id})", total=None) + + def _on_progress(p: float) -> None: + prog.update(task, description=f"Generating ({p * 100:.0f}%)") + + try: + result = poller( + body, + api_key=api_key, + timeout=timeout, + on_progress=_on_progress, + create_path=ep.path, + ) + except (client.ApiError, httpx.HTTPError) as e: + _track_error("network" if isinstance(e, httpx.HTTPError) else "api", e) + raise typer.Exit(code=1) from e + try: + _emit_result(result, request_id=job_id, download=download, as_json=as_json) + tracking.track_event("generate:success", gen_props) + except typer.Exit as e: + if (e.exit_code or 0) == 0: + tracking.track_event("generate:success", gen_props) + else: + _track_error("api", e) + raise + return - if ep.polling: - job_id = poll.extract_job_id(ep.polling, body) or request_id - name = spec.preferred_alias(ep.id) or ep.id - if do_async: + adapter = adapters.get(ep.id) + if adapter is not None and adapter.decode_sync is not None: + body = resp.json() if as_json: output.print_json(body) + tracking.track_event("generate:success", gen_props) + return + if not download: + rprint("[yellow]Image data returned inline. Pass --download to save.[/yellow]") + tracking.track_event("generate:success", gen_props) + return + saved = adapter.decode_sync(body, download, request_id) + if saved: + output.print_saved(saved) else: - rprint(f"[bold green]Submitted:[/bold green] {name}") - rprint(f" job id: {job_id}") - rprint(f" resume: comfy generate resume {name} {job_id}") - return - - poller = poll.get_poller(ep.polling) - with _spinner() as prog: - task = prog.add_task(f"Generating with {name} (job {job_id})", total=None) - - def _on_progress(p: float) -> None: - prog.update(task, description=f"Generating ({p * 100:.0f}%)") - - result = poller( - body, - api_key=api_key, - timeout=timeout, - on_progress=_on_progress, - create_path=ep.path, - ) - _emit_result(result, request_id=job_id, download=download, as_json=as_json) - return - - adapter = adapters.get(ep.id) - if adapter is not None and adapter.decode_sync is not None: - body = resp.json() - if as_json: - output.print_json(body) - return - if not download: - rprint("[yellow]Image data returned inline. Pass --download to save.[/yellow]") + rprint("[yellow]No image data found in response.[/yellow]") + output.print_json(body) + tracking.track_event("generate:success", gen_props) return - saved = adapter.decode_sync(body, download, request_id) - if saved: - output.print_saved(saved) - else: - rprint("[yellow]No image data found in response.[/yellow]") - output.print_json(body) - return - result = poll.sync_result_from_response(resp) - _emit_result(result, request_id=request_id, download=download, as_json=as_json) + try: + result = poll.sync_result_from_response(resp) + _emit_result(result, request_id=request_id, download=download, as_json=as_json) + tracking.track_event("generate:success", gen_props) + except typer.Exit as e: + if (e.exit_code or 0) == 0: + tracking.track_event("generate:success", gen_props) + else: + _track_error("api", e) + raise + except typer.Exit: + # Inline raise sites already emitted their lifecycle event. + raise + except Exception as e: + # Safety net so an unexpected exception still pairs generate:start with a terminal generate:error. + _track_error("unknown", e) + raise def _arg_value(args: list[str], *names: str) -> str | None: diff --git a/comfy_cli/tracking.py b/comfy_cli/tracking.py index 106dd438..b5d9db7f 100644 --- a/comfy_cli/tracking.py +++ b/comfy_cli/tracking.py @@ -1,20 +1,36 @@ +from __future__ import annotations + +import atexit import functools import logging as logginglib +import os import sys import uuid +from typing import Any, Protocol import typer from mixpanel import Mixpanel +from posthog import Posthog from comfy_cli import constants, logging, ui from comfy_cli.config_manager import ConfigManager from comfy_cli.workspace_manager import WorkspaceManager -# Ignore logs from urllib3 that Mixpanel uses. +# Ignore logs from urllib3 that Mixpanel/PostHog use. logginglib.getLogger("urllib3").setLevel(logginglib.ERROR) MIXPANEL_TOKEN = "93aeab8962b622d431ac19800ccc9f67" -mp = Mixpanel(MIXPANEL_TOKEN) if MIXPANEL_TOKEN else None + +# phc_* are public client-side write keys designed for embedding — safe to commit, same as MIXPANEL_TOKEN above. +# Override with $POSTHOG_API_KEY. +POSTHOG_TOKEN = os.environ.get( + "POSTHOG_API_KEY", + "phc_iKfK86id4xVYws9LybMje0h44eGtfwFgRPIBehmy8rO", +) +POSTHOG_HOST = "https://t.comfy.org" + +# Only these events get the tracing_id --> workflow_run_id alias on PostHog. +EXECUTION_EVENTS = frozenset({"execution_start", "execution_success", "execution_error"}) # Kwargs whose values must never reach tracking system. # The key is kept (with a redacted marker) so we can still see whether the option was supplied. @@ -37,6 +53,67 @@ # stable agent identity in analytics. _session_only_tracking = False + +class TelemetryProvider(Protocol): + enabled: bool + + def track(self, event_name: str, distinct_id: str | None, properties: dict[str, Any]) -> None: ... + + def flush(self) -> None: ... + + +class MixpanelProvider: + def __init__(self, token: str): + self.client = Mixpanel(token) if token else None + self.enabled = self.client is not None + + def track(self, event_name: str, distinct_id: str | None, properties: dict[str, Any]) -> None: + if not self.enabled or distinct_id is None: + return + self.client.track(distinct_id=distinct_id, event_name=event_name, properties=properties) + + def flush(self) -> None: + # mixpanel-python ships per-call over sync HTTP; nothing to drain. + return + + +class PostHogProvider: + _STANDARD_PROPERTIES = { + "environment": "cli", + "surface": "cli", + "source": "cli", + "trigger_source": "cli", + } + + def __init__(self, token: str, host: str): + self.client: Posthog | None = None + self.enabled = False + if not token: + return + # disable_geoip=False lets PostHog enrich events with IP-derived location. + self.client = Posthog(project_api_key=token, host=host, disable_geoip=False) + self.enabled = True + + def track(self, event_name: str, distinct_id: str | None, properties: dict[str, Any]) -> None: + if not self.enabled or self.client is None or distinct_id is None: + return + merged = {**self._STANDARD_PROPERTIES, **properties} + if event_name in EXECUTION_EVENTS and "tracing_id" in merged: + merged.setdefault("workflow_run_id", merged["tracing_id"]) + self.client.capture(event=event_name, distinct_id=distinct_id, properties=merged) + + def flush(self) -> None: + if self.client is None: + return + # posthog-python ships asynchronously; without flush, short-lived CLI invocations silently drop in-flight events + self.client.flush() + + +PROVIDERS: list[TelemetryProvider] = [ + MixpanelProvider(MIXPANEL_TOKEN), + PostHogProvider(POSTHOG_TOKEN, POSTHOG_HOST), +] + app = typer.Typer() @@ -53,7 +130,12 @@ def disable(): typer.echo(f"Tracking is now {'disabled'}.") -def track_event(event_name: str, properties: any = None): +def track_event(event_name: str, properties: Any = None, *, mixpanel_name: str | None = None): + """Fire ``event_name`` to every enabled telemetry provider. + + ``mixpanel_name``, if supplied, overrides the event name on the Mixpanel pipe only — used to keep + legacy Mixpanel event names while PostHog receives the canonical name. + """ if properties is None: properties = {} logging.debug(f"tracking event called with event_name: {event_name} and properties: {properties}") @@ -61,12 +143,25 @@ def track_event(event_name: str, properties: any = None): if not enable_tracking and not _session_only_tracking: return - try: - properties["cli_version"] = cli_version - properties["tracing_id"] = tracing_id - mp.track(distinct_id=user_id, event_name=event_name, properties=properties) - except Exception as e: - logging.warning(f"Failed to track event: {e}") # Log the error but do not raise + properties = {**properties, "cli_version": cli_version, "tracing_id": tracing_id} + + for provider in PROVIDERS: + provider_event_name = ( + mixpanel_name if (mixpanel_name is not None and isinstance(provider, MixpanelProvider)) else event_name + ) + try: + provider.track(provider_event_name, distinct_id=user_id, properties=dict(properties)) + except Exception as e: + logging.warning(f"Failed to track event via {type(provider).__name__}: {e}") + + +def filter_command_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: + """Drop ``ctx``/``context`` and redact ``SENSITIVE_TRACKING_KEYS`` values.""" + return { + k: ("" if v is not None else None) if k in SENSITIVE_TRACKING_KEYS else v + for k, v in kwargs.items() + if k != "ctx" and k != "context" + } def track_command(sub_command: str = None): @@ -78,15 +173,7 @@ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): command_name = f"{sub_command}:{func.__name__}" if sub_command is not None else func.__name__ - - # Copy kwargs to avoid mutating original dictionary - # Remove context and ctx from the dictionary as they are not needed for tracking and not serializable. - filtered_kwargs = { - k: ("" if v is not None else None) if k in SENSITIVE_TRACKING_KEYS else v - for k, v in kwargs.items() - if k != "ctx" and k != "context" - } - + filtered_kwargs = filter_command_kwargs(kwargs) logging.debug(f"Tracking command: {command_name} with arguments: {filtered_kwargs}") track_event(command_name, properties=filtered_kwargs) @@ -161,3 +248,14 @@ def init_tracking(enable_tracking: bool): logging.debug("Tracking install event.") config_manager.set(constants.CONFIG_KEY_INSTALL_EVENT_TRIGGERED, "True") track_event("install") + + +def _flush_all_providers() -> None: + for provider in PROVIDERS: + try: + provider.flush() + except Exception as e: # noqa: BLE001 + logging.warning(f"Failed to flush telemetry provider {type(provider).__name__}: {e}") + + +atexit.register(_flush_all_providers) diff --git a/pyproject.toml b/pyproject.toml index 23f55beb..a6fae57b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "mixpanel", "packaging", "pathspec", + "posthog>=6,<8", "psutil", "pyyaml", "questionary", diff --git a/tests/comfy_cli/command/generate/test_app_lifecycle.py b/tests/comfy_cli/command/generate/test_app_lifecycle.py new file mode 100644 index 00000000..c9160461 --- /dev/null +++ b/tests/comfy_cli/command/generate/test_app_lifecycle.py @@ -0,0 +1,332 @@ +"""Execution lifecycle + sub-action event tests for ``comfy generate`` (MAR-52).""" + +import httpx +import pytest +from typer.testing import CliRunner + +from comfy_cli.cmdline import app as cli_app +from comfy_cli.command.generate import app as gen_app + + +@pytest.fixture +def runner(): + return CliRunner() + + +@pytest.fixture +def api_key(monkeypatch): + monkeypatch.setenv("COMFY_API_KEY", "comfyui-test") + return "comfyui-test" + + +@pytest.fixture +def captured_events(monkeypatch): + """Drop-in replacement for the autouse track_event mock that records every + call so tests can make assertions on the emitted lifecycle.""" + events: list[tuple[str, dict]] = [] + + def _record(event_name, properties=None, *, mixpanel_name=None): + events.append((event_name, dict(properties or {}))) + + monkeypatch.setattr("comfy_cli.tracking.prompt_tracking_consent", lambda *a, **kw: None) + monkeypatch.setattr("comfy_cli.tracking.track_event", _record) + monkeypatch.setattr("comfy_cli.command.generate.app.tracking.track_event", _record) + monkeypatch.setattr("comfy_cli.cmdline.tracking.track_event", _record) + return events + + +def _names(events): + return [name for name, _ in events] + + +def _props(events, target_name): + return [props for name, props in events if name == target_name] + + +class TestSubActionEvents: + def test_list_fires_generate_list(self, runner, captured_events): + r = runner.invoke(cli_app, ["generate", "list"]) + assert r.exit_code == 0 + assert "generate:list" in _names(captured_events) + + def test_schema_fires_generate_schema_with_model(self, runner, captured_events): + r = runner.invoke(cli_app, ["generate", "schema", "flux-pro"]) + assert r.exit_code == 0 + schema_events = _props(captured_events, "generate:schema") + assert len(schema_events) == 1 + assert schema_events[0]["model"] == "flux-pro" + + def test_schema_without_model_records_none_model(self, runner, captured_events): + # ``comfy generate schema`` prints an error but the sub-action event + # still fires so we can see invalid usage in analytics. + r = runner.invoke(cli_app, ["generate", "schema"]) + assert r.exit_code == 1 + schema_events = _props(captured_events, "generate:schema") + assert len(schema_events) == 1 + assert schema_events[0]["model"] is None + + def test_resume_fires_generate_resume_with_model_and_job(self, runner, captured_events): + r = runner.invoke(cli_app, ["generate", "resume", "flux-pro", "job-abc"]) + # Exit code 1 because the synthetic poll won't reach a real server, but + # the sub-action event must have fired before that. + resume_events = _props(captured_events, "generate:resume") + assert len(resume_events) == 1 + assert resume_events[0]["model"] == "flux-pro" + assert resume_events[0]["job_id"] == "job-abc" + del r # exit code not relevant here — sub-action event firing is + + def test_resume_flag_like_first_arg_does_not_become_model(self, runner, captured_events): + # If the first positional looks like a flag (e.g. ``--help``), don't + # record it as the model name — that'd be noise in analytics. + runner.invoke(cli_app, ["generate", "resume", "--help"]) + resume_events = _props(captured_events, "generate:resume") + assert len(resume_events) == 1 + assert resume_events[0]["model"] is None + assert resume_events[0]["job_id"] is None + + def test_refresh_fires_generate_refresh(self, runner, captured_events, monkeypatch): + # Mock the httpx call so we don't actually hit the network. + monkeypatch.setattr( + "comfy_cli.command.generate.app.httpx.Client", + lambda *a, **kw: _FakeRefreshClient(), + ) + runner.invoke(cli_app, ["generate", "refresh"]) + assert "generate:refresh" in _names(captured_events) + + def test_upload_fires_generate_upload(self, runner, captured_events, api_key, monkeypatch): + # Stub the upload boundary so the event fires without network IO. + def _raise_apiError(*a, **kw): + from comfy_cli.command.generate import client as _client + + raise _client.ApiError(0, "", "stubbed") + + monkeypatch.setattr("comfy_cli.command.generate.app.upload.upload_target", _raise_apiError) + runner.invoke(cli_app, ["generate", "upload", "/tmp/does-not-exist.png"]) + assert "generate:upload" in _names(captured_events) + + def test_no_generate_lifecycle_on_sub_actions(self, runner, captured_events): + """Sub-actions are not partner-node invocations — they must not fire + ``generate:start/success/error`` (or it'd inflate the parity-window + counts) or ``execution_*`` (which is reserved for ``comfy run``).""" + runner.invoke(cli_app, ["generate", "list"]) + names = _names(captured_events) + assert "generate:start" not in names + assert "generate:success" not in names + assert "generate:error" not in names + assert "execution_start" not in names + assert "execution_success" not in names + assert "execution_error" not in names + + +class _FakeRefreshClient: + """Minimal stand-in for httpx.Client used by ``comfy generate refresh``.""" + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def get(self, *a, **kw): + return httpx.Response(200, text="openapi: 3.0.0\npaths: {}\n") + + +class TestGenerateExecutionHappyPath: + def test_sync_success_emits_start_then_success(self, runner, captured_events, api_key, monkeypatch): + resp = httpx.Response(200, json={"data": [{"url": "https://cdn.example/a.png"}]}) + monkeypatch.setattr(gen_app.client.httpx, "post", lambda *a, **kw: resp) + + r = runner.invoke(cli_app, ["generate", "dalle", "--prompt", "x"]) + assert r.exit_code == 0, r.stdout + + names = _names(captured_events) + assert names.count("generate:start") == 1 + assert names.count("generate:success") == 1 + assert "generate:error" not in names + # Must not bleed into the workflow-lifecycle namespace. + assert "execution_start" not in names + assert "execution_success" not in names + + def test_generate_start_props_carry_model_and_partner(self, runner, captured_events, api_key, monkeypatch): + resp = httpx.Response(200, json={"data": [{"url": "https://cdn.example/a.png"}]}) + monkeypatch.setattr(gen_app.client.httpx, "post", lambda *a, **kw: resp) + runner.invoke(cli_app, ["generate", "dalle", "--prompt", "x"]) + + start_props = _props(captured_events, "generate:start")[0] + assert start_props["model"] == "dalle" + # NOTE: start_props captures the snapshot at function entry, which + # is BEFORE spec lookup populates partner/model_alias on the shared + # gen_props dict. The success/error events that fire later carry + # the fully-populated values. + success_props = _props(captured_events, "generate:success")[0] + assert success_props["model"] == "dalle" + assert success_props["partner"] == "openai" + assert success_props["async"] is False + assert success_props["has_download"] is False + + def test_download_flag_sets_has_download(self, runner, captured_events, api_key, tmp_path, monkeypatch): + resp = httpx.Response(200, json={"data": [{"url": "https://cdn.example/a.png"}]}) + monkeypatch.setattr(gen_app.client.httpx, "post", lambda *a, **kw: resp) + monkeypatch.setattr("comfy_cli.command.generate.client.download_bytes", lambda *a, **kw: b"png") + download = str(tmp_path / "out.png") + runner.invoke(cli_app, ["generate", "dalle", "--prompt", "x", "--download", download]) + + success_props = _props(captured_events, "generate:success")[0] + assert success_props["has_download"] is True + + +class TestGenerateExecutionErrorPaths: + def test_api_error_emits_generate_error_with_kind_api(self, runner, captured_events, api_key, monkeypatch): + resp = httpx.Response(401, json={"message": "Invalid token"}) + monkeypatch.setattr(gen_app.client.httpx, "post", lambda *a, **kw: resp) + + r = runner.invoke(cli_app, ["generate", "flux-pro", "--prompt", "x", "--width", "1", "--height", "1"]) + assert r.exit_code == 1 + + err_props = _props(captured_events, "generate:error") + assert len(err_props) == 1 + assert err_props[0]["error_kind"] == "api" + assert "generate:success" not in _names(captured_events) + + def test_network_error_emits_generate_error_with_kind_network(self, runner, captured_events, api_key, monkeypatch): + def boom(*a, **kw): + raise httpx.ConnectError("connection refused") + + monkeypatch.setattr(gen_app.client.httpx, "post", boom) + r = runner.invoke(cli_app, ["generate", "flux-pro", "--prompt", "x", "--width", "1", "--height", "1"]) + assert r.exit_code == 1 + + err_props = _props(captured_events, "generate:error") + assert len(err_props) == 1 + assert err_props[0]["error_kind"] == "network" + + def test_non_json_response_emits_generate_error_with_kind_non_json( + self, runner, captured_events, api_key, monkeypatch + ): + resp = httpx.Response(200, text="not really json", headers={"content-type": "text/plain"}) + monkeypatch.setattr(gen_app.client.httpx, "post", lambda *a, **kw: resp) + r = runner.invoke(cli_app, ["generate", "dalle", "--prompt", "x"]) + assert r.exit_code == 1 + + err_props = _props(captured_events, "generate:error") + assert len(err_props) == 1 + assert err_props[0]["error_kind"] == "non_json_response" + + def test_schema_error_in_args_emits_generate_error_with_kind_schema(self, runner, captured_events, api_key): + r = runner.invoke( + cli_app, + ["generate", "flux-pro", "--prompt", "x", "--width", "abc", "--height", "1"], + ) + assert r.exit_code == 1 + + err_props = _props(captured_events, "generate:error") + assert len(err_props) == 1 + assert err_props[0]["error_kind"] == "schema" + + def test_unknown_model_emits_generate_error_with_kind_schema(self, runner, captured_events, api_key): + # Pre-validation consistency: typing a bogus model name fires the full + # generate:start → generate:error pair (mirrors comfy run firing + # execution_start → execution_error for a bogus workflow path). + r = runner.invoke(cli_app, ["generate", "bogus-model-name", "--prompt", "x"]) + assert r.exit_code == 1 + + names = _names(captured_events) + assert names.count("generate:start") == 1 + assert names.count("generate:error") == 1 + err_props = _props(captured_events, "generate:error")[0] + assert err_props["error_kind"] == "schema" + assert err_props["model"] == "bogus-model-name" + # model_alias/partner stay None because the lookup never succeeded. + assert err_props["model_alias"] is None + assert err_props["partner"] is None + + def test_upload_failure_emits_generate_error_with_kind_upload( + self, runner, captured_events, api_key, tmp_path, monkeypatch + ): + # _apply_upload_transforms calls ``upload.upload_path`` when a local + # file path is supplied for a flag with ``upload_mode="url"``. If the + # upload service errors, ``_generate`` emits ``error_kind=upload``. + local_file = tmp_path / "webhook.json" + local_file.write_bytes(b"{}") + + def _boom_upload(path, key): + from comfy_cli.command.generate import client as _client + + raise _client.ApiError(500, "", "upload service down") + + monkeypatch.setattr("comfy_cli.command.generate.app.upload.upload_path", _boom_upload) + + # flux-pro's ``webhook_url`` flag has ``upload_mode="url"``; passing a + # local file path triggers the upload transform on the way in. + r = runner.invoke( + cli_app, + [ + "generate", + "flux-pro", + "--prompt", + "x", + "--width", + "1", + "--height", + "1", + "--webhook_url", + str(local_file), + ], + ) + assert r.exit_code == 1 + + err_props = _props(captured_events, "generate:error") + assert len(err_props) == 1 + assert err_props[0]["error_kind"] == "upload" + + def test_unexpected_exception_emits_generate_error_with_kind_unknown( + self, runner, captured_events, api_key, monkeypatch + ): + # Safety net: anything that isn't a known exception type should still + # produce a paired generate:error so generate:start isn't orphaned. + def boom(*a, **kw): + raise RuntimeError("synthetic crash") + + monkeypatch.setattr(gen_app.client.httpx, "post", boom) + r = runner.invoke(cli_app, ["generate", "flux-pro", "--prompt", "x", "--width", "1", "--height", "1"]) + assert r.exit_code != 0 + + err_props = _props(captured_events, "generate:error") + assert len(err_props) == 1 + assert err_props[0]["error_kind"] == "unknown" + assert err_props[0]["error_type"] == "RuntimeError" + + +class TestGenerateAsyncSubmission: + def test_async_emits_generate_submitted_not_success(self, runner, captured_events, api_key, monkeypatch): + submit = httpx.Response(200, json={"id": "job-xyz", "polling_url": "https://x/poll"}) + monkeypatch.setattr(gen_app.client.httpx, "post", lambda *a, **kw: submit) + + r = runner.invoke( + cli_app, + ["generate", "flux-pro", "--prompt", "x", "--width", "1", "--height", "1", "--async"], + ) + assert r.exit_code == 0 + + names = _names(captured_events) + assert "generate:start" in names + assert "generate:submitted" in names + # Critical: async submission is not "succeeded" — completion happens on + # a later ``comfy generate resume`` invocation. + assert "generate:success" not in names + assert "generate:error" not in names + + submitted = _props(captured_events, "generate:submitted")[0] + assert submitted["model"] == "flux-pro" + assert submitted["job_id"] == "job-xyz" + + +class TestGenerateHelp: + def test_per_model_help_does_not_fire_lifecycle(self, runner, captured_events): + # ``--help`` is a help-display action, not an execution attempt. + r = runner.invoke(cli_app, ["generate", "flux-pro", "--help"]) + assert r.exit_code == 0 + names = _names(captured_events) + assert "generate:start" not in names + assert "generate:success" not in names + assert "generate:error" not in names diff --git a/tests/comfy_cli/command/test_run_json.py b/tests/comfy_cli/command/test_run_json.py index 773d9363..cc492843 100644 --- a/tests/comfy_cli/command/test_run_json.py +++ b/tests/comfy_cli/command/test_run_json.py @@ -903,7 +903,7 @@ def test_cli_json_print_prompt_emits_clean_ndjson(self, tmp_path): def test_cli_json_with_fresh_consent_state_stays_clean(self, tmp_path): # The exact regression scenario: a fresh machine where consent has # never been recorded. The entry callback enables session-only - # tracking via the non-TTY branch (mocked Mixpanel client so no + # tracking via the non-TTY branch (PROVIDERS swapped out so no # network), and the resulting stdout must still be clean NDJSON. from typer.testing import CliRunner @@ -915,9 +915,8 @@ def test_cli_json_with_fresh_consent_state_stays_clean(self, tmp_path): cfg_dir.mkdir() with ( patch.object(_Cls, "get_config_path", return_value=str(cfg_dir)), - patch("comfy_cli.tracking.mp") as mock_mp, + patch("comfy_cli.tracking.PROVIDERS", []), ): - mock_mp.track.return_value = None runner = CliRunner() result = runner.invoke( app, ["run", "--workflow", self._make_workflow_file(tmp_path), "--json", "--print-prompt"] diff --git a/tests/comfy_cli/test_run_execution_lifecycle.py b/tests/comfy_cli/test_run_execution_lifecycle.py new file mode 100644 index 00000000..e2fe5ff1 --- /dev/null +++ b/tests/comfy_cli/test_run_execution_lifecycle.py @@ -0,0 +1,147 @@ +"""Execution lifecycle tests for ``comfy run`` (MAR-52). + +The CLI's ``run`` command historically emitted a single ``run`` event via the +``@track_command()`` decorator. Post-MAR-52 it manually emits +``execution_start`` / ``execution_success`` / ``execution_error`` against the +canonical PRD §5.1 schema, with ``mixpanel_name="run"`` on the start event to +preserve Mixpanel-side continuity for the 219K/week legacy stream. +""" + +from unittest.mock import patch + +import pytest +import typer +from typer.testing import CliRunner + + +@pytest.fixture +def runner(): + return CliRunner() + + +@pytest.fixture +def tracked_run(monkeypatch): + """Patch tracking entry points so test invocations don't talk to the + consent prompt or any real provider, but capture every ``track_event`` + call for assertions.""" + captured: list[tuple[str, dict, dict]] = [] + + def _record(event_name, properties=None, *, mixpanel_name=None): + captured.append((event_name, dict(properties or {}), {"mixpanel_name": mixpanel_name})) + + monkeypatch.setattr("comfy_cli.tracking.prompt_tracking_consent", lambda *a, **kw: None) + monkeypatch.setattr("comfy_cli.tracking.track_event", _record) + # cmdline.py imports tracking as a module; patch the reference there too + # so the call site sees the recorder, not the original. + monkeypatch.setattr("comfy_cli.cmdline.tracking.track_event", _record) + return captured + + +def _events(captured): + """Drop the kwargs tuple; return [(name, properties), ...].""" + return [(name, props) for name, props, _ in captured] + + +def _event_names(captured): + return [name for name, _, _ in captured] + + +class TestRunHappyPath: + def test_emits_execution_start_then_success(self, runner, tracked_run): + from comfy_cli.cmdline import app + + with patch("comfy_cli.cmdline.run_inner.execute") as mock_execute: + mock_execute.return_value = None + result = runner.invoke(app, ["run", "--workflow", "wf.json"]) + + assert result.exit_code == 0, f"stderr={result.stderr!r} exc={result.exception!r}" + assert _event_names(tracked_run) == ["execution_start", "execution_success"] + + def test_execution_start_uses_mixpanel_name_run_alias(self, runner, tracked_run): + from comfy_cli.cmdline import app + + with patch("comfy_cli.cmdline.run_inner.execute"): + runner.invoke(app, ["run", "--workflow", "wf.json"]) + + # Only execution_start carries the alias; success/error do not. + start_kwargs = next(kw for name, _, kw in tracked_run if name == "execution_start") + success_kwargs = next(kw for name, _, kw in tracked_run if name == "execution_success") + assert start_kwargs["mixpanel_name"] == "run" + assert success_kwargs["mixpanel_name"] is None + + def test_properties_carry_workflow_and_other_kwargs(self, runner, tracked_run): + from comfy_cli.cmdline import app + + with patch("comfy_cli.cmdline.run_inner.execute"): + runner.invoke( + app, + ["run", "--workflow", "wf.json", "--timeout", "60", "--host", "1.2.3.4", "--port", "9000"], + ) + + for name, props in _events(tracked_run): + if name == "execution_start": + assert props["workflow"] == "wf.json" + assert props["timeout"] == 60 + assert props["host"] == "1.2.3.4" + assert props["port"] == 9000 + break + else: + pytest.fail("execution_start not emitted") + + def test_api_key_is_redacted_in_lifecycle_properties(self, runner, tracked_run, monkeypatch): + from comfy_cli.cmdline import app + + # Avoid env var leakage masking the redaction check. + monkeypatch.delenv("COMFY_API_KEY", raising=False) + with patch("comfy_cli.cmdline.run_inner.execute"): + runner.invoke(app, ["run", "--workflow", "wf.json", "--api-key", "sk-supersecret"]) + + for name, props in _events(tracked_run): + assert props.get("api_key") == "", f"{name} leaked api_key={props.get('api_key')!r}" + assert "sk-supersecret" not in str(props) + + +class TestRunFailurePath: + def test_typer_exit_1_emits_execution_error_with_exit_code(self, runner, tracked_run): + from comfy_cli.cmdline import app + + with patch("comfy_cli.cmdline.run_inner.execute") as mock_execute: + mock_execute.side_effect = typer.Exit(code=1) + result = runner.invoke(app, ["run", "--workflow", "wf.json"]) + + assert result.exit_code == 1 + names = _event_names(tracked_run) + assert "execution_start" in names + assert "execution_error" in names + assert "execution_success" not in names + + err_props = next(props for name, props in _events(tracked_run) if name == "execution_error") + assert err_props["error_type"] == "Exit" + assert err_props["exit_code"] == 1 + + def test_unexpected_exception_emits_execution_error(self, runner, tracked_run): + from comfy_cli.cmdline import app + + with patch("comfy_cli.cmdline.run_inner.execute") as mock_execute: + mock_execute.side_effect = ValueError("oops") + result = runner.invoke(app, ["run", "--workflow", "wf.json"]) + + # The exception propagates; CliRunner surfaces it as a nonzero exit. + assert result.exit_code != 0 + names = _event_names(tracked_run) + assert names == ["execution_start", "execution_error"] + err_props = next(props for name, props in _events(tracked_run) if name == "execution_error") + assert err_props["error_type"] == "ValueError" + + def test_typer_exit_0_is_treated_as_success(self, runner, tracked_run): + # A clean early-exit (e.g. --print-prompt currently doesn't do this for + # `comfy run`, but any future early-success path should be analytics- + # equivalent to a normal completion). + from comfy_cli.cmdline import app + + with patch("comfy_cli.cmdline.run_inner.execute") as mock_execute: + mock_execute.side_effect = typer.Exit(code=0) + result = runner.invoke(app, ["run", "--workflow", "wf.json"]) + + assert result.exit_code == 0 + assert _event_names(tracked_run) == ["execution_start", "execution_success"] diff --git a/tests/comfy_cli/test_tracking.py b/tests/comfy_cli/test_tracking.py index a2e619eb..f4440a1c 100644 --- a/tests/comfy_cli/test_tracking.py +++ b/tests/comfy_cli/test_tracking.py @@ -11,7 +11,11 @@ @pytest.fixture def tracking_module(tmp_path): - """Yield comfy_cli.tracking with a fresh tmp-path ConfigManager and a mocked Mixpanel client.""" + """Yield comfy_cli.tracking with a fresh tmp-path ConfigManager and a single + mocked TelemetryProvider in PROVIDERS so tests can assert on the fan-out. + + Exposes the mock as ``tracking_mod.provider`` for assertions. + """ config_dir = tmp_path / "comfy-cli" config_dir.mkdir() with patch.object(_ConfigManagerCls, "get_config_path", return_value=str(config_dir)): @@ -19,49 +23,70 @@ def tracking_module(tmp_path): import comfy_cli.tracking as tracking_mod + fake_provider = MagicMock() + fake_provider.enabled = True + # Mirror MixpanelProvider's no-op-on-missing-distinct-id behavior so opt-out + # paths look identical from the test's perspective. + fake_provider.track.return_value = None + with ( patch.object(tracking_mod, "config_manager", cfg), patch.object(tracking_mod, "user_id", None), patch.object(tracking_mod, "cli_version", "test-cli-version"), patch.object(tracking_mod, "tracing_id", "test-tracing-id"), - patch.object(tracking_mod, "mp", MagicMock()), + patch.object(tracking_mod, "PROVIDERS", [fake_provider]), patch.object(tracking_mod, "_session_only_tracking", False), ): - yield tracking_mod + # Stash the mock on the module for convenient access from tests + # without changing the fixture return contract. + tracking_mod.provider = fake_provider # type: ignore[attr-defined] + try: + yield tracking_mod + finally: + del tracking_mod.provider + + +def _last_track_call(provider): + args, kwargs = provider.track.call_args + # Provider.track(event_name, distinct_id=..., properties=...) + event_name = args[0] if args else kwargs.get("event_name") + distinct_id = kwargs.get("distinct_id", args[1] if len(args) > 1 else None) + properties = kwargs.get("properties", args[2] if len(args) > 2 else {}) + return event_name, distinct_id, properties class TestTrackEvent: def test_short_circuits_when_disabled(self, tracking_module): tracking_module.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "False") tracking_module.track_event("some_event") - tracking_module.mp.track.assert_not_called() + tracking_module.provider.track.assert_not_called() def test_short_circuits_when_not_configured(self, tracking_module): tracking_module.track_event("some_event") - tracking_module.mp.track.assert_not_called() + tracking_module.provider.track.assert_not_called() def test_fires_when_enabled(self, tracking_module): tracking_module.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "True") tracking_module.track_event("some_event", {"k": "v"}) - tracking_module.mp.track.assert_called_once() - _, kwargs = tracking_module.mp.track.call_args - assert kwargs["event_name"] == "some_event" - assert kwargs["properties"]["k"] == "v" - assert "cli_version" in kwargs["properties"] - assert "tracing_id" in kwargs["properties"] + tracking_module.provider.track.assert_called_once() + event_name, _, properties = _last_track_call(tracking_module.provider) + assert event_name == "some_event" + assert properties["k"] == "v" + assert "cli_version" in properties + assert "tracing_id" in properties def test_properties_default_to_empty_dict(self, tracking_module): tracking_module.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "True") tracking_module.track_event("some_event") - tracking_module.mp.track.assert_called_once() - _, kwargs = tracking_module.mp.track.call_args - assert set(kwargs["properties"].keys()) == {"cli_version", "tracing_id"} + tracking_module.provider.track.assert_called_once() + _, _, properties = _last_track_call(tracking_module.provider) + assert set(properties.keys()) == {"cli_version", "tracing_id"} - def test_swallows_mixpanel_errors(self, tracking_module): + def test_swallows_provider_errors(self, tracking_module): tracking_module.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "True") - tracking_module.mp.track.side_effect = RuntimeError("boom") + tracking_module.provider.track.side_effect = RuntimeError("boom") tracking_module.track_event("some_event") - tracking_module.mp.track.assert_called_once() + tracking_module.provider.track.assert_called_once() class TestTrackCommandRedaction: @@ -76,12 +101,11 @@ def some_cmd(workflow, api_key=None): some_cmd(workflow="wf.json", api_key="sk-supersecret") - tracking_module.mp.track.assert_called_once() - _, kwargs = tracking_module.mp.track.call_args - props = kwargs["properties"] - assert props["api_key"] == "" - assert props["workflow"] == "wf.json" - assert "sk-supersecret" not in str(props) + tracking_module.provider.track.assert_called_once() + _, _, properties = _last_track_call(tracking_module.provider) + assert properties["api_key"] == "" + assert properties["workflow"] == "wf.json" + assert "sk-supersecret" not in str(properties) def test_api_key_none_stays_none(self, tracking_module): # When the user didn't pass --api-key (or set $COMFY_API_KEY), we still @@ -95,8 +119,8 @@ def some_cmd(workflow, api_key=None): some_cmd(workflow="wf.json", api_key=None) - _, kwargs = tracking_module.mp.track.call_args - assert kwargs["properties"]["api_key"] is None + _, _, properties = _last_track_call(tracking_module.provider) + assert properties["api_key"] is None class TestInitTrackingRoundTrip: @@ -109,13 +133,13 @@ class TestInitTrackingRoundTrip: def test_disable_is_respected_by_track_event(self, tracking_module): tracking_module.init_tracking(False) tracking_module.track_event("some_event") - tracking_module.mp.track.assert_not_called() + tracking_module.provider.track.assert_not_called() def test_enable_is_respected_by_track_event(self, tracking_module): tracking_module.init_tracking(True) - tracking_module.mp.track.reset_mock() + tracking_module.provider.track.reset_mock() tracking_module.track_event("some_event") - tracking_module.mp.track.assert_called_once() + tracking_module.provider.track.assert_called_once() def test_disable_persists_as_parseable_bool(self, tracking_module): tracking_module.init_tracking(False) @@ -127,8 +151,8 @@ def test_enable_generates_user_id(self, tracking_module): generated_user_id = tracking_module.config_manager.get(constants.CONFIG_KEY_USER_ID) assert generated_user_id is not None assert tracking_module.user_id == generated_user_id - _, kwargs = tracking_module.mp.track.call_args - assert kwargs["distinct_id"] == generated_user_id + _, distinct_id, _ = _last_track_call(tracking_module.provider) + assert distinct_id == generated_user_id def test_disable_does_not_generate_user_id(self, tracking_module): tracking_module.init_tracking(False) @@ -136,9 +160,9 @@ def test_disable_does_not_generate_user_id(self, tracking_module): def test_install_event_fires_once_across_calls(self, tracking_module): tracking_module.init_tracking(True) - assert tracking_module.mp.track.call_count == 1 + assert tracking_module.provider.track.call_count == 1 tracking_module.init_tracking(True) - assert tracking_module.mp.track.call_count == 1 + assert tracking_module.provider.track.call_count == 1 class TestPromptTrackingConsent: @@ -172,10 +196,10 @@ def test_session_only_tracking_fires_track_event(self, tracking_module): ): tracking_module.prompt_tracking_consent() tracking_module.track_event("some_event", {"k": "v"}) - tracking_module.mp.track.assert_called_once() - _, kwargs = tracking_module.mp.track.call_args - assert kwargs["event_name"] == "some_event" - assert kwargs["distinct_id"] is not None + tracking_module.provider.track.assert_called_once() + event_name, distinct_id, _ = _last_track_call(tracking_module.provider) + assert event_name == "some_event" + assert distinct_id is not None def test_session_only_persists_user_id(self, tracking_module): with ( diff --git a/tests/comfy_cli/test_tracking_providers.py b/tests/comfy_cli/test_tracking_providers.py new file mode 100644 index 00000000..dc16b6d4 --- /dev/null +++ b/tests/comfy_cli/test_tracking_providers.py @@ -0,0 +1,250 @@ +"""Provider-level tests for the dual-send telemetry refactor (MAR-52). + +These cover the contract each provider has to honor — Mixpanel keeps legacy +event names via the ``mixpanel_name`` alias kwarg, PostHog stamps every event +with the standard CLI properties and aliases ``tracing_id`` to +``workflow_run_id`` on the canonical execution lifecycle events. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from comfy_cli import constants +from comfy_cli.config_manager import ConfigManager +from comfy_cli.tracking import EXECUTION_EVENTS, MixpanelProvider, PostHogProvider + +_ConfigManagerCls = ConfigManager.__closure__[0].cell_contents + + +@pytest.fixture +def tracking_with_two_providers(tmp_path): + """Yield comfy_cli.tracking with a MixpanelProvider + PostHogProvider pair + whose underlying clients are MagicMocks. Lets tests assert on the fan-out + without hitting the network.""" + config_dir = tmp_path / "comfy-cli" + config_dir.mkdir() + with patch.object(_ConfigManagerCls, "get_config_path", return_value=str(config_dir)): + cfg = _ConfigManagerCls() + + import comfy_cli.tracking as tracking_mod + + mixpanel_provider = MixpanelProvider("token-mp") + mixpanel_provider.client = MagicMock() + mixpanel_provider.enabled = True + + posthog_provider = PostHogProvider.__new__(PostHogProvider) + posthog_provider.client = MagicMock() + posthog_provider.enabled = True + + with ( + patch.object(tracking_mod, "config_manager", cfg), + patch.object(tracking_mod, "user_id", "test-distinct-id"), + patch.object(tracking_mod, "cli_version", "test-cli-version"), + patch.object(tracking_mod, "tracing_id", "test-tracing-id"), + patch.object(tracking_mod, "PROVIDERS", [mixpanel_provider, posthog_provider]), + patch.object(tracking_mod, "_session_only_tracking", False), + ): + tracking_mod.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "True") + yield tracking_mod, mixpanel_provider, posthog_provider + + +def _posthog_capture_kwargs(client_mock): + """Return the last ``capture(...)`` keyword arguments as a dict.""" + args, kwargs = client_mock.capture.call_args + if "event" not in kwargs and args: + kwargs = {"event": args[0], **kwargs} + return kwargs + + +class TestDualFanOut: + def test_track_event_fans_out_to_both_providers(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + tracking_mod.track_event("some_event", {"k": "v"}) + + mp_provider.client.track.assert_called_once() + ph_provider.client.capture.assert_called_once() + + def test_opt_out_short_circuits_both_providers(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + tracking_mod.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "False") + tracking_mod.track_event("some_event") + + mp_provider.client.track.assert_not_called() + ph_provider.client.capture.assert_not_called() + + def test_one_provider_raising_does_not_block_the_other(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + mp_provider.client.track.side_effect = RuntimeError("mixpanel down") + + tracking_mod.track_event("some_event") + + # Mixpanel raised but PostHog still got the call. + ph_provider.client.capture.assert_called_once() + + def test_provider_order_does_not_matter_for_failure_isolation(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + ph_provider.client.capture.side_effect = RuntimeError("posthog down") + + tracking_mod.track_event("some_event") + + # PostHog raised but Mixpanel still got the call (it ran first). + mp_provider.client.track.assert_called_once() + + +class TestPostHogStandardProperties: + def test_environment_surface_source_are_stamped(self, tracking_with_two_providers): + tracking_mod, _, ph_provider = tracking_with_two_providers + tracking_mod.track_event("any_event") + + capture_kwargs = _posthog_capture_kwargs(ph_provider.client) + props = capture_kwargs["properties"] + assert props["environment"] == "cli" + assert props["surface"] == "cli" + assert props["source"] == "cli" + assert props["trigger_source"] == "cli" + assert props["cli_version"] == "test-cli-version" + assert props["tracing_id"] == "test-tracing-id" + + def test_caller_properties_win_over_defaults(self, tracking_with_two_providers): + tracking_mod, _, ph_provider = tracking_with_two_providers + tracking_mod.track_event("any_event", {"surface": "custom"}) + + capture_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert capture_kwargs["properties"]["surface"] == "custom" + + def test_distinct_id_is_user_id(self, tracking_with_two_providers): + tracking_mod, _, ph_provider = tracking_with_two_providers + tracking_mod.track_event("any_event") + + capture_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert capture_kwargs["distinct_id"] == "test-distinct-id" + + def test_mixpanel_does_not_receive_posthog_standard_props(self, tracking_with_two_providers): + # The Mixpanel pipe has 2 years of history without these CLI-canonical + # props; injecting them would dirty the schema. PostHogProvider owns + # the env/surface/source stamping, not the shared track_event flow. + tracking_mod, mp_provider, _ = tracking_with_two_providers + tracking_mod.track_event("any_event") + + _, kwargs = mp_provider.client.track.call_args + props = kwargs["properties"] + assert "environment" not in props + assert "surface" not in props + assert "source" not in props + + +class TestWorkflowRunIdAlias: + @pytest.mark.parametrize("event_name", sorted(EXECUTION_EVENTS)) + def test_execution_events_get_workflow_run_id(self, tracking_with_two_providers, event_name): + tracking_mod, _, ph_provider = tracking_with_two_providers + tracking_mod.track_event(event_name) + + capture_kwargs = _posthog_capture_kwargs(ph_provider.client) + props = capture_kwargs["properties"] + assert props["workflow_run_id"] == "test-tracing-id" + assert props["tracing_id"] == "test-tracing-id" + + def test_non_execution_events_do_not_get_workflow_run_id(self, tracking_with_two_providers): + tracking_mod, _, ph_provider = tracking_with_two_providers + tracking_mod.track_event("install") + + capture_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert "workflow_run_id" not in capture_kwargs["properties"] + + def test_caller_workflow_run_id_is_not_overwritten(self, tracking_with_two_providers): + tracking_mod, _, ph_provider = tracking_with_two_providers + tracking_mod.track_event("execution_start", {"workflow_run_id": "caller-supplied"}) + + capture_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert capture_kwargs["properties"]["workflow_run_id"] == "caller-supplied" + + +class TestMixpanelLegacyNameAlias: + def test_mixpanel_name_kwarg_routes_to_mixpanel_only(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + tracking_mod.track_event("execution_start", mixpanel_name="run") + + _, mp_kwargs = mp_provider.client.track.call_args + assert mp_kwargs["event_name"] == "run" + + ph_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert ph_kwargs["event"] == "execution_start" + + def test_without_alias_both_providers_get_same_name(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + tracking_mod.track_event("execution_success") + + _, mp_kwargs = mp_provider.client.track.call_args + ph_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert mp_kwargs["event_name"] == "execution_success" + assert ph_kwargs["event"] == "execution_success" + + +class TestProviderConstruction: + def test_posthog_with_empty_token_is_disabled_and_silent(self): + provider = PostHogProvider("", "https://t.comfy.org") + assert provider.enabled is False + # Calling .track on a disabled provider must not raise. + provider.track("any_event", "distinct_id", {}) + + def test_posthog_with_valid_token_constructs_client(self): + provider = PostHogProvider("phc_test", "https://t.comfy.org") + assert provider.enabled is True + assert provider.client is not None + + def test_mixpanel_with_empty_token_is_disabled(self): + provider = MixpanelProvider("") + assert provider.enabled is False + + def test_posthog_track_skips_when_distinct_id_is_none(self, tracking_with_two_providers): + tracking_mod, _, ph_provider = tracking_with_two_providers + with patch.object(tracking_mod, "user_id", None): + tracking_mod.track_event("execution_start") + + ph_provider.client.capture.assert_not_called() + + +class TestRedactionThroughFanOut: + def test_api_key_redaction_reaches_both_providers(self, tracking_with_two_providers): + tracking_mod, mp_provider, ph_provider = tracking_with_two_providers + + @tracking_mod.track_command() + def fake_cmd(workflow, api_key=None): + return None + + fake_cmd(workflow="wf.json", api_key="sk-supersecret") + + _, mp_kwargs = mp_provider.client.track.call_args + ph_kwargs = _posthog_capture_kwargs(ph_provider.client) + assert mp_kwargs["properties"]["api_key"] == "" + assert ph_kwargs["properties"]["api_key"] == "" + assert "sk-supersecret" not in str(mp_kwargs["properties"]) + assert "sk-supersecret" not in str(ph_kwargs["properties"]) + + +class TestAtexitFlush: + def test_flush_all_providers_calls_each_flush(self): + """The module registers ``_flush_all_providers`` with ``atexit`` at import + time. Verify that helper drains every enabled provider so short-lived + CLI invocations don't silently drop in-flight PostHog events.""" + import comfy_cli.tracking as tracking_mod + + p1 = MagicMock() + p2 = MagicMock() + with patch.object(tracking_mod, "PROVIDERS", [p1, p2]): + tracking_mod._flush_all_providers() + + p1.flush.assert_called_once() + p2.flush.assert_called_once() + + def test_flush_swallows_provider_errors(self): + import comfy_cli.tracking as tracking_mod + + p1 = MagicMock() + p1.flush.side_effect = RuntimeError("flush failed") + p2 = MagicMock() + with patch.object(tracking_mod, "PROVIDERS", [p1, p2]): + tracking_mod._flush_all_providers() + + p2.flush.assert_called_once() From e9aa5db4c8d64673c4fee04ec18a6a4d4bccccf6 Mon Sep 17 00:00:00 2001 From: Alexander Piskun Date: Thu, 21 May 2026 15:29:47 +0000 Subject: [PATCH 2/2] fix(generate): reject flag-like positionals in resume Telemetry filtered flag-shaped args but _resume() didn't, so 'comfy generate resume flux-pro --json' polled a job called '--json' instead of showing the usage hint. Align the dispatch validation. Signed-off-by: Alexander Piskun --- comfy_cli/command/generate/app.py | 2 +- tests/comfy_cli/command/generate/test_app_lifecycle.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/comfy_cli/command/generate/app.py b/comfy_cli/command/generate/app.py index 35c66f99..350b35a6 100644 --- a/comfy_cli/command/generate/app.py +++ b/comfy_cli/command/generate/app.py @@ -505,7 +505,7 @@ def _apply_upload_transforms(values: dict, flags: list[schema.FlagDef], endpoint def _resume(extra_args: list[str]) -> None: - if len(extra_args) < 2: + if len(extra_args) < 2 or extra_args[0].startswith("-") or extra_args[1].startswith("-"): rprint("[bold red]Usage: comfy generate resume [--download PATH] [--json][/bold red]") raise typer.Exit(code=1) model, job_id = extra_args[0], extra_args[1] diff --git a/tests/comfy_cli/command/generate/test_app_lifecycle.py b/tests/comfy_cli/command/generate/test_app_lifecycle.py index c9160461..a8cbb0a3 100644 --- a/tests/comfy_cli/command/generate/test_app_lifecycle.py +++ b/tests/comfy_cli/command/generate/test_app_lifecycle.py @@ -84,6 +84,14 @@ def test_resume_flag_like_first_arg_does_not_become_model(self, runner, captured assert resume_events[0]["model"] is None assert resume_events[0]["job_id"] is None + def test_resume_flag_like_job_id_surfaces_usage_error(self, runner, captured_events): + # Keep the telemetry-side flag-rejection (line 75) consistent with what + # ``_resume()`` accepts — otherwise ``resume flux-pro --json`` polls a + # job called ``--json`` instead of showing the usage hint. + r = runner.invoke(cli_app, ["generate", "resume", "flux-pro", "--json"]) + assert r.exit_code == 1 + assert "Usage: comfy generate resume" in r.output + def test_refresh_fires_generate_refresh(self, runner, captured_events, monkeypatch): # Mock the httpx call so we don't actually hit the network. monkeypatch.setattr(