Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
from data_designer.config.utils.type_helpers import StrEnum
from data_designer.config.utils.warning_helpers import warn_at_caller
from data_designer.config.version import get_library_version
from data_designer.engine import flags
from data_designer.engine.column_generators.generators.base import (
ColumnGenerator,
ColumnGeneratorWithModel,
GenerationStrategy,
)
from data_designer.engine.column_generators.utils.generator_classification import column_type_is_model_generated
from data_designer.engine.compiler import compile_data_designer_config
from data_designer.engine.dataset_builders.errors import DatasetGenerationError
from data_designer.engine.dataset_builders.multi_column_configs import MultiColumnConfig
Expand All @@ -55,6 +55,7 @@
from data_designer.engine.models.telemetry import InferenceEvent, NemoSourceEnum, TaskStatusEnum, TelemetryHandler
from data_designer.engine.processing.processors.base import Processor
from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
from data_designer.engine.readiness import run_readiness_check
from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry
from data_designer.engine.resources.resource_provider import ResourceProvider
from data_designer.engine.storage.artifact_storage import (
Expand All @@ -75,12 +76,12 @@

logger = logging.getLogger(__name__)

# Async engine is the default execution path. Set ``DATA_DESIGNER_ASYNC_ENGINE=0``
# to opt back into the legacy sync engine for one transitional release; the sync
# path is scheduled for removal afterwards.
DATA_DESIGNER_ASYNC_ENGINE = os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "1") == "1"
# The async-engine flag now lives in ``data_designer.engine.flags`` so the
# engine, the public interface, and the readiness module can share one source
# of truth. Always read ``flags.DATA_DESIGNER_ASYNC_ENGINE`` rather than caching
# a local copy so monkeypatches in tests are visible.

if DATA_DESIGNER_ASYNC_ENGINE:
if flags.DATA_DESIGNER_ASYNC_ENGINE:
import asyncio

