
feat: wire async task-queue scheduler into ColumnWiseDatasetBuilder#429

Open
andreatgretel wants to merge 22 commits into main from
andreatgretel/feat/async-builder-integration

Conversation

andreatgretel (Contributor) commented Mar 17, 2026

Summary

Final integration PR in the async engine series (Plan #346). Wires the AsyncTaskScheduler and supporting components (built in PRs #356 and #404) into ColumnWiseDatasetBuilder. The async path is gated behind DATA_DESIGNER_ASYNC_ENGINE=1 - the sync path is unchanged.
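A minimal sketch of this kind of env-var gate (the flag name matches the PR; the surrounding wiring and the `build` stand-in are assumptions for illustration):

```python
import os

# Flag is read once at import time; "1" enables the async path.
DATA_DESIGNER_ASYNC_ENGINE = os.environ.get("DATA_DESIGNER_ASYNC_ENGINE") == "1"

def build(use_async: bool = DATA_DESIGNER_ASYNC_ENGINE) -> str:
    # The sync path stays the default; async only runs when the env var is "1".
    return "async" if use_async else "sync"
```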

Closes #346

Changes

Added

  • column_wise_builder.py - _build_async(): builds ExecutionGraph, partitions rows into groups, creates CompletionTracker + RowGroupBufferManager, runs AsyncTaskScheduler on the background loop
  • _validate_async_compatibility(): raises DatasetGenerationError at startup if any column uses allow_resize=True with async engine
  • Pre/post-batch processor callbacks wired into scheduler (on_seeds_complete, on_before_checkpoint)
  • on_batch_complete callback wired through on_row_group_complete for progress notifications
  • task_traces property on ColumnWiseDatasetBuilder and DataDesigner - populated when run_config.async_trace=True
  • run_config.async_trace: bool field for enabling per-task tracing
  • test_async_builder_integration.py: integration tests for multi-column pipelines, checkpoint correctness, allow_resize guard, pre-batch failure, error rate shutdown

Changed

  • async_scheduler.py: sliding window error rate via deque(maxlen=window) (replaces cumulative counters), pre-batch seed-completion tracking, _RowGroupState dataclass consolidation
  • test_async_scheduler.py: expanded coverage for sliding window error rate recovery, out-of-order row group completion, and new callbacks

Refactored

  • Consolidated 5 per-row-group collections (_active_rgs, _admitted_rg_ids, _seeds_dispatched_rgs, _pre_batch_done_rgs, _in_flight_counts) into a single _rg_states: dict[int, _RowGroupState] mapping - cleanup is now a single del instead of N separate discards, eliminating the class of bugs where one collection is missed during teardown (2114d3b)

Fixed

  • Snapshot dropped rows before await in _run_batch to prevent row-count mismatch when concurrent tasks drop rows during agenerate (c8a823c)
  • Sync tracker on checkpoint failure so dropped rows aren't re-dispatched (c8a823c)
  • Pre-batch row drops now synced from RowGroupBufferManager to CompletionTracker, preventing unnecessary LLM calls on filtered rows (c650a2f)
  • _admitted_rg_ids pruned on checkpoint to prevent unbounded growth (71e7412)
  • disable_early_shutdown wired into AsyncTaskScheduler - was silently ignored in the async path (259828d)


@andreatgretel andreatgretel requested a review from a team as a code owner March 17, 2026 21:20
greptile-apps bot (Contributor) commented Mar 17, 2026

Greptile Summary

This is the final integration PR in the async engine series, wiring AsyncTaskScheduler into ColumnWiseDatasetBuilder behind the DATA_DESIGNER_ASYNC_ENGINE=1 flag. The sync path is fully preserved. The core of the implementation is sound: the _RowGroupState dataclass consolidation eliminates the class of teardown bugs that came from managing five separate per-RG collections, the pre/post-batch callback hooks are correctly ordered and gated, the tracker-buffer sync for pre-batch drops is in place, and the on_checkpoint_complete guard correctly suppresses callbacks for empty row groups.

Key changes:

  • _build_async() in column_wise_builder.py: builds ExecutionGraph, partitions rows into row groups, constructs CompletionTracker + RowGroupBufferManager, and runs AsyncTaskScheduler on the background event loop
  • AsyncTaskScheduler gains error-rate shutdown (shutdown_error_rate, shutdown_error_window, disable_early_shutdown), pre-batch seed-completion tracking, and three new lifecycle callbacks (on_seeds_complete, on_before_checkpoint, on_checkpoint_complete)
  • _RowGroupState dataclass consolidates all per-RG lifecycle state; cleanup is a single del instead of N separate discards
  • RowGroupBufferManager.replace_dataframe() correctly marks trailing buffer slots as dropped when the replacement DataFrame has fewer rows
  • CompletionTracker.is_column_complete_for_rg() public method avoids direct access to private attributes

Issue found:

  • _check_error_rate divides by self._shutdown_error_window without guarding against zero. Since RunConfig.shutdown_error_window allows 0 (via ge=0), a user who sets RunConfig(shutdown_error_window=0) will hit a ZeroDivisionError on the first completed task. A one-line guard or changing ge=0 to ge=1 in the RunConfig field fixes this.
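The failure mechanics can be reproduced in isolation; this is a standalone sketch, not the scheduler code itself:

```python
from collections import deque

window = 0
recent = deque(maxlen=window)   # maxlen=0: every append is ejected immediately
recent.append(True)
assert len(recent) == 0

# The guard `len(recent) < window` is 0 < 0 == False, so no early return:
errors = sum(1 for ok in recent if not ok)   # 0 over an empty deque
try:
    rate = errors / window
    crashed = False
except ZeroDivisionError:
    crashed = True
```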

Confidence Score: 4/5

  • Safe to merge — the sync path is fully preserved and the async path is gated behind an env-var flag. One P1 crash bug exists for the edge case shutdown_error_window=0.
  • The implementation is thorough, well-tested, and addresses all previously identified issues from the review thread. The only new issue is a ZeroDivisionError in _check_error_rate when shutdown_error_window=0, which is a valid (though unusual) RunConfig value. This would crash any async run with that setting but does not affect the default configuration or the sync path.
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py — _check_error_rate needs a zero-division guard

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Central scheduler overhaul: consolidates per-RG lifecycle into _RowGroupState dataclass, adds error-rate shutdown, on_seeds_complete/on_before_checkpoint/on_checkpoint_complete callbacks. ZeroDivisionError when shutdown_error_window=0 (allowed by RunConfig).
packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py Wires async path behind DATA_DESIGNER_ASYNC_ENGINE flag; adds _build_async(), _validate_async_compatibility(), and task_traces property. Minor: two nearly-identical log lines are emitted on every async build call.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py Adds is_column_complete_for_rg() public method that checks _batch_complete first then falls back to per-cell check for CELL_BY_CELL columns — correct and safe.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py Adds replace_dataframe() which correctly handles dropped rows and marks trailing active slots as dropped when the replacement DataFrame has fewer rows than the original buffer.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py Adds run_pre_batch_on_df() convenience method that runs PRE_BATCH processors directly on a DataFrame for use in the async path. Clean and minimal change.
packages/data-designer-config/src/data_designer/config/run_config.py Adds async_trace: bool = False field for enabling per-task tracing in the async engine. Safe, backwards-compatible addition.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py Renames _ensure_async_engine_loop to ensure_async_engine_loop (public API) so _build_async can use it. Trivial and correct.
packages/data-designer/src/data_designer/interface/data_designer.py Captures builder.task_traces after generation and passes it into DatasetCreationResults. Clean plumbing change.
packages/data-designer/src/data_designer/interface/results.py Adds optional task_traces parameter to DatasetCreationResults, stored as self.task_traces: list[TaskTrace]. Backwards-compatible with a safe or [] default.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py New integration test file covering end-to-end async builds, checkpoint correctness, allow_resize guard, and out-of-order row group completion. Good coverage of the main wiring paths.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py Expands existing scheduler tests with error-rate shutdown, disable_early_shutdown, sliding-window recovery, and on_before_checkpoint/on_checkpoint_complete callbacks.
packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py Adds three unit tests for replace_dataframe (same-size, dropped-row-skip, and trailing-slot-drop cases) — good direct coverage of the new method.
tests_e2e/tests/test_async_engine.py New e2e test that runs a subprocess with DATA_DESIGNER_ASYNC_ENGINE=1 to verify concurrent column dispatch via task traces. Skips cleanly when NVIDIA_API_KEY is absent.

Sequence Diagram

sequenceDiagram
    participant B as ColumnWiseDatasetBuilder._build_async
    participant S as AsyncTaskScheduler.run()
    participant A as _admit_row_groups
    participant D as _dispatch_seeds
    participant T as _execute_seed_task
    participant E as _execute_task (cell/batch)
    participant Ck as _checkpoint_completed_row_groups

    B->>S: asyncio.run_coroutine_threadsafe(scheduler.run(), loop)
    S->>A: create_task(_admit_row_groups)
    loop main dispatch loop
        A-->>S: rg_semaphore.acquire() → _rg_states[rg_id] = _RowGroupState
        S->>D: _dispatch_seeds(rg_id, rg_size)
        D-->>S: seeds_dispatched=True, in_flight_count++
        S->>T: create_task(_execute_seed_task)
        T-->>S: mark_row_range_complete, in_flight_count--
        S->>S: _run_seeds_complete_check
        Note over S: all_seeds_done & in_flight_count==0?
        S-->>B: on_seeds_complete(rg_id, rg_size) [PRE_BATCH processors]
        S->>E: create_task(_execute_task) for cell/batch cols
        E-->>S: mark_cell_complete / mark_row_range_complete
        S->>Ck: _checkpoint_completed_row_groups
        Ck-->>B: on_before_checkpoint(rg_id) [POST_BATCH processors]
        Ck-->>B: on_checkpoint_complete(final_path) [if rows written]
        Ck->>Ck: del _rg_states[rg_id], rg_semaphore.release()
    end
    S-->>B: future.result() returns
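The builder-to-scheduler handoff at the top of the diagram can be sketched as follows; `scheduler_run` here is a trivial stand-in for AsyncTaskScheduler.run(), and the loop setup is an assumption about the background-loop pattern:

```python
import asyncio
import threading

# Background event loop running on its own daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def scheduler_run() -> str:
    await asyncio.sleep(0)   # placeholder for admit/dispatch/checkpoint work
    return "done"

# Builder thread submits the coroutine and blocks on the future,
# mirroring B->>S: asyncio.run_coroutine_threadsafe(scheduler.run(), loop).
future = asyncio.run_coroutine_threadsafe(scheduler_run(), loop)
result = future.result(timeout=5)
loop.call_soon_threadsafe(loop.stop)
```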

Comments Outside Diff (1)

  1. packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py, line 340-349 (link)

    ZeroDivisionError when shutdown_error_window=0

    RunConfig.shutdown_error_window allows 0 via ge=0. When shutdown_error_window=0:

    • deque(maxlen=0) always ejects every entry immediately — self._recent_outcomes is perpetually empty
    • The early-return guard len(self._recent_outcomes) < self._shutdown_error_window evaluates to 0 < 0 = False, so it does NOT return early
    • errors = sum(...) is 0 (empty deque)
    • 0 / self._shutdown_error_window raises ZeroDivisionError on the very first completed task

    A user who sets RunConfig(shutdown_error_window=0) (intending "start monitoring immediately") will hit a crash. The fix is a simple early-return guard:

```suggestion
    def _check_error_rate(self, *, success: bool) -> None:
        """Trigger early shutdown if recent error rate exceeds threshold."""
        if self._disable_early_shutdown or self._shutdown_error_window == 0:
            return
        self._recent_outcomes.append(success)
        if len(self._recent_outcomes) < self._shutdown_error_window:
            return
        errors = sum(1 for ok in self._recent_outcomes if not ok)
        if errors / self._shutdown_error_window > self._shutdown_error_rate:
            self._early_shutdown = True
```

    Alternatively, tighten the validator in run_config.py line 42:

```python
shutdown_error_window: int = Field(default=10, ge=1)
```
Inline comment — packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py, lines 169-171:
Comment:
**Duplicate log message on every async build**

Two near-identical messages are emitted on every async `build()` call:
- Line 170: `"⚡ Using async task-queue builder"` (in `build()`)
- Line 229: `"⚡ DATA_DESIGNER_ASYNC_ENGINE is enabled - using async task-queue builder"` (in `_build_async()`)

Users will see redundant consecutive log lines. Consider removing one of them (the one at line 229 in `_build_async()` is more descriptive, so the call-site log at line 170 could be dropped).

```suggestion
        if DATA_DESIGNER_ASYNC_ENGINE:
            self._validate_async_compatibility()
            self._build_async(generators, num_records, buffer_size, on_batch_complete)
```


Last reviewed commit: "fix: use real time d..."

@andreatgretel (Contributor Author)

(AR) Warning: on_seeds_complete not evaluated in salvage rounds

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:187

What: The seeds-complete check (lines 142-156) only runs inside the main while True loop. Salvage rounds retry deferred seed tasks but never check whether seeds completed, so the pre-batch processor is silently skipped for row groups whose seeds succeed in salvage.

Why: If all seed tasks for a row group are rate-limited on first attempt and succeed in salvage, downstream tasks run and the row group checkpoints without the pre-batch processor having run.

Suggestion:

Extract the seeds-complete check into a helper and call it in _drain_frontier or after salvage seed task completion.

@andreatgretel (Contributor Author)

(AR) This PR wires the AsyncTaskScheduler into ColumnWiseDatasetBuilder behind DATA_DESIGNER_ASYNC_ENGINE=1, adding _build_async(), pre/post-batch processor callbacks, error rate shutdown, and task trace surfacing. The sync path is unchanged. All 8 changed files are ruff-clean, and 22 tests pass (6 integration + 16 scheduler). The build_preview() async path (mentioned in the plan) was not implemented in this PR.

The most significant finding is a critical data-loss path in _checkpoint_completed_row_groups: if on_before_checkpoint raises, the row group's buffer is never freed and its data is silently dropped. Two warning-level findings identify scenarios where non-seed tasks can bypass the pre-batch processor barrier (multi-seed pipelines with partial dependencies, and salvage rounds skipping the seeds-complete check), both producing silently incorrect output. The async/sync code path separation is clean and the error rate shutdown mechanism is well-designed.

Verdict: needs-changes — 1 critical, 7 warnings, 4 suggestions.

- Wire on_batch_complete through on_row_group_complete callback
- Mark trailing slots as dropped in replace_dataframe when processor filters rows
- Ensure checkpoint still runs when on_before_checkpoint raises
- Gate non-seed task dispatch on pre-batch completion
- Add public run_pre_batch_on_df to ProcessorRunner (replaces private _run_stage call)
- Add public is_column_complete_for_rg to CompletionTracker (replaces private _completed access)
- Type task_traces as list[TaskTrace] in results.py
- Add async_trace docstring to RunConfig
- Move module-level log into _build_async
- Add replace_dataframe unit tests (same-size, dropped rows, fewer rows)
- Assert on public outcomes in scheduler tests instead of private attributes
- Parametrize allow_resize validation tests
- Cache seed_cols before main loop
- Remove redundant disable_early_shutdown from AsyncTaskScheduler
- Flush completed row groups before breaking on early shutdown (data loss)
- Change error rate check from >= to > so disable_early_shutdown sentinel
  (1.0) never triggers at 100% failure rate
- Extract seeds-complete check into helper and call it in salvage rounds
  via _drain_frontier, with pre-batch gating, so pre-batch processor runs
  even when seed tasks succeed only after retry
- Fix is_column_complete_for_rg to check _batch_complete first, then verify
  all non-dropped rows for CELL_BY_CELL columns
- Replace O(|in-flight|) scan in _in_flight_for_rg with per-RG counter
@andreatgretel (Contributor Author)

Addressed all remaining open issues in 3facfef:

  1. Early shutdown checkpoint skip (Greptile P2) - _checkpoint_completed_row_groups now called before break on early shutdown
  2. disable_early_shutdown at 100% failure (Greptile P2) - changed >= to > in _check_error_rate so the 1.0 sentinel never triggers
  3. on_seeds_complete skipped in salvage rounds (AR warning) - extracted seeds-complete check into _run_seeds_complete_check helper, called in _drain_frontier with pre-batch gating
  4. is_column_complete_for_rg partial cell correctness (Greptile P2) - now checks _batch_complete first, then verifies all non-dropped rows for CELL_BY_CELL
  5. _in_flight_for_rg O(|in-flight|) (Greptile perf) - replaced with per-RG counter dict for O(1) lookup

All 63 tests in test_async_scheduler, test_async_builder_integration, test_completion_tracker, and test_row_group_buffer pass.

… safely

Pre-batch processors that filter rows marked them as dropped in
RowGroupBufferManager but not in CompletionTracker, causing unnecessary
LLM calls for rows that would be discarded at checkpoint time.

Also wrap the benchmark warmup stderr redirect in try/finally so stderr
is restored if _run_once raises.
andreatgretel and others added 3 commits March 18, 2026 19:24
Prevents unbounded growth of the admission set across large runs.
Dev-time benchmarks and manual test scripts - kept locally, not needed in the PR.
@andreatgretel (Contributor Author)

Response to Greptile summary (37e3b62):

  • on_before_checkpoint failure writes unprocessed data (P1): Fixed - now drops all rows in the row group instead of checkpointing unprocessed data, matching on_seeds_complete failure behavior.
  • Downstream FULL_COLUMN tasks for fully-dropped row groups (P2): Acknowledged - safe via skip guard, cost is one semaphore slot for one event-loop tick.
  • _run_batch O(rg_size) scan: Acknowledged - fine for current buffer sizes, can add a get_dropped_indices() accessor later if needed.

Avoids unnecessary DataFrame round-trip for every row group in the
common case where no post-batch processors exist.
max_conversation_correction_steps: Maximum number of correction rounds permitted within a
single conversation when generation tasks call `ModelFacade.generate(...)`. Must be >= 0.
Default is 0.
async_trace: If True, collect per-task tracing data when using the async engine
Reviewer (Contributor):

per-task here may need a little more context?

Author reply:

docstring already reads "collect per-task tracing data when using the async engine" - lmk if you'd like more detail

return False
return True

def is_column_complete_for_rg(self, column: str, row_group: int) -> bool:
@nabinchha (Contributor) commented Mar 18, 2026:

super nit: row_group -> row_group_index

Author reply:

done in d7dd2ee

@nabinchha (Contributor) left a comment:

Nice work on this integration, Andre — the callback wiring and the iterative response to prior feedback are both really well done. Here are my findings from a full read-through via Claude.

Findings

Critical — Must fix before merge

None — the prior review's critical findings (silent on_batch_complete drop, replace_dataframe stale data, private attribute access, pre-batch barrier bypass, _run_stage cross-boundary call, bare list typing) have all been addressed in subsequent commits. Great job working through those.

Warnings — Strongly recommend fixing

async_scheduler.py:318 — Redundant seed_cols recomputation in _dispatch_seeds

  • What: _dispatch_seeds recomputes seed_cols from the graph's topological order on every call, but run() already computes seed_cols as a frozenset at the top of the method and passes it through _run_seeds_complete_check and _drain_frontier.
  • Why: Minor inefficiency (re-traverses the graph per row group admission), but more importantly a consistency risk — the two computations use different logic (get_topological_order + filter vs. columns + filter). If the graph ever distinguishes between these, they could diverge silently.
  • Suggestion: Pass the already-computed seed_cols frozenset into _dispatch_seeds instead of recomputing it — should be a quick fix.

async_scheduler.py:284-299 — _run_seeds_complete_check iterates _active_rgs while callbacks may mutate it indirectly

  • What: The method iterates over self._active_rgs (a list of tuples). The on_seeds_complete callback can raise, causing drop_row calls. While drop_row doesn't directly modify _active_rgs, if a future change to the drop logic triggers a checkpoint (which does remove from _active_rgs), this would cause a mutation-during-iteration bug.
  • Why: Fragile iteration pattern. The current code is safe today, but the coupling is subtle.
  • Suggestion: Iterate over a snapshot: for rg_id, rg_size in list(self._active_rgs): — cheap insurance.
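A standalone illustration of the suggested snapshot pattern (the list contents here are made up):

```python
# Iterating a copy makes the loop immune to removals triggered
# indirectly by callbacks during iteration.
active_rgs = [(0, 10), (1, 10), (2, 10)]
visited = []
for rg_id, rg_size in list(active_rgs):   # snapshot via list(...)
    visited.append(rg_id)
    if rg_id == 0:
        # e.g. a callback-triggered checkpoint removing a row group mid-loop
        active_rgs.remove((1, 10))
```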

column_wise_builder.py:314-319 — Telemetry exception silently swallowed with bare except Exception: pass

  • What: The telemetry emission block catches all exceptions and discards them silently. This is a pre-existing pattern from the sync path, but it's now replicated in the new async path.
  • Why: If telemetry breaks (e.g., wrong snapshot format after refactoring), there's zero signal — no log, no metric. A logger.debug would cost nothing and save debugging time.
  • Suggestion: Add minimal logging:
    except Exception:
        logger.debug("Failed to emit batch telemetry for async run", exc_info=True)

async_scheduler.py:260-274 — on_before_checkpoint failure drops all rows including already-generated data

  • What: When on_before_checkpoint raises, the handler drops every row in the row group. The on_before_checkpoint callback runs after all column generation is complete, so this discards fully-generated data. This matches the on_seeds_complete failure pattern (which makes sense there since no downstream work has happened yet), but here the trade-off is different.
  • Why: This is a reasonable design choice — if the post-batch transform is considered mandatory, dropping is correct. Just want to flag it as worth a brief comment explaining the rationale, since it's not immediately obvious why we wouldn't fall back to checkpointing the pre-transform data.
  • Suggestion: A one-line comment above the drop loop would be enough, e.g. # Post-batch is mandatory; drop rather than checkpoint unprocessed data.

column_wise_builder.py:277-280 — on_before_checkpoint passes rg_id as current_batch_number to run_post_batch

  • What: The closure passes current_batch_number=rg_id. In the sync path, current_batch_number is a sequential 0-based batch index. In the async path, rg_id is also 0-based and sequential, so this works today.
  • Why: If row-group IDs ever become non-sequential (e.g., retried row groups get new IDs), this would silently pass incorrect batch numbers to processors that depend on them.
  • Suggestion: Low urgency — either document that rg_id must remain sequential, or introduce a separate batch counter for the async path.

Suggestions — Consider improving

test_async_engine.py:26-27 — Mutating module-level global for test isolation

  • What: The e2e test directly mutates cwb.DATA_DESIGNER_ASYNC_ENGINE = True and restores it in a finally block.
  • Why: Fragile if tests run concurrently in the same process, and won't restore on KeyboardInterrupt.
  • Suggestion: unittest.mock.patch.object(cwb, "DATA_DESIGNER_ASYNC_ENGINE", True) as a context manager would be safer.
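A minimal sketch of the suggested approach; `cwb` here is a stand-in class, not the real column_wise_builder module:

```python
from unittest import mock

class cwb:
    # Stand-in for the module-level flag in column_wise_builder.
    DATA_DESIGNER_ASYNC_ENGINE = False

with mock.patch.object(cwb, "DATA_DESIGNER_ASYNC_ENGINE", True):
    assert cwb.DATA_DESIGNER_ASYNC_ENGINE is True   # flag set inside the context
# The context manager restores the original value even if the body raises.
```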

async_scheduler.py:91-92 — _in_flight_counts can theoretically go negative

  • What: The counter is decremented in the finally block of _execute_task_inner. If a task were somehow processed twice, the counter could go below zero.
  • Why: A negative count would cause _in_flight_for_rg to return False prematurely, potentially triggering _run_seeds_complete_check too early.
  • Suggestion: Add a floor: max(0, self._in_flight_counts.get(task.row_group, 0) - 1). Purely defensive.

column_wise_builder.py:68 — Importing private function _ensure_async_engine_loop

  • What: The builder imports _ensure_async_engine_loop (underscore-prefixed) from async_concurrency.
  • Why: Crosses a module boundary using a private function.
  • Suggestion: Consider renaming to ensure_async_engine_loop (public) or re-exporting from the module's __init__.

test_async_builder_integration.py:108 — Import inside async test function

  • What: test_build_async_end_to_end and test_checkpoint_produces_correct_parquet_calls import AsyncTaskScheduler inside the test body.
  • Why: AGENTS.md prefers module-level imports.
  • Suggestion: Move to module level or use pytest.importorskip. Minor nit.

What Looks Good

  • Thorough callback wiring: The pre-batch and post-batch processor callbacks are cleanly integrated with proper error handling and row-drop synchronization between RowGroupBufferManager and CompletionTracker. The on_seeds_complete / on_before_checkpoint separation is a clean design.

  • Responsive to review feedback: All prior review comments have been addressed in follow-up commits — on_batch_complete wired through, replace_dataframe drop handling, public is_column_complete_for_rg and run_pre_batch_on_df methods, pre-batch barrier gating, _admitted_rg_ids pruning, disable_early_shutdown forwarding. The commit history tells a clear story of iterative improvement.

  • Well-structured tests: The new test files cover the key integration scenarios (multi-column pipeline, checkpoint correctness, allow_resize guard, pre-batch failure, error rate shutdown) with clean setup and meaningful assertions on observable outcomes. The e2e concurrency test using trace interval overlap is a clever verification approach.

Verdict

Ship it (with nits) — The core integration is solid, all prior critical feedback has been addressed, the sync path is untouched, and test coverage is good. The warnings above are mostly about defensive coding and documentation — none are blocking. Nice work getting this across the finish line!

- Gate on_seeds_complete on PRE_BATCH processors (matches on_before_checkpoint pattern)
- Cache seed_cols as instance attr instead of recomputing in _dispatch_seeds
- Iterate list(self._active_rgs) snapshot in _run_seeds_complete_check
- Add logger.debug to telemetry except block
- Add design comment on on_before_checkpoint failure drop behavior
- Rename row_group param to row_group_index in is_column_complete_for_rg
- Document rg_id as current_batch_number equivalence
- Use mock.patch.object in e2e test instead of direct mutation
- Add max(0, ...) floor guard on _in_flight_counts decrement
- Rename _ensure_async_engine_loop to public ensure_async_engine_loop
- Move AsyncTaskScheduler import to module level in integration tests
@andreatgretel (Contributor Author)

Addressed all items from your review in d7dd2ee:

warnings:

  • seed_cols recomputed in _dispatch_seeds -> cached as _seed_cols frozenset in __init__
  • _run_seeds_complete_check iterates _active_rgs directly -> now iterates list(self._active_rgs)
  • telemetry bare except: pass -> added logger.debug with exc_info
  • on_before_checkpoint failure rationale -> added design comment
  • rg_id as current_batch_number -> added doc comment noting the equivalence

suggestions:

  • e2e test direct mutation -> switched to mock.patch.object
  • _in_flight_counts negative -> added max(0, ...) floor
  • private _ensure_async_engine_loop -> renamed to public ensure_async_engine_loop
  • function-level import in tests -> moved AsyncTaskScheduler to module level

@andreatgretel (Contributor Author)

on_seeds_complete conditional guard fixed in d7dd2ee - now gated on has_processors_for(PRE_BATCH), matching the on_before_checkpoint pattern

nabinchha previously approved these changes Mar 19, 2026
@nabinchha (Contributor) left a comment:

LGTM — all critical issues from prior rounds have been addressed. Two minor nits that can be addressed in follow-ups:

  1. _seeds_dispatched_rgs and _pre_batch_done_rgs grow unboundedly — same class of issue as the _admitted_rg_ids leak that was already fixed. Should get the same discard treatment in _checkpoint_completed_row_groups:

    self._seeds_dispatched_rgs.discard(rg_id)
    self._pre_batch_done_rgs.discard(rg_id)
  2. on_row_group_complete fires after on_before_checkpoint failure drops all rows — callers get a "batch complete" notification for an empty/dropped row group. Could confuse progress UIs. A guard before the callback would fix it:

    if self._on_row_group_complete and not buffer_manager.is_all_dropped(rg_id):
        self._on_row_group_complete(rg_id)

Neither is a correctness issue — just cleanup.

Replace 5 independent collections (_active_rgs, _admitted_rg_ids,
_seeds_dispatched_rgs, _pre_batch_done_rgs, _in_flight_counts) with a
single _rg_states dict keyed by row group ID. Cleanup is now a single
`del` instead of N separate discards, eliminating the class of bugs
where one collection is missed during row group teardown.
When on_before_checkpoint raises and all rows are dropped, the code
previously fell through to checkpoint_row_group and on_row_group_complete,
sending a spurious progress notification for a batch with zero records.
Now gates both on a `dropped` flag so they are skipped after failure.
@andreatgretel (Contributor Author)

@nabinchha Both items from your latest review are addressed:

  1. Unbounded _seeds_dispatched_rgs/_pre_batch_done_rgs - Fixed in 2114d3b by consolidating all 5 per-RG collections into a _RowGroupState dataclass. Cleanup is now del self._rg_states[rg_id] - one operation removes everything, so this entire class of bug is eliminated.

  2. on_row_group_complete after on_before_checkpoint failure - Fixed in 166cfff with a dropped flag that gates both checkpoint_row_group and on_row_group_complete.

Comment on lines +285 to +297
```python
dropped = False
try:
    if self._buffer_manager is not None:
        self._buffer_manager.checkpoint_row_group(rg_id)
    if self._on_row_group_complete:
    if self._on_before_checkpoint:
        try:
            self._on_before_checkpoint(rg_id, rg_size)
        except Exception:
            # Post-batch is mandatory; drop rather than checkpoint unprocessed data.
            logger.error(
                f"on_before_checkpoint failed for row group {rg_id}, dropping row group.",
                exc_info=True,
            )
            for ri in range(rg_size):
                if self._buffer_manager:
```
Copy link
Contributor


P2 on_before_checkpoint failure drops buffer rows but not tracker rows

When on_before_checkpoint raises (lines 287–297), every row in the row group is marked as dropped in self._buffer_manager but self._tracker.drop_row(rg_id, ri) is never called. The tracker still has those rows marked as complete rather than dropped.

In the current scheduler this is functionally harmless because del self._rg_states[rg_id] runs before the try-block (line 284), so admitted_ids will never include this rg_id again and get_ready_tasks will silently discard any stale frontier entries. However, tracker.is_dropped(rg_id, ri) returns False for rows that were actually discarded, making post-mortem inspection of the tracker (e.g., via task_traces or future diagnostics) misleading.

Syncing the drops to the tracker mirrors what on_seeds_complete already does (lines 272–274) and makes the two failure paths consistent:

```python
except Exception:
    logger.error(
        f"on_before_checkpoint failed for row group {rg_id}, dropping row group.",
        exc_info=True,
    )
    for ri in range(rg_size):
        if self._buffer_manager:
            self._buffer_manager.drop_row(rg_id, ri)
        self._tracker.drop_row(rg_id, ri)   # keep tracker in sync
```
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Lines: 285-297

Copy link
Contributor Author


Already fixed in c8a823c: self._tracker.drop_row(rg_id, ri) is now called at line 297, before the buffer drop. Both failure paths (seeds_complete and on_before_checkpoint) are now consistent.

@nabinchha
Copy link
Contributor

Follow-up: async preview path + high-fidelity logging for async scheduler

Two observations from testing this PR that should be addressed in a follow-up:

1. build_preview() does not use the async scheduler

When DATA_DESIGNER_ASYNC_ENGINE=1 is set, build() correctly routes through _build_async() and the AsyncTaskScheduler, giving us pipelined row-level execution across columns. However, build_preview() still goes through the sequential _run_batch() path — columns are processed one at a time, waiting for all records to complete before starting the next column.

For a recipe pipeline with 7 columns and 3 records, preview took ~52 seconds sequentially, while build processed 20 records (with salvage rounds) in ~3 minutes using the async path. Preview should benefit from the same pipelining — independent columns (e.g., recipe_idea on nvidia and recipe_idea_anthropic on anthropic) could run concurrently, and downstream columns could start as soon as their per-row dependencies are met.

2. Async path has significantly lower logging fidelity than sync

The sequential _run_batch() path (used by preview and sync build) produces high-fidelity per-column progress logs:

⚡️ Processing llm-text column 'recipe_idea' with 4 concurrent workers
⏱️ llm-text column 'recipe_idea' will report progress after each record
  |-- 🐴 llm-text column 'recipe_idea' progress: 1/3 (33%) complete, 1 ok, 0 failed, 1.54 rec/s, eta 1.3s
  |-- 🚗 llm-text column 'recipe_idea' progress: 2/3 (67%) complete, 2 ok, 0 failed, 3.05 rec/s, eta 0.3s
  |-- 🚀 llm-text column 'recipe_idea' progress: 3/3 (100%) complete, 3 ok, 0 failed, 4.56 rec/s, eta 0.0s

The async AsyncTaskScheduler path produces:

⚡ Using async task-queue builder
⚡ DATA_DESIGNER_ASYNC_ENGINE is enabled - using async task-queue builder
[all model configs dumped upfront]
🎲 Preparing samplers to generate 20 records across 1 columns
[~20 second silent gap]
🧩 Generating column `ingredient_count` from expression
Salvage round 1/2: 22 tasks
Salvage round 2/2: 6 tasks

There's no per-column progress, no records/second throughput, no ETA, and no per-record failure warnings during the main generation phase. If something is slow or stuck, you're flying blind.

The CompletionTracker already knows which cells are complete per column, and _execute_task_inner is the natural hook point. A per-column ProgressTracker (or async-aware variant) that aggregates across row groups and logs at interval thresholds would restore parity with the sync path without breaking the pipelining benefits.

Proposed follow-up items

  • Wire build_preview() into _build_async() when DATA_DESIGNER_ASYNC_ENGINE=1
  • Add per-column progress logging to AsyncTaskScheduler (success/failure counts, throughput, ETA)
  • Emit per-record failure warnings from the async path (currently only logged in salvage summary)

nabinchha
nabinchha previously approved these changes Mar 19, 2026
Copy link
Contributor

@nabinchha nabinchha left a comment


Testing this locally, I observed two things, captured in this comment, that I think should be addressed.

…r on checkpoint failure

Two fixes:
- _run_batch: snapshot dropped rows before `await agenerate` so the
  row-count expectation matches batch_df. Concurrent tasks can drop rows
  during the await, causing a spurious ValueError that would drop the
  entire row group. Write-back now re-checks is_dropped to skip rows
  dropped mid-flight.
- _checkpoint_completed_row_groups: add tracker.drop_row alongside
  buffer_manager.drop_row when on_before_checkpoint fails, keeping
  both in sync.
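The first fix can be sketched as follows; run_batch, the buffer interface, and agenerate here are illustrative stand-ins for the real _run_batch machinery, not the engine's actual signatures:

```python
# Sketch of the snapshot-before-await fix: capture which rows are dropped
# before awaiting, validate the result count against that snapshot, and
# re-check is_dropped on write-back to skip rows dropped mid-flight.
import asyncio


async def run_batch(buffer, rg_id: int, rg_size: int, agenerate) -> None:
    # Snapshot before the await: concurrent tasks may drop rows mid-flight,
    # and comparing against a post-await count would raise spuriously.
    dropped_before = {ri for ri in range(rg_size) if buffer.is_dropped(rg_id, ri)}
    expected = rg_size - len(dropped_before)

    results = await agenerate(expected)  # rows can be dropped during this await
    if len(results) != expected:
        raise ValueError(f"expected {expected} rows, got {len(results)}")

    live_rows = [ri for ri in range(rg_size) if ri not in dropped_before]
    for ri, value in zip(live_rows, results):
        # Re-check: skip any row dropped while agenerate was in flight.
        if not buffer.is_dropped(rg_id, ri):
            buffer.write(rg_id, ri, value)
```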
@andreatgretel
Copy link
Contributor Author

Thanks @nabinchha! Solid finds from actually running this thing end to end.

Filed both as follow-ups:

The logging one should be straightforward - ProgressTracker already does the heavy lifting; we just need to hook it into _execute_task_inner. That will get the emoji progression and rec/s readouts back without reinventing anything.

… test

Replace cumulative error counters with a deque-based sliding window so
that early transient failures do not permanently inflate the error rate
in long-running jobs. Add tests for the sliding window recovery path
and for deterministic out-of-order row group checkpoint ordering.
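A minimal sketch of the deque-based sliding window described above; the window size and threshold values are illustrative, not the scheduler's actual defaults:

```python
# Sliding-window error rate: outcomes older than `window` records age out,
# so early transient failures cannot permanently inflate the rate the way
# cumulative counters do.
from collections import deque


class ErrorRateMonitor:
    def __init__(self, window: int = 100, threshold: float = 0.5):
        self._outcomes: deque[bool] = deque(maxlen=window)  # True = failure
        self._threshold = threshold

    def record(self, failed: bool) -> None:
        self._outcomes.append(failed)

    @property
    def error_rate(self) -> float:
        if not self._outcomes:
            return 0.0
        return sum(self._outcomes) / len(self._outcomes)

    def should_shut_down(self) -> bool:
        # Only decide once the window is full, so a couple of early
        # failures in a fresh run cannot trigger an immediate shutdown.
        return (
            len(self._outcomes) == self._outcomes.maxlen
            and self.error_rate > self._threshold
        )
```

With deque(maxlen=window), appending beyond capacity silently evicts the oldest entry, which is exactly the recovery behavior the test exercises.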
@andreatgretel
Copy link
Contributor Author

Added one last commit, d3c2712, to be fully compliant with the original plan. (I messed up here - I didn't re-check the plan after the initial implementation, which is why we had to iterate so much.)

asyncio.sleep(0) interleaving is not deterministic across Python
versions. Switch to asyncio.sleep(num_records * 0.02) so the smaller
row group genuinely finishes seeds first regardless of event loop
scheduling.
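The scheduling fix can be demonstrated with a small self-contained example; the coroutine names here are hypothetical stand-ins for the test's actual fakes:

```python
# Duration-proportional sleeps make completion order deterministic: the
# smaller row group finishes its seed phase first regardless of how the
# event loop interleaves tasks, unlike asyncio.sleep(0), whose ordering
# is an implementation detail of the loop.
import asyncio


async def fake_seed_phase(rg_id: int, num_records: int, order: list[int]) -> None:
    # Sleep scales with row-group size, mimicking real seed-generation cost.
    await asyncio.sleep(num_records * 0.02)
    order.append(rg_id)


async def main() -> list[int]:
    order: list[int] = []
    await asyncio.gather(
        fake_seed_phase(0, num_records=10, order=order),
        fake_seed_phase(1, num_records=2, order=order),
    )
    # The smaller group (rg 1, 2 records) completes its seeds first.
    return order
```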


Development

Successfully merging this pull request may close these issues.

feat(engine): async generators and task-queue dataset builder

2 participants