Priority Level
Medium
Task Summary
AsyncTaskScheduler still mixes two responsibilities:
- dependency scheduling for the execution graph
- row-group lifecycle orchestration (PRE_BATCH, POST_BATCH, checkpointing, preview retention, shutdown handling)
That boundary is the root cause of the remaining async follow-up issues:
- Pre-batch resize is unsafe: the scheduler assumes fixed row-group sizes.
- Processor failures are swallowed: callback exceptions become dropped data instead of failing the run.
- Early shutdown is not terminal: run() can return before submitted work is drained.
- Mode handling leaks into scheduling: preview/build differences are encoded as callbacks and buffer behavior instead of explicit finalization.
This ticket should extract lifecycle ownership into a controller and keep AsyncTaskScheduler focused on scheduling tasks for already-prepared row groups.
Scope
In scope:
- Introduce AsyncRunController
- Move PRE_BATCH / POST_BATCH / finalization / abort handling out of AsyncTaskScheduler
- Add explicit sinks for build vs. preview
- Make processor failures and early shutdown fail the run by default
- Fail fast on PRE_BATCH row-count changes
Out of scope:
- Full async support for PRE_BATCH resize
- Tracker reset and dynamic row-group resizing after processor mutation
Both belong in a separate follow-up ticket once the controller boundary is in place.
Decisions
These should be treated as resolved for this ticket:
- PRE_BATCH row-count changes fail fast in this iteration.
- Processor failures fail the entire run by default.
- The controller lives in a new module, not inside dataset_builder.py.
Technical Details & Implementation Plan
1. Introduce AsyncRunController (dataset_builders/async_run_controller.py)
The controller owns the row-group lifecycle:
- partition row groups
- materialize seed output
- run PRE_BATCH
- validate the finalized row-group size
- hand the prepared row group to AsyncTaskScheduler
- run POST_BATCH
- finalize to a sink
Key point: PRE_BATCH runs before the row group enters the main scheduler. As a result, AsyncTaskScheduler only ever sees fixed-size row groups and never needs a mid-run tracker reset.
DatasetBuilder._prepare_async_run() should build the controller once for both build and preview paths.
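A minimal synchronous sketch of the lifecycle order above, assuming row groups are plain lists of dicts. The AsyncRunController shape shown here is hypothetical, not the final API: the real controller would await AsyncTaskScheduler rather than call a plain function, and the `seed`, `pre_batch`, `schedule`, `post_batch`, and `sink` callables are illustrative stand-ins.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

RowGroup = list[dict]  # assumption: a row group is a list of row dicts


class DatasetGenerationError(Exception):
    """Local stand-in for the engine's DatasetGenerationError."""


@dataclass
class AsyncRunController:
    """Hypothetical sketch of the controller-owned row-group lifecycle.

    `schedule` plays the role of AsyncTaskScheduler and `sink` the role of a
    row-group sink; both are stand-ins for illustration only.
    """

    row_group_size: int
    seed: Callable[[int, int], RowGroup]  # (start, size) -> seed rows
    pre_batch: Callable[[RowGroup], RowGroup]
    schedule: Callable[[RowGroup], RowGroup]
    post_batch: Callable[[RowGroup], RowGroup]
    sink: Callable[[int, RowGroup], None]  # (row_group_index, rows) -> None

    def partition(self, num_records: int) -> list[tuple[int, int]]:
        # (start, size) pairs; only the last row group may be smaller.
        return [
            (start, min(self.row_group_size, num_records - start))
            for start in range(0, num_records, self.row_group_size)
        ]

    def run(self, num_records: int) -> None:
        for index, (start, size) in enumerate(self.partition(num_records)):
            rows = self.seed(start, size)  # materialize seed output
            rows = self.pre_batch(rows)  # PRE_BATCH runs before scheduling
            if len(rows) != size:  # validate the finalized row-group size
                raise DatasetGenerationError(
                    f"PRE_BATCH changed row group {index} from {size} to {len(rows)} rows"
                )
            rows = self.schedule(rows)  # scheduler only sees fixed-size groups
            rows = self.post_batch(rows)  # POST_BATCH runs after scheduling
            self.sink(index, rows)  # mode-agnostic finalization
```

The ordering is the point: `schedule` never receives a row group whose size differs from its partition, so nothing downstream has to resize.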
2. Add explicit sinks
Replace callback-driven finalization with explicit sink objects:
- ParquetRowGroupSink for build()
- InMemoryRowGroupSink for build_preview()
The controller hands completed row groups to a sink. The scheduler should not infer mode from optional callbacks.
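A sketch of what an explicit sink interface could look like, assuming the controller only ever calls `write(index, rows)` and `close()`. The RowGroupSink protocol, method names, and parquet file naming are hypothetical; ParquetRowGroupSink assumes pandas with a parquet engine (for example pyarrow) is installed and imports it lazily so the preview path does not need it.

```python
from __future__ import annotations

import os
from typing import Protocol


class RowGroupSink(Protocol):
    """Hypothetical interface for where the controller hands finished row groups."""

    def write(self, index: int, rows: list[dict]) -> None: ...

    def close(self) -> None: ...


class InMemoryRowGroupSink:
    """Preview sink: keeps finished row groups in memory, keyed by index."""

    def __init__(self) -> None:
        self._groups: dict[int, list[dict]] = {}
        self.closed = False

    def write(self, index: int, rows: list[dict]) -> None:
        if self.closed:
            raise RuntimeError("sink is closed")
        self._groups[index] = list(rows)

    def close(self) -> None:
        self.closed = True

    def records(self) -> list[dict]:
        # Flatten in row-group order, regardless of completion order.
        return [row for index in sorted(self._groups) for row in self._groups[index]]


class ParquetRowGroupSink:
    """Build sink: one parquet file per row group (assumes pandas + pyarrow)."""

    def __init__(self, directory: str) -> None:
        self.directory = directory

    def write(self, index: int, rows: list[dict]) -> None:
        import pandas as pd  # lazy import: only the build path needs it

        # Hypothetical file naming, for illustration only.
        path = os.path.join(self.directory, f"row_group_{index:05d}.parquet")
        pd.DataFrame(rows).to_parquet(path)

    def close(self) -> None:
        pass  # nothing buffered in this sketch
```

Because the controller only depends on `write` and `close`, the scheduler never needs to know which mode it is running in, and preview buffer lifetime is owned by the in-memory sink.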
3. Simplify AsyncTaskScheduler
AsyncTaskScheduler should only do dependency scheduling over prepared row groups. Remove lifecycle hooks from its interface:
- remove on_seeds_complete
- remove on_before_checkpoint
- remove preview/build-specific finalization behavior
The scheduler should take generators, the execution graph, the tracker, the prepared row groups, and the buffer manager as inputs, and return an explicit outcome to the controller.
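A hedged sketch of the narrowed scheduler contract: it receives an already-prepared row group, runs generators in dependency order, and returns an outcome instead of calling lifecycle hooks. SketchScheduler, RowGroupResult, and the graph shape (each column mapped to the columns it depends on) are hypothetical; the tracker and buffer manager are omitted, and the standard library's graphlib.TopologicalSorter (Python 3.9+) stands in for the real execution graph.

```python
from __future__ import annotations

import asyncio
import enum
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Awaitable, Callable

CellGenerator = Callable[[dict], Awaitable[object]]  # row -> cell value


class Outcome(enum.Enum):
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class RowGroupResult:
    outcome: Outcome
    rows: list[dict]
    error: BaseException | None = None


class SketchScheduler:
    """Hypothetical: dependency scheduling only, no processor hooks or sinks."""

    def __init__(self, generators: dict[str, CellGenerator], graph: dict[str, set[str]]) -> None:
        self.generators = generators
        self.graph = graph  # column -> columns it depends on

    async def run_row_group(self, rows: list[dict]) -> RowGroupResult:
        sorter = TopologicalSorter(self.graph)
        sorter.prepare()
        while sorter.is_active():
            ready = sorter.get_ready()
            # Columns in one ready wave do not depend on each other, so every
            # (row, column) cell in the wave can run concurrently.
            cells = [(row, column) for column in ready for row in rows]
            values = await asyncio.gather(
                *(self.generators[column](row) for row, column in cells),
                return_exceptions=True,  # wait for every cell before judging
            )
            errors = [v for v in values if isinstance(v, BaseException)]
            if errors:
                return RowGroupResult(Outcome.FAILED, rows, errors[0])
            for (row, column), value in zip(cells, values):
                row[column] = value
            sorter.done(*ready)
        return RowGroupResult(Outcome.COMPLETED, rows)
```

Passing `return_exceptions=True` means every cell in a wave has finished before a failure is reported, so no work is still running when the outcome reaches the controller.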
4. Normalize fatal error semantics
The controller should translate fatal async-engine outcomes into raised errors.
Required behavior:
- PRE_BATCH exceptions raise
- POST_BATCH exceptions raise
- early shutdown drains or cancels submitted work before returning
- aborted runs do not continue to metadata writing or preview cleanup as if they succeeded
Recommended explicit outcomes: completed, dropped, failed, aborted
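One way to express the outcome contract and the early-shutdown drain, as a sketch under stated assumptions. RunOutcome, raise_for_outcome, and drain_or_cancel are hypothetical names, DatasetGenerationError is a local stand-in, and treating dropped as non-fatal while failed and aborted are fatal is an assumption. drain_or_cancel uses only standard asyncio calls (asyncio.wait with a timeout, Task.cancel, asyncio.gather).

```python
from __future__ import annotations

import asyncio
import enum


class DatasetGenerationError(Exception):
    """Local stand-in for the engine's DatasetGenerationError."""


class RunOutcome(enum.Enum):
    COMPLETED = "completed"
    DROPPED = "dropped"
    FAILED = "failed"
    ABORTED = "aborted"


def raise_for_outcome(outcome: RunOutcome, error: BaseException | None = None) -> None:
    """Translate a fatal outcome into a raised error (sketch)."""
    if outcome is RunOutcome.FAILED:
        raise DatasetGenerationError("async run failed") from error
    if outcome is RunOutcome.ABORTED:
        raise DatasetGenerationError("async run aborted before completion") from error
    # Assumption: COMPLETED and DROPPED are non-fatal, so the controller may
    # continue to the sink and metadata steps.


async def drain_or_cancel(tasks: set[asyncio.Task], timeout: float) -> None:
    """Early shutdown: wait for in-flight tasks, cancel stragglers, then wait again."""
    if not tasks:
        return  # asyncio.wait rejects an empty collection
    _, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    if pending:
        # Wait for cancellation to take effect so nothing is running on return.
        await asyncio.gather(*pending, return_exceptions=True)
```

Calling drain_or_cancel before raise_for_outcome keeps run() from returning while tasks are still running, and raising before the sink or metadata step keeps an aborted build from being finalized as if it had succeeded.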
5. Add a fail-fast guard for PRE_BATCH resize
Until full resize support exists, PRE_BATCH must not silently truncate rows.
Required behavior for this ticket:
- if PRE_BATCH changes the row count, raise DatasetGenerationError
- do not mutate tracker or buffer sizing to fit the new shape
- add coverage for both build and preview
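A minimal sketch of the fail-fast guard, assuming PRE_BATCH is a callable that takes and returns a list of row dicts. run_pre_batch_checked is a hypothetical helper and DatasetGenerationError is a local stand-in for the engine's class. The helper also wraps processor exceptions, because PRE_BATCH exceptions must raise rather than turn into dropped data.

```python
from __future__ import annotations

from typing import Callable


class DatasetGenerationError(Exception):
    """Local stand-in for the engine's DatasetGenerationError."""


def run_pre_batch_checked(
    row_group_index: int,
    rows: list[dict],
    pre_batch: Callable[[list[dict]], list[dict]],
) -> list[dict]:
    """Run PRE_BATCH and refuse any row-count change (hypothetical helper)."""
    expected = len(rows)  # capture before the processor can mutate `rows`
    try:
        result = pre_batch(rows)
    except Exception as exc:
        raise DatasetGenerationError(
            f"PRE_BATCH processor failed for row group {row_group_index}"
        ) from exc
    if len(result) != expected:
        # Fail fast: do not resize the tracker or buffer to fit the new shape.
        raise DatasetGenerationError(
            f"PRE_BATCH changed row group {row_group_index} from {expected} to "
            f"{len(result)} rows; resizing is not supported yet"
        )
    return result
```

Since build and preview both go through the controller, one guard covers both paths; the tests then only need to exercise it once per sink.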
Files Affected
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_run_controller.py (new)
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py
- tests for controller behavior, failure propagation, and shutdown drain
Likely not needed: completion_tracker.py. Dynamic resize support is out of scope, so leave it untouched unless the implementation proves otherwise.
Acceptance Criteria
- AsyncTaskScheduler no longer owns processor hooks or mode-specific finalization.
- Build and preview both go through AsyncRunController.
- Build uses a parquet sink; preview uses an in-memory sink.
- PRE_BATCH row-count changes raise instead of truncating data.
- Processor hook failures raise instead of becoming dropped data.
- Early shutdown does not return while submitted tasks are still running.
- Metadata/finalization do not run after an aborted async build.
Tests
Backfill focused coverage for:
- build/preview parity for processor stages
- fail-fast behavior on PRE_BATCH resize
- processor failure propagation
- early shutdown drain/cancel behavior
- preview buffer lifetime through InMemoryRowGroupSink
Follow-up Ticket
Create a separate ticket for full async PRE_BATCH resize support:
- allow the controller to finalize a new row-group size after PRE_BATCH
- rebuild tracker state for the resized row group
- update buffer sizing accordingly
- then admit the resized row group to AsyncTaskScheduler
Dependencies
plans/429-followup/merged-async-pipeline-refactor.md tasks 3-5