from data_designer.engine.dataset_builders.async_scheduler import (
Expand Down Expand Up @@ -133,7 +134,7 @@ def __init__(
self._task_traces: list[TaskTrace] = []
self._registry = registry or DataDesignerRegistry()
self._graph: ExecutionGraph | None = None
self._use_async: bool = DATA_DESIGNER_ASYNC_ENGINE
self._use_async: bool = flags.DATA_DESIGNER_ASYNC_ENGINE
# Structured signal: set by _build_async if the scheduler hit early shutdown.
# Stays at defaults for sync-engine and successful async runs. Reset at
# the start of each public run path so reused builder instances don't
Expand Down Expand Up @@ -215,10 +216,6 @@ def single_column_configs(self) -> list[ColumnConfigT]:
def single_column_config_by_name(self) -> dict[str, ColumnConfigT]:
return {config.name: config for config in self.single_column_configs}

@functools.cached_property
def llm_generated_column_configs(self) -> list[ColumnConfigT]:
return [config for config in self.single_column_configs if column_type_is_model_generated(config.column_type)]

def build(
self,
*,
Expand Down Expand Up @@ -255,8 +252,7 @@ def build(
"""
self._reset_run_state()

self._run_model_health_check_if_needed()
self._run_mcp_tool_check_if_needed()
run_readiness_check(self.single_column_configs, self._resource_provider)

# For IF_POSSIBLE and ALWAYS: check config compatibility before touching the artifact
# directory. _check_resume_config_compatibility() must NOT access base_dataset_path
Expand Down Expand Up @@ -326,7 +322,7 @@ def build(
"start a new generation run."
)

self._use_async = DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility()
self._use_async = flags.DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility()
if self._use_async:
self._build_async(generators, num_records, buffer_size, on_batch_complete, resume=resume)
elif resume == ResumeMode.ALWAYS:
Expand Down Expand Up @@ -589,8 +585,7 @@ def _build_with_resume(

def build_preview(self, *, num_records: int) -> pd.DataFrame:
self._reset_run_state()
self._run_model_health_check_if_needed()
self._run_mcp_tool_check_if_needed()
run_readiness_check(self.single_column_configs, self._resource_provider)

# Set media storage to DATAFRAME mode for preview - base64 stored directly in DataFrame
if self._has_image_columns():
Expand All @@ -599,7 +594,7 @@ def build_preview(self, *, num_records: int) -> pd.DataFrame:
generators, self._graph = self._initialize_generators_and_graph()
start_time = time.perf_counter()

self._use_async = DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility()
self._use_async = flags.DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility()
if self._use_async:
dataset = self._build_async_preview(generators, num_records)
else:
Expand Down Expand Up @@ -1327,38 +1322,6 @@ def _merge_skipped_and_generated(
batch.append(gen_result)
return batch

def _run_model_health_check_if_needed(self) -> None:
model_aliases: set[str] = set()
for config in self.single_column_configs:
model_aliases.update(config.get_model_aliases())

if not model_aliases:
return

if DATA_DESIGNER_ASYNC_ENGINE:
loop = ensure_async_engine_loop()
future = asyncio.run_coroutine_threadsafe(
self._resource_provider.model_registry.arun_health_check(list(model_aliases)),
loop,
)
try:
future.result(timeout=180)
except TimeoutError:
future.cancel()
raise
else:
self._resource_provider.model_registry.run_health_check(list(model_aliases))

def _run_mcp_tool_check_if_needed(self) -> None:
tool_aliases = sorted(
{config.tool_alias for config in self.llm_generated_column_configs if getattr(config, "tool_alias", None)}
)
if not tool_aliases:
return
if self._resource_provider.mcp_registry is None:
raise DatasetGenerationError(f"Tool alias(es) {tool_aliases!r} specified but no MCPRegistry configured.")
self._resource_provider.mcp_registry.run_health_check(tool_aliases)

def _setup_fan_out(
self,
generator: ColumnGeneratorWithModelRegistry,
Expand Down
19 changes: 19 additions & 0 deletions packages/data-designer-engine/src/data_designer/engine/flags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Engine-wide feature flags read from environment variables.

This module exists so the engine, the public interface, and the readiness
module can share a single source of truth for runtime mode flags without
forming an import cycle. Tests patch values here to flip behavior for a
single test scope.
"""

from __future__ import annotations

import os

# Async engine is the default execution path. Set ``DATA_DESIGNER_ASYNC_ENGINE=0``
# to opt back into the legacy sync engine for one transitional release; the sync
# path is scheduled for removal afterwards.
DATA_DESIGNER_ASYNC_ENGINE: bool = os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "1") == "1"
123 changes: 123 additions & 0 deletions packages/data-designer-engine/src/data_designer/engine/readiness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""External-readiness checks for a DataDesigner workload.

A "readiness" check is a pre-flight probe of every external resource a
configuration depends on: each referenced model alias is sent a tiny
generation request, and every referenced MCP tool alias is contacted to
confirm its server is reachable.

This module hosts the shared logic invoked from two places:

- ``DatasetBuilder.build`` / ``DatasetBuilder.build_preview`` β€” at the start
of a workload, to fail fast before any expensive work begins.
- ``DataDesigner.check_models`` β€” exposed publicly so users can verify
external dependencies are responsive without triggering a workload.

The two callers must use the same code path here so the standalone method
cannot drift from the workload-startup gate.
"""

from __future__ import annotations

import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING

from data_designer.engine import flags
from data_designer.engine.column_generators.utils.generator_classification import column_type_is_model_generated
from data_designer.engine.dataset_builders.errors import DatasetGenerationError

if TYPE_CHECKING:
from data_designer.config.column_types import ColumnConfigT
from data_designer.engine.resources.resource_provider import ResourceProvider

logger = logging.getLogger(__name__)

# Match the timeout the dataset builder's startup gate has always used.
_MODEL_HEALTH_CHECK_TIMEOUT_SECONDS = 180


def run_readiness_check(
column_configs: Sequence[ColumnConfigT],
resource_provider: ResourceProvider,
) -> None:
"""Probe every model and MCP tool referenced by ``column_configs``.

For each unique model alias collected from the column configs,
``ModelRegistry.run_health_check`` (or ``arun_health_check`` on the async
engine) sends a tiny ``"Hello!"`` generation. Models whose ``ModelConfig``
has ``skip_health_check=True`` are skipped by the registry. After the
model pass, every unique MCP tool alias is probed via
``MCPRegistry.run_health_check``.

Args:
column_configs: The column configs whose ``get_model_aliases()`` and
``tool_alias`` fields determine which aliases are probed.
resource_provider: Provides access to the model registry and MCP
registry. ``mcp_registry`` may be ``None`` only if no tool
aliases are referenced.

Raises:
Typed model errors from ``data_designer.engine.models.errors`` for
any failing model probe.
DatasetGenerationError: If a tool alias is referenced but no MCP
registry is configured on the resource provider.
TimeoutError: If async health-check execution exceeds
``_MODEL_HEALTH_CHECK_TIMEOUT_SECONDS``.
"""
_run_model_health_check(column_configs, resource_provider)
_run_mcp_tool_health_check(column_configs, resource_provider)


def _run_model_health_check(
column_configs: Sequence[ColumnConfigT],
resource_provider: ResourceProvider,
) -> None:
model_aliases: set[str] = set()
for config in column_configs:
model_aliases.update(config.get_model_aliases())

if not model_aliases:
return

if flags.DATA_DESIGNER_ASYNC_ENGINE:
# Defer the async-engine imports to here so users on the legacy sync
# engine never pay the import cost. Mirrors the gating in
# ``dataset_builders.dataset_builder``.
import asyncio

from data_designer.engine.dataset_builders.utils.async_concurrency import ensure_async_engine_loop

loop = ensure_async_engine_loop()
future = asyncio.run_coroutine_threadsafe(
resource_provider.model_registry.arun_health_check(list(model_aliases)),
loop,
)
try:
future.result(timeout=_MODEL_HEALTH_CHECK_TIMEOUT_SECONDS)
except TimeoutError:
future.cancel()
raise
else:
resource_provider.model_registry.run_health_check(list(model_aliases))


def _run_mcp_tool_health_check(
column_configs: Sequence[ColumnConfigT],
resource_provider: ResourceProvider,
) -> None:
# Tool aliases are only meaningful on model-generated column configs.
tool_aliases = sorted(
{
config.tool_alias
for config in column_configs
if column_type_is_model_generated(config.column_type) and getattr(config, "tool_alias", None)
}
)
if not tool_aliases:
return
if resource_provider.mcp_registry is None:
raise DatasetGenerationError(f"Tool alias(es) {tool_aliases!r} specified but no MCPRegistry configured.")
resource_provider.mcp_registry.run_health_check(tool_aliases)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from __future__ import annotations

import os
from typing import TYPE_CHECKING

from data_designer.config.base import ConfigBase
Expand All @@ -13,6 +12,7 @@
from data_designer.config.run_config import RunConfig
from data_designer.config.seed_source import SeedSource
from data_designer.config.utils.type_helpers import StrEnum
from data_designer.engine import flags
from data_designer.engine.mcp.factory import create_mcp_registry
from data_designer.engine.mcp.registry import MCPRegistry
from data_designer.engine.model_provider import (
Expand Down Expand Up @@ -148,9 +148,7 @@ def create_resource_provider(
# default for backward compatibility.
if client_concurrency_mode is None:
client_concurrency_mode = (
ClientConcurrencyMode.ASYNC
if os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "1") == "1"
else ClientConcurrencyMode.SYNC
ClientConcurrencyMode.ASYNC if flags.DATA_DESIGNER_ASYNC_ENGINE else ClientConcurrencyMode.SYNC
)

effective_run_config = run_config or RunConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ def __init__(self, **kwargs: object) -> None:

def test_sync_path_unaffected_by_async_engine_flag() -> None:
"""DATA_DESIGNER_ASYNC_ENGINE=0 keeps the sync path unchanged."""
import data_designer.engine.dataset_builders.dataset_builder as builder_mod
from data_designer.engine import flags

assert hasattr(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE")
assert isinstance(builder_mod.DATA_DESIGNER_ASYNC_ENGINE, bool)
assert hasattr(flags, "DATA_DESIGNER_ASYNC_ENGINE")
assert isinstance(flags.DATA_DESIGNER_ASYNC_ENGINE, bool)


# -- Test execution graph integration with real column configs -----------------
Expand Down
Loading
Loading