feat: wire async task-queue scheduler into ColumnWiseDatasetBuilder #429
andreatgretel wants to merge 22 commits into main from
Conversation
Greptile Summary
This is the final integration PR in the async engine series, wiring the async task-queue scheduler into ColumnWiseDatasetBuilder.
Key changes:
Issue found:
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Central scheduler overhaul: consolidates per-RG lifecycle into _RowGroupState dataclass, adds error-rate shutdown, on_seeds_complete/on_before_checkpoint/on_checkpoint_complete callbacks. ZeroDivisionError when shutdown_error_window=0 (allowed by RunConfig). |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py | Wires async path behind DATA_DESIGNER_ASYNC_ENGINE flag; adds _build_async(), _validate_async_compatibility(), and task_traces property. Minor: two nearly-identical log lines are emitted on every async build call. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py | Adds is_column_complete_for_rg() public method that checks _batch_complete first then falls back to per-cell check for CELL_BY_CELL columns — correct and safe. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Adds replace_dataframe() which correctly handles dropped rows and marks trailing active slots as dropped when the replacement DataFrame has fewer rows than the original buffer. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | Adds run_pre_batch_on_df() convenience method that runs PRE_BATCH processors directly on a DataFrame for use in the async path. Clean and minimal change. |
| packages/data-designer-config/src/data_designer/config/run_config.py | Adds async_trace: bool = False field for enabling per-task tracing in the async engine. Safe, backwards-compatible addition. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py | Renames _ensure_async_engine_loop to ensure_async_engine_loop (public API) so _build_async can use it. Trivial and correct. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Captures builder.task_traces after generation and passes it into DatasetCreationResults. Clean plumbing change. |
| packages/data-designer/src/data_designer/interface/results.py | Adds optional task_traces parameter to DatasetCreationResults, stored as self.task_traces: list[TaskTrace]. Backwards-compatible with a safe or [] default. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py | New integration test file covering end-to-end async builds, checkpoint correctness, allow_resize guard, and out-of-order row group completion. Good coverage of the main wiring paths. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Expands existing scheduler tests with error-rate shutdown, disable_early_shutdown, sliding-window recovery, and on_before_checkpoint/on_checkpoint_complete callbacks. |
| packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py | Adds three unit tests for replace_dataframe (same-size, dropped-row-skip, and trailing-slot-drop cases) — good direct coverage of the new method. |
| tests_e2e/tests/test_async_engine.py | New e2e test that runs a subprocess with DATA_DESIGNER_ASYNC_ENGINE=1 to verify concurrent column dispatch via task traces. Skips cleanly when NVIDIA_API_KEY is absent. |
Sequence Diagram
sequenceDiagram
participant B as ColumnWiseDatasetBuilder._build_async
participant S as AsyncTaskScheduler.run()
participant A as _admit_row_groups
participant D as _dispatch_seeds
participant T as _execute_seed_task
participant E as _execute_task (cell/batch)
participant Ck as _checkpoint_completed_row_groups
B->>S: asyncio.run_coroutine_threadsafe(scheduler.run(), loop)
S->>A: create_task(_admit_row_groups)
loop main dispatch loop
A-->>S: rg_semaphore.acquire() → _rg_states[rg_id] = _RowGroupState
S->>D: _dispatch_seeds(rg_id, rg_size)
D-->>S: seeds_dispatched=True, in_flight_count++
S->>T: create_task(_execute_seed_task)
T-->>S: mark_row_range_complete, in_flight_count--
S->>S: _run_seeds_complete_check
Note over S: all_seeds_done & in_flight_count==0?
S-->>B: on_seeds_complete(rg_id, rg_size) [PRE_BATCH processors]
S->>E: create_task(_execute_task) for cell/batch cols
E-->>S: mark_cell_complete / mark_row_range_complete
S->>Ck: _checkpoint_completed_row_groups
Ck-->>B: on_before_checkpoint(rg_id) [POST_BATCH processors]
Ck-->>B: on_checkpoint_complete(final_path) [if rows written]
Ck->>Ck: del _rg_states[rg_id], rg_semaphore.release()
end
S-->>B: future.result() returns
Comments Outside Diff (1)
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py, lines 340-349 (link)

**ZeroDivisionError when `shutdown_error_window=0`**

`RunConfig.shutdown_error_window` allows `0` via `ge=0`. When `shutdown_error_window=0`:
- `deque(maxlen=0)` always ejects every entry immediately — `self._recent_outcomes` is perpetually empty
- The early-return guard `len(self._recent_outcomes) < self._shutdown_error_window` evaluates to `0 < 0 = False`, so it does NOT return early
- `errors = sum(...)` is `0` (empty deque)
- `0 / self._shutdown_error_window` → `ZeroDivisionError` on the very first completed task

A user who sets `RunConfig(shutdown_error_window=0)` (intending "start monitoring immediately") will hit a crash. The fix is a simple early-return guard or tightening the `RunConfig` validator to `ge=1`. Alternatively, change `ge=0` to `ge=1` in `run_config.py` line 42: `shutdown_error_window: int = Field(default=10, ge=1)`
Prompt To Fix All With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 340-349
Comment:
**ZeroDivisionError when `shutdown_error_window=0`**
`RunConfig.shutdown_error_window` allows `0` via `ge=0`. When `shutdown_error_window=0`:
- `deque(maxlen=0)` always ejects every entry immediately — `self._recent_outcomes` is perpetually empty
- The early-return guard `len(self._recent_outcomes) < self._shutdown_error_window` evaluates to `0 < 0 = False`, so it does NOT return early
- `errors = sum(...)` is `0` (empty deque)
- `0 / self._shutdown_error_window` → **`ZeroDivisionError`** on the very first completed task
A user who sets `RunConfig(shutdown_error_window=0)` (intending "start monitoring immediately") will hit a crash. The fix is a simple early-return guard or tightening the `RunConfig` validator to `ge=1`:
```suggestion
def _check_error_rate(self, *, success: bool) -> None:
"""Trigger early shutdown if recent error rate exceeds threshold."""
if self._disable_early_shutdown or self._shutdown_error_window == 0:
return
self._recent_outcomes.append(success)
if len(self._recent_outcomes) < self._shutdown_error_window:
return
errors = sum(1 for ok in self._recent_outcomes if not ok)
if errors / self._shutdown_error_window > self._shutdown_error_rate:
self._early_shutdown = True
```
Alternatively, change `ge=0` to `ge=1` in `run_config.py` line 42:
```python
shutdown_error_window: int = Field(default=10, ge=1)
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py
Line: 169-171
Comment:
**Duplicate log message on every async build**
Two near-identical messages are emitted on every async `build()` call:
- Line 170: `"⚡ Using async task-queue builder"` (in `build()`)
- Line 229: `"⚡ DATA_DESIGNER_ASYNC_ENGINE is enabled - using async task-queue builder"` (in `_build_async()`)
Users will see redundant consecutive log lines. Consider removing one of them (the one at line 229 in `_build_async()` is more descriptive, so the call-site log at line 170 could be dropped).
```suggestion
if DATA_DESIGNER_ASYNC_ENGINE:
self._validate_async_compatibility()
self._build_async(generators, num_records, buffer_size, on_batch_complete)
```
How can I resolve this? If you propose a fix, please make it concise.
Last reviewed commit: "fix: use real time d..."
(AR) Warning: on_seeds_complete not evaluated in salvage rounds
What: The seeds-complete check (lines 142-156) only runs inside the main `while True` loop. Salvage rounds retry deferred seed tasks but never check whether seeds completed, so the pre-batch processor is silently skipped for row groups whose seeds succeed in salvage.
Why: If all seed tasks for a row group are rate-limited on first attempt and succeed in salvage, downstream tasks run and the row group checkpoints without the pre-batch processor having run.
Suggestion:
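A hedged sketch of the pattern this warning points toward — run the same seeds-complete check after salvage retries as well as in the main loop, so the callback still fires for row groups whose seeds only succeed on retry. All names below are illustrative, not the scheduler's real API:

```python
def run_seeds_complete_check(state, on_seeds_complete):
    # Fire the pre-batch callback for any row group whose seeds are all done
    # and has nothing in flight; the pending list makes the check idempotent.
    for rg_id in list(state["pending"]):
        if state["seeds_done"][rg_id] and state["in_flight"][rg_id] == 0:
            on_seeds_complete(rg_id)
            state["pending"].remove(rg_id)

state = {"pending": [0], "seeds_done": {0: False}, "in_flight": {0: 0}}
fired = []

run_seeds_complete_check(state, fired.append)   # main loop: seeds still pending
assert fired == []

state["seeds_done"][0] = True                   # salvage retry succeeds
run_seeds_complete_check(state, fired.append)   # same check after the salvage round
assert fired == [0]                             # pre-batch callback still fires
```

Because the check is idempotent, calling it from both the main loop and the salvage path is safe.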
(AR) This PR wires the AsyncTaskScheduler into ColumnWiseDatasetBuilder behind DATA_DESIGNER_ASYNC_ENGINE=1, adding _build_async(), pre/post-batch processor callbacks, error rate shutdown, and task trace surfacing. The sync path is unchanged. All 8 changed files pass ruff clean and 22 tests pass (6 integration + 16 scheduler). build_preview() async path (mentioned in the plan) was not implemented in this PR. The most significant finding is a critical data-loss path in _checkpoint_completed_row_groups: if on_before_checkpoint raises, the row group's buffer is never freed and its data is silently dropped. Two warning-level findings identify scenarios where non-seed tasks can bypass the pre-batch processor barrier (multi-seed pipelines with partial dependencies, and salvage rounds skipping the seeds-complete check), both producing silently incorrect output. The async/sync code path separation is clean and the error rate shutdown mechanism is well-designed. Verdict: needs-changes — 1 critical, 7 warnings, 4 suggestions.
- Wire on_batch_complete through on_row_group_complete callback
- Mark trailing slots as dropped in replace_dataframe when processor filters rows
- Ensure checkpoint still runs when on_before_checkpoint raises
- Gate non-seed task dispatch on pre-batch completion
- Add public run_pre_batch_on_df to ProcessorRunner (replaces private _run_stage call)
- Add public is_column_complete_for_rg to CompletionTracker (replaces private _completed access)
- Type task_traces as list[TaskTrace] in results.py
- Add async_trace docstring to RunConfig
- Move module-level log into _build_async
- Add replace_dataframe unit tests (same-size, dropped rows, fewer rows)
- Assert on public outcomes in scheduler tests instead of private attributes
- Parametrize allow_resize validation tests
- Cache seed_cols before main loop
- Remove redundant disable_early_shutdown from AsyncTaskScheduler
- Flush completed row groups before breaking on early shutdown (data loss)
- Change error rate check from >= to > so disable_early_shutdown sentinel (1.0) never triggers at 100% failure rate
- Extract seeds-complete check into helper and call it in salvage rounds via _drain_frontier, with pre-batch gating, so pre-batch processor runs even when seed tasks succeed only after retry
- Fix is_column_complete_for_rg to check _batch_complete first, then verify all non-dropped rows for CELL_BY_CELL columns
- Replace O(|in-flight|) scan in _in_flight_for_rg with per-RG counter
Addressed all remaining open issues in 3facfef:
All 63 tests pass.
… safely
Pre-batch processors that filter rows marked them as dropped in RowGroupBufferManager but not in CompletionTracker, causing unnecessary LLM calls for rows that would be discarded at checkpoint time. Also wrap the benchmark warmup stderr redirect in try/finally so stderr is restored if _run_once raises.
Prevents unbounded growth of the admission set across large runs.
Dev-time benchmarks and manual test scripts - kept locally, not needed in the PR.
Response to Greptile summary (37e3b62):
Avoids unnecessary DataFrame round-trip for every row group in the common case where no post-batch processors exist.
max_conversation_correction_steps: Maximum number of correction rounds permitted within a
single conversation when generation tasks call `ModelFacade.generate(...)`. Must be >= 0.
Default is 0.
async_trace: If True, collect per-task tracing data when using the async engine
per-task here may need a little more context?
docstring already reads "collect per-task tracing data when using the async engine" - lmk if you'd like more detail
return False
return True

def is_column_complete_for_rg(self, column: str, row_group: int) -> bool:
super nit: row_group -> row_group_index
nabinchha
left a comment
Nice work on this integration, Andre — the callback wiring and the iterative response to prior feedback are both really well done. Here are my findings from a full read-through via claude.
Findings
Critical — Must fix before merge
None — the prior review's critical findings (silent on_batch_complete drop, replace_dataframe stale data, private attribute access, pre-batch barrier bypass, _run_stage cross-boundary call, bare list typing) have all been addressed in subsequent commits. Great job working through those.
Warnings — Strongly recommend fixing
async_scheduler.py:318 — Redundant seed_cols recomputation in _dispatch_seeds
- What: `_dispatch_seeds` recomputes `seed_cols` from the graph's topological order on every call, but `run()` already computes `seed_cols` as a `frozenset` at the top of the method and passes it through `_run_seeds_complete_check` and `_drain_frontier`.
- Why: Minor inefficiency (re-traverses the graph per row group admission), but more importantly a consistency risk — the two computations use different logic (`get_topological_order` + filter vs. `columns` + filter). If the graph ever distinguishes between these, they could diverge silently.
- Suggestion: Pass the already-computed `seed_cols` frozenset into `_dispatch_seeds` instead of recomputing it — should be a quick fix.
async_scheduler.py:284-299 — _run_seeds_complete_check iterates _active_rgs while callbacks may mutate it indirectly
- What: The method iterates over `self._active_rgs` (a list of tuples). The `on_seeds_complete` callback can raise, causing `drop_row` calls. While `drop_row` doesn't directly modify `_active_rgs`, if a future change to the drop logic triggers a checkpoint (which does remove from `_active_rgs`), this would cause a mutation-during-iteration bug.
- Why: Fragile iteration pattern. The current code is safe today, but the coupling is subtle.
- Suggestion: Iterate over a snapshot: `for rg_id, rg_size in list(self._active_rgs):` — cheap insurance.
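The snapshot idiom is cheap and removes the hazard entirely. A minimal illustration, where the `active` list stands in for `_active_rgs`:

```python
# Iterating a snapshot (list(active)) is safe even if a callback mutates
# the underlying list mid-iteration; iterating `active` directly would
# silently skip entries after a removal.
active = [(0, 4), (1, 4), (2, 4)]

seen = []
for rg_id, rg_size in list(active):   # snapshot copy
    seen.append(rg_id)
    if rg_id == 0:
        active.remove((1, 4))         # simulate a callback removing a row group

assert seen == [0, 1, 2]              # every original entry still visited
```

The copy is O(n) over a small list per pass, which is negligible next to the task dispatch work in the loop body.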
column_wise_builder.py:314-319 — Telemetry exception silently swallowed with bare except Exception: pass
- What: The telemetry emission block catches all exceptions and discards them silently. This is a pre-existing pattern from the sync path, but it's now replicated in the new async path.
- Why: If telemetry breaks (e.g., wrong snapshot format after refactoring), there's zero signal — no log, no metric. A `logger.debug` would cost nothing and save debugging time.
- Suggestion: Add minimal logging: `except Exception: logger.debug("Failed to emit batch telemetry for async run", exc_info=True)`
async_scheduler.py:260-274 — on_before_checkpoint failure drops all rows including already-generated data
- What: When `on_before_checkpoint` raises, the handler drops every row in the row group. The `on_before_checkpoint` callback runs after all column generation is complete, so this discards fully-generated data. This matches the `on_seeds_complete` failure pattern (which makes sense there since no downstream work has happened yet), but here the trade-off is different.
- Why: This is a reasonable design choice — if the post-batch transform is considered mandatory, dropping is correct. Just want to flag it as worth a brief comment explaining the rationale, since it's not immediately obvious why we wouldn't fall back to checkpointing the pre-transform data.
- Suggestion: A one-line comment above the drop loop would be enough, e.g. `# Post-batch is mandatory; drop rather than checkpoint unprocessed data.`
column_wise_builder.py:277-280 — on_before_checkpoint passes rg_id as current_batch_number to run_post_batch
- What: The closure passes `current_batch_number=rg_id`. In the sync path, `current_batch_number` is a sequential 0-based batch index. In the async path, `rg_id` is also 0-based and sequential, so this works today.
- Why: If row-group IDs ever become non-sequential (e.g., retried row groups get new IDs), this would silently pass incorrect batch numbers to processors that depend on them.
- Suggestion: Low urgency — either document that `rg_id` must remain sequential, or introduce a separate batch counter for the async path.
Suggestions — Consider improving
test_async_engine.py:26-27 — Mutating module-level global for test isolation
- What: The e2e test directly mutates `cwb.DATA_DESIGNER_ASYNC_ENGINE = True` and restores it in a `finally` block.
- Why: Fragile if tests run concurrently in the same process, and won't restore on `KeyboardInterrupt`.
- Suggestion: `unittest.mock.patch.object(cwb, "DATA_DESIGNER_ASYNC_ENGINE", True)` as a context manager would be safer.
async_scheduler.py:91-92 — _in_flight_counts can theoretically go negative
- What: The counter is decremented in the `finally` block of `_execute_task_inner`. If a task were somehow processed twice, the counter could go below zero.
- Why: A negative count would cause `_in_flight_for_rg` to return `False` prematurely, potentially triggering `_run_seeds_complete_check` too early.
- Suggestion: Add a floor: `max(0, self._in_flight_counts.get(task.row_group, 0) - 1)`. Purely defensive.
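The floor guard is a one-liner; a small sketch with made-up names (`decrement`, `counts`) for illustration:

```python
def decrement(counts, key):
    # Defensive floor: a double-decrement can't push the counter negative,
    # so downstream "nothing in flight" checks never fire early with a
    # nonsensical negative count.
    counts[key] = max(0, counts.get(key, 0) - 1)

counts = {7: 1}
decrement(counts, 7)
decrement(counts, 7)     # accidental second decrement (e.g., task processed twice)
assert counts[7] == 0    # clamped at zero instead of going to -1
```

The trade-off is that the clamp can mask the underlying double-processing bug, which is why it pairs well with a debug log or assertion in development builds.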
column_wise_builder.py:68 — Importing private function _ensure_async_engine_loop
- What: The builder imports `_ensure_async_engine_loop` (underscore-prefixed) from `async_concurrency`.
- Why: Crosses a module boundary using a private function.
- Suggestion: Consider renaming to `ensure_async_engine_loop` (public) or re-exporting from the module's `__init__`.
test_async_builder_integration.py:108 — Import inside async test function
- What: `test_build_async_end_to_end` and `test_checkpoint_produces_correct_parquet_calls` import `AsyncTaskScheduler` inside the test body.
- Why: AGENTS.md prefers module-level imports.
- Suggestion: Move to module level or use `pytest.importorskip`. Minor nit.
What Looks Good
- Thorough callback wiring: The pre-batch and post-batch processor callbacks are cleanly integrated with proper error handling and row-drop synchronization between `RowGroupBufferManager` and `CompletionTracker`. The `on_seeds_complete` / `on_before_checkpoint` separation is a clean design.
- Responsive to review feedback: All prior review comments have been addressed in follow-up commits — `on_batch_complete` wired through, `replace_dataframe` drop handling, public `is_column_complete_for_rg` and `run_pre_batch_on_df` methods, pre-batch barrier gating, `_admitted_rg_ids` pruning, `disable_early_shutdown` forwarding. The commit history tells a clear story of iterative improvement.
- Well-structured tests: The new test files cover the key integration scenarios (multi-column pipeline, checkpoint correctness, `allow_resize` guard, pre-batch failure, error rate shutdown) with clean setup and meaningful assertions on observable outcomes. The e2e concurrency test using trace interval overlap is a clever verification approach.
Verdict
Ship it (with nits) — The core integration is solid, all prior critical feedback has been addressed, the sync path is untouched, and test coverage is good. The warnings above are mostly about defensive coding and documentation — none are blocking. Nice work getting this across the finish line!
- Gate on_seeds_complete on PRE_BATCH processors (matches on_before_checkpoint pattern)
- Cache seed_cols as instance attr instead of recomputing in _dispatch_seeds
- Iterate list(self._active_rgs) snapshot in _run_seeds_complete_check
- Add logger.debug to telemetry except block
- Add design comment on on_before_checkpoint failure drop behavior
- Rename row_group param to row_group_index in is_column_complete_for_rg
- Document rg_id as current_batch_number equivalence
- Use mock.patch.object in e2e test instead of direct mutation
- Add max(0, ...) floor guard on _in_flight_counts decrement
- Rename _ensure_async_engine_loop to public ensure_async_engine_loop
- Move AsyncTaskScheduler import to module level in integration tests
addressed all items from your review in d7dd2ee:
warnings:
suggestions:
nabinchha
left a comment
LGTM — all critical issues from prior rounds have been addressed. Two minor nits that can be addressed in follow-ups:
- `_seeds_dispatched_rgs` and `_pre_batch_done_rgs` grow unboundedly — same class of issue as the `_admitted_rg_ids` leak that was already fixed. Should get the same `discard` treatment in `_checkpoint_completed_row_groups`: `self._seeds_dispatched_rgs.discard(rg_id)` and `self._pre_batch_done_rgs.discard(rg_id)`.
- `on_row_group_complete` fires after `on_before_checkpoint` failure drops all rows — callers get a "batch complete" notification for an empty/dropped row group. Could confuse progress UIs. A guard before the callback would fix it: `if self._on_row_group_complete and not buffer_manager.is_all_dropped(rg_id): self._on_row_group_complete(rg_id)`
Neither is a correctness issue — just cleanup.
Replace 5 independent collections (_active_rgs, _admitted_rg_ids, _seeds_dispatched_rgs, _pre_batch_done_rgs, _in_flight_counts) with a single _rg_states dict keyed by row group ID. Cleanup is now a single `del` instead of N separate discards, eliminating the class of bugs where one collection is missed during row group teardown.
When on_before_checkpoint raises and all rows are dropped, the code previously fell through to checkpoint_row_group and on_row_group_complete, sending a spurious progress notification for a batch with zero records. Now gates both on a `dropped` flag so they are skipped after failure.
@nabinchha Both items from your latest review are addressed:
dropped = False
try:
    if self._buffer_manager is not None:
        self._buffer_manager.checkpoint_row_group(rg_id)
    if self._on_row_group_complete:
        if self._on_before_checkpoint:
            try:
                self._on_before_checkpoint(rg_id, rg_size)
            except Exception:
                # Post-batch is mandatory; drop rather than checkpoint unprocessed data.
                logger.error(
                    f"on_before_checkpoint failed for row group {rg_id}, dropping row group.",
                    exc_info=True,
                )
                for ri in range(rg_size):
                    if self._buffer_manager:
on_before_checkpoint failure drops buffer rows but not tracker rows
When on_before_checkpoint raises (lines 287–297), every row in the row group is marked as dropped in self._buffer_manager but self._tracker.drop_row(rg_id, ri) is never called. The tracker still has those rows marked as complete rather than dropped.
In the current scheduler this is functionally harmless because del self._rg_states[rg_id] runs before the try-block (line 284), so admitted_ids will never include this rg_id again and get_ready_tasks will silently discard any stale frontier entries. However, tracker.is_dropped(rg_id, ri) returns False for rows that were actually discarded, making post-mortem inspection of the tracker (e.g., via task_traces or future diagnostics) misleading.
Syncing the drops to the tracker mirrors what on_seeds_complete already does (lines 272–274) and makes the two failure paths consistent:
except Exception:
logger.error(
f"on_before_checkpoint failed for row group {rg_id}, dropping row group.",
exc_info=True,
)
for ri in range(rg_size):
if self._buffer_manager:
self._buffer_manager.drop_row(rg_id, ri)
self._tracker.drop_row(rg_id, ri)  # keep tracker in sync
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 285-297
Comment:
**`on_before_checkpoint` failure drops buffer rows but not tracker rows**
When `on_before_checkpoint` raises (lines 287–297), every row in the row group is marked as dropped in `self._buffer_manager` but `self._tracker.drop_row(rg_id, ri)` is never called. The tracker still has those rows marked as *complete* rather than *dropped*.
In the current scheduler this is functionally harmless because `del self._rg_states[rg_id]` runs before the try-block (line 284), so `admitted_ids` will never include this `rg_id` again and `get_ready_tasks` will silently discard any stale frontier entries. However, `tracker.is_dropped(rg_id, ri)` returns `False` for rows that were actually discarded, making post-mortem inspection of the tracker (e.g., via `task_traces` or future diagnostics) misleading.
Syncing the drops to the tracker mirrors what `on_seeds_complete` already does (lines 272–274) and makes the two failure paths consistent:
```python
except Exception:
logger.error(
f"on_before_checkpoint failed for row group {rg_id}, dropping row group.",
exc_info=True,
)
for ri in range(rg_size):
if self._buffer_manager:
self._buffer_manager.drop_row(rg_id, ri)
self._tracker.drop_row(rg_id, ri) # keep tracker in sync
```
How can I resolve this? If you propose a fix, please make it concise.
already fixed in c8a823c - self._tracker.drop_row(rg_id, ri) is now called at line 297, before the buffer drop. both failure paths (seeds_complete and on_before_checkpoint) are consistent.
Follow-up: async preview path + high-fidelity logging for async scheduler
Two observations from testing this PR that should be addressed in a follow-up:
1.
…r on checkpoint failure Two fixes: - _run_batch: snapshot dropped rows before `await agenerate` so the row-count expectation matches batch_df. Concurrent tasks can drop rows during the await, causing a spurious ValueError that would drop the entire row group. Write-back now re-checks is_dropped to skip rows dropped mid-flight. - _checkpoint_completed_row_groups: add tracker.drop_row alongside buffer_manager.drop_row when on_before_checkpoint fails, keeping both in sync.
|
Thanks @nabinchha! Solid finds from actually running this thing end to end. Filed both as follow-ups:
The logging one should be straightforward.
… test Replace cumulative error counters with a deque-based sliding window so that early transient failures do not permanently inflate the error rate in long-running jobs. Add tests for the sliding window recovery path and for deterministic out-of-order row group checkpoint ordering.
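The behavioral difference between cumulative counters and a `deque`-based sliding window can be shown in a few lines. Window size and the outcome sequence below are arbitrary examples, not the project's defaults:

```python
from collections import deque

window = 4
outcomes = deque(maxlen=window)  # only the last `window` outcomes count

def error_rate():
    return sum(1 for ok in outcomes if not ok) / window

# Two early transient failures land in the window.
for ok in [False, False, True, True]:
    outcomes.append(ok)
print(error_rate())   # 0.5

# Subsequent successes slide the old failures out of the window;
# a cumulative counter would still report 2 lifetime errors here.
for ok in [True, True]:
    outcomes.append(ok)
print(error_rate())   # 0.0
```

This is why early transient failures no longer permanently inflate the error rate in long-running jobs: the window forgets them once enough successes arrive.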
|
Added just one last commit d3c2712 to be fully compliant with the original plan (I kind of messed up on this one - I didn't check the plan after the initial implementation, which is why we had to iterate so much).
asyncio.sleep(0) interleaving is not deterministic across Python versions. Switch to asyncio.sleep(num_records * 0.02) so the smaller row group genuinely finishes seeds first regardless of event loop scheduling.
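The determinism fix can be demonstrated in isolation. This sketch mirrors the commit's approach — the `0.02` factor comes from the commit message, while the task names and record counts are illustrative:

```python
import asyncio

# Sleeping proportionally to workload size guarantees the smaller "row group"
# finishes its seed phase first, regardless of how the event loop interleaves
# ready tasks (which asyncio.sleep(0) does not guarantee across versions).
async def fake_seed_phase(rg_id, num_records, done):
    await asyncio.sleep(num_records * 0.02)
    done.append(rg_id)

async def main():
    done = []
    await asyncio.gather(
        fake_seed_phase("big", 10, done),    # sleeps 0.2s
        fake_seed_phase("small", 2, done),   # sleeps 0.04s
    )
    return done

order = asyncio.run(main())
assert order == ["small", "big"]  # deterministic completion order
```

Real-time sleeps trade a little test runtime for scheduling-independent ordering, which is what the out-of-order checkpoint test needs.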
Summary
Final integration PR in the async engine series (Plan #346). Wires the `AsyncTaskScheduler` and supporting components (built in PRs #356 and #404) into `ColumnWiseDatasetBuilder`. The async path is gated behind `DATA_DESIGNER_ASYNC_ENGINE=1` - the sync path is unchanged.
Closes #346
Changes
Added
- `column_wise_builder.py` - `_build_async()`: builds `ExecutionGraph`, partitions rows into groups, creates `CompletionTracker` + `RowGroupBufferManager`, runs `AsyncTaskScheduler` on the background loop
- `_validate_async_compatibility()`: raises `DatasetGenerationError` at startup if any column uses `allow_resize=True` with async engine
- Processor callbacks (`on_seeds_complete`, `on_before_checkpoint`)
- `on_batch_complete` callback wired through `on_row_group_complete` for progress notifications
- `task_traces` property on `ColumnWiseDatasetBuilder` and `DataDesigner` - populated when `run_config.async_trace=True`
- `run_config.async_trace: bool` field for enabling per-task tracing
- `test_async_builder_integration.py`: integration tests for multi-column pipelines, checkpoint correctness, `allow_resize` guard, pre-batch failure, error rate shutdown
Changed
- `async_scheduler.py`: sliding window error rate via `deque(maxlen=window)` (replaces cumulative counters), pre-batch seed-completion tracking, `_RowGroupState` dataclass consolidation
- `test_async_scheduler.py`: expanded coverage for sliding window error rate recovery, out-of-order row group completion, and new callbacks
Refactored
- Five per-row-group collections (`_active_rgs`, `_admitted_rg_ids`, `_seeds_dispatched_rgs`, `_pre_batch_done_rgs`, `_in_flight_counts`) consolidated into a single `_rg_states: dict[int, _RowGroupState]` dataclass - cleanup is now a single `del` instead of N separate discards, eliminating the class of bugs where one collection is missed during teardown (2114d3b)
Fixed
- Snapshot dropped rows before `await` in `_run_batch` to prevent row-count mismatch when concurrent tasks drop rows during `agenerate` (c8a823c)
- Sync row drops from `RowGroupBufferManager` to `CompletionTracker`, preventing unnecessary LLM calls on filtered rows (c650a2f)
- `_admitted_rg_ids` pruned on checkpoint to prevent unbounded growth (71e7412)
- `disable_early_shutdown` wired into `AsyncTaskScheduler` - was silently ignored in the async path (259828d)
Attention Areas
- `column_wise_builder.py` - `_build_async` - main wiring; verify generator map construction for `MultiColumnConfig` and row group partitioning matches sync path semantics
- `async_scheduler.py` - `_RowGroupState` - dataclass consolidating per-RG lifecycle state; all admission/dispatch/checkpoint paths now go through this single dict
- `async_scheduler.py` - sliding window error rate - `deque(maxlen=window)` replaces cumulative counters so early transient errors don't permanently inflate the error rate in long-running jobs
Follow-up
- `ColumnWiseDatasetBuilder` to `DatasetBuilder`
- `build_preview()` path