From d2205601fd1b3ebfae346c62b8616c97ab047847 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Thu, 28 May 2026 09:52:40 -0600 Subject: [PATCH 1/6] fix ordered seed resume offsets Preserve planned row-group start offsets during resume so ordered seed datasets continue from the next seed row instead of replaying already-consumed rows. Fixes #709 --- .../generators/seed_dataset.py | 27 ++++- .../src/data_designer/engine/context.py | 4 + .../dataset_builders/async_scheduler.py | 17 ++- .../dataset_builders/dataset_builder.py | 99 ++++++++++------ .../generators/test_seed_dataset.py | 43 +++++++ .../dataset_builders/test_async_scheduler.py | 43 +++++++ .../dataset_builders/test_dataset_builder.py | 108 +++++++++++++++++- 7 files changed, 299 insertions(+), 42 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py index 7a3909889..593f186a1 100644 --- a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py +++ b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py @@ -10,7 +10,7 @@ from data_designer.config.seed import IndexRange, PartitionBlock, SamplingStrategy from data_designer.engine.column_generators.generators.base import FromScratchColumnGenerator, GenerationStrategy from data_designer.engine.column_generators.utils.errors import SeedDatasetError -from data_designer.engine.context import format_row_group_tag +from data_designer.engine.context import current_row_group_start_offset, format_row_group_tag from data_designer.engine.dataset_builders.multi_column_configs import SeedDatasetMultiColumnConfig from data_designer.engine.processing.utils import concat_datasets from data_designer.logging import LOG_INDENT @@ -43,7 +43,11 @@ def generate_from_scratch(self, num_records: int) -> pd.DataFrame: if num_records <= 0: raise ValueError("πŸ›‘ `num_records` must be positive.") - if self._batch_reader is None: + row_group_start_offset = current_row_group_start_offset.get() + if self.config.sampling_strategy == SamplingStrategy.ORDERED and row_group_start_offset is not None: + self._df_remaining = None + self._reset_batch_reader(num_records, record_offset=row_group_start_offset) + elif self._batch_reader is None: self._reset_batch_reader(num_records) return self._sample_records(num_records) @@ -81,14 +85,29 @@ def _resolve_index_range(self) -> IndexRange | None: index_range = self.config.selection_strategy.to_index_range(self._seed_dataset_size) return index_range - def _reset_batch_reader(self, num_records: int) -> None: + def _reset_batch_reader(self, num_records: int, *, record_offset: int = 0) -> None: shuffle = self.config.sampling_strategy == SamplingStrategy.SHUFFLE self._batch_reader = self.resource_provider.seed_reader.create_batch_reader( batch_size=num_records, - index_range=self._index_range, + index_range=self._index_range_at_offset(record_offset), shuffle=shuffle, ) + def _index_range_at_offset(self, record_offset: int) -> IndexRange | None: + if record_offset <= 0: + return self._index_range + + selected_start = self._index_range.start if self._index_range is not None else 0 + selected_end = self._index_range.end if self._index_range is not None else self._seed_dataset_size - 1 + selected_size = selected_end - selected_start + 1 + if selected_size <= 0: + return self._index_range + + relative_offset = record_offset % selected_size + if relative_offset == 0: + return self._index_range + return IndexRange(start=selected_start + relative_offset, end=selected_end) + def _sample_records(self, num_records: int) -> pd.DataFrame: logger.info(f"🌱 {format_row_group_tag()}Sampling {num_records} records from seed dataset") logger.info(f"{LOG_INDENT}seed dataset size: {self._seed_dataset_size} records") diff --git a/packages/data-designer-engine/src/data_designer/engine/context.py b/packages/data-designer-engine/src/data_designer/engine/context.py index 500b6bb51..002a4b95c 100644 --- a/packages/data-designer-engine/src/data_designer/engine/context.py +++ b/packages/data-designer-engine/src/data_designer/engine/context.py @@ -9,6 +9,10 @@ # Value: (current_rg_index, total_rg_count) or None. current_row_group: ContextVar[tuple[int, int] | None] = ContextVar("current_row_group", default=None) +# Set while generating a row group. The value is the row group's planned start +# offset in the full dataset, including row groups skipped during resume. +current_row_group_start_offset: ContextVar[int | None] = ContextVar("current_row_group_start_offset", default=None) + def format_row_group_tag() -> str: """Return a '(x/X) ' prefix if a row group context is active, else ''.""" diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index 69d8a3d50..d4376c1b7 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -25,7 +25,7 @@ RequestAdmissionConfigSnapshot, RowGroupAdmission, ) -from data_designer.engine.context import current_row_group +from data_designer.engine.context import current_row_group, current_row_group_start_offset from data_designer.engine.dataset_builders.errors import DatasetGenerationError from data_designer.engine.dataset_builders.multi_column_configs import MultiColumnConfig from data_designer.engine.dataset_builders.scheduling.completion import CompletionTracker, FrontierDelta @@ -170,6 +170,7 @@ def __init__( progress_bar: bool = False, scheduler_event_sink: SchedulerAdmissionEventSink | None = None, run_id: str | None = None, + row_group_start_offsets: dict[int, int] | None = None, adaptive_row_group_admission: bool = False, adaptive_row_group_initial_target: int = 1, request_pressure_provider: RequestPressureSnapshotProvider | None = None, @@ -288,6 +289,9 @@ def __init__( # Pre-compute row-group sizes for O(1) lookup self._rg_size_map: dict[int, int] = dict(row_groups) + self._rg_start_offset_map: dict[int, int] = row_group_start_offsets or self._build_row_group_start_offsets( + row_groups + ) self._max_concurrent_row_groups = max_concurrent_row_groups self._max_in_flight_tasks = max_in_flight_tasks self._max_model_task_admission = max_model_task_admission @@ -324,6 +328,15 @@ def __init__( self._progress_bar = StickyProgressBar() if progress_bar else None self._reporter = self._setup_async_progress_reporter(num_records, buffer_size, progress_interval) + @staticmethod + def _build_row_group_start_offsets(row_groups: list[tuple[int, int]]) -> dict[int, int]: + offsets: dict[int, int] = {} + next_offset = 0 + for rg_id, rg_size in row_groups: + offsets[rg_id] = next_offset + next_offset += rg_size + return offsets + def _setup_async_progress_reporter( self, num_records: int, @@ -1550,6 +1563,7 @@ async def _execute_task_inner(self, task: Task, lease: TaskAdmissionLease, task_ """Core task execution logic.""" num_rgs = len(self._row_groups) token = current_row_group.set((task.row_group, num_rgs)) + start_offset_token = current_row_group_start_offset.set(self._rg_start_offset_map.get(task.row_group)) group = lease.item.group identity_hash = hashlib.sha1("\0".join(group.key.identity).encode()).hexdigest()[:16] correlation_token = runtime_correlation_provider.set( @@ -1567,6 +1581,7 @@ async def _execute_task_inner(self, task: Task, lease: TaskAdmissionLease, task_ await self._execute_task_inner_impl(task, lease, task_execution_id) finally: runtime_correlation_provider.reset(correlation_token) + current_row_group_start_offset.reset(start_offset_token) current_row_group.reset(token) async def _execute_task_inner_impl(self, task: Task, lease: TaskAdmissionLease, task_execution_id: str) -> None: diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 003c5bdad..9275416b1 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -35,6 +35,7 @@ ) from data_designer.engine.column_generators.utils.generator_classification import column_type_is_model_generated from data_designer.engine.compiler import compile_data_designer_config +from data_designer.engine.context import current_row_group, current_row_group_start_offset from data_designer.engine.dataset_builders.errors import DatasetGenerationError from data_designer.engine.dataset_builders.multi_column_configs import MultiColumnConfig from data_designer.engine.dataset_builders.utils.concurrency import ConcurrentThreadExecutor @@ -583,6 +584,7 @@ def _build_with_resume( group_id=group_id, current_batch_number=batch_idx, on_batch_complete=on_batch_complete, + row_group_start_offset=sum(self.batch_manager.num_records_list[:batch_idx]), ) self.batch_manager.finish() return True @@ -845,6 +847,7 @@ def _build_async( trace_enabled = _is_async_trace_enabled(settings) precomputed_row_groups: list[tuple[int, int]] | None = None + row_group_start_offsets: dict[int, int] | None = None initial_actual_num_records = 0 initial_total_num_batches = 0 original_target = num_records # immutable original target; overridden on resume @@ -890,12 +893,19 @@ def _rg_size(rg_id: int) -> int: f"complete ({initial_actual_num_records} records), skipping them." ) + all_row_group_start_offsets: dict[int, int] = {} + next_offset = 0 + for rg_id in range(total_row_groups): + all_row_group_start_offsets[rg_id] = next_offset + next_offset += _rg_size(rg_id) + # Pre-compute the full row-group list with correct per-group sizes so that # non-aligned skipped groups deduct their actual on-disk record count rather # than buffer_size, keeping extension group sizes accurate. precomputed_row_groups = [ (rg_id, _rg_size(rg_id)) for rg_id in range(total_row_groups) if rg_id not in completed_ids ] + row_group_start_offsets = {rg_id: all_row_group_start_offsets[rg_id] for rg_id, _ in precomputed_row_groups} def finalize_row_group(rg_id: int) -> None: def on_complete(final_path: Path | str | None) -> None: @@ -920,6 +930,7 @@ def on_complete(final_path: Path | str | None) -> None: disable_early_shutdown=settings.disable_early_shutdown, trace=trace_enabled, precomputed_row_groups=precomputed_row_groups, + row_group_start_offsets=row_group_start_offsets, initial_actual_num_records=initial_actual_num_records, initial_total_num_batches=initial_total_num_batches, ) @@ -989,6 +1000,7 @@ def _prepare_async_run( disable_early_shutdown: bool = False, trace: bool = False, precomputed_row_groups: list[tuple[int, int]] | None = None, + row_group_start_offsets: dict[int, int] | None = None, initial_actual_num_records: int = 0, initial_total_num_batches: int = 0, ) -> tuple[AsyncTaskScheduler, RowGroupBufferManager]: @@ -1079,6 +1091,7 @@ def on_before_checkpoint(rg_id: int, rg_size: int) -> None: trace=trace, num_records=num_records, buffer_size=buffer_size, + row_group_start_offsets=row_group_start_offsets, progress_interval=self._resource_provider.run_config.progress_interval, progress_bar=self._resource_provider.run_config.progress_bar, request_pressure_provider=self._resource_provider.model_registry.request_admission, @@ -1127,47 +1140,61 @@ def _run_batch( group_id: str, current_batch_number: int | None = None, on_batch_complete: Callable[[Path], None] | None = None, + row_group_start_offset: int | None = None, ) -> None: + row_group_token = None + start_offset_token = None + if row_group_start_offset is not None: + if current_batch_number is not None: + row_group_token = current_row_group.set((current_batch_number, self.batch_manager.num_batches)) + start_offset_token = current_row_group_start_offset.set(row_group_start_offset) + pre_batch_snapshot = self._resource_provider.model_registry.get_model_usage_snapshot() ran_pre_batch = False - for generator in generators: - generator.log_pre_generation() - try: - generation_strategy = generator.get_generation_strategy() - if generator.can_generate_from_scratch and self.batch_manager.buffer_is_empty: - self._run_from_scratch_column_generator(generator) - # Run PRE_BATCH after seed generator, before other columns - if not ran_pre_batch: - self._processor_runner.run_pre_batch(self.batch_manager) - ran_pre_batch = True - elif generation_strategy == GenerationStrategy.CELL_BY_CELL: - self._run_cell_by_cell_generator(generator) - elif generation_strategy == GenerationStrategy.FULL_COLUMN: - self._run_full_column_generator(generator) - else: - logger.error(f"❌ Unknown generation strategy: {generation_strategy}") - raise DatasetGenerationError(f"πŸ›‘ Unknown generation strategy: {generation_strategy}") - if save_partial_results: - self.batch_manager.write() - except Exception as e: - column_error_str = ( - f"columns {generator.config.column_names}" - if hasattr(generator.config, "column_names") - else f"column {generator.config.name!r}" - ) - raise DatasetGenerationError(f"πŸ›‘ Failed to process {column_error_str}:\n{e}") - try: - usage_deltas = self._resource_provider.model_registry.get_usage_deltas(pre_batch_snapshot) - self._emit_batch_inference_events(batch_mode, usage_deltas, group_id) - except Exception: - pass + for generator in generators: + generator.log_pre_generation() + try: + generation_strategy = generator.get_generation_strategy() + if generator.can_generate_from_scratch and self.batch_manager.buffer_is_empty: + self._run_from_scratch_column_generator(generator) + # Run PRE_BATCH after seed generator, before other columns + if not ran_pre_batch: + self._processor_runner.run_pre_batch(self.batch_manager) + ran_pre_batch = True + elif generation_strategy == GenerationStrategy.CELL_BY_CELL: + self._run_cell_by_cell_generator(generator) + elif generation_strategy == GenerationStrategy.FULL_COLUMN: + self._run_full_column_generator(generator) + else: + logger.error(f"❌ Unknown generation strategy: {generation_strategy}") + raise DatasetGenerationError(f"πŸ›‘ Unknown generation strategy: {generation_strategy}") + if save_partial_results: + self.batch_manager.write() + except Exception as e: + column_error_str = ( + f"columns {generator.config.column_names}" + if hasattr(generator.config, "column_names") + else f"column {generator.config.name!r}" + ) + raise DatasetGenerationError(f"πŸ›‘ Failed to process {column_error_str}:\n{e}") - if current_batch_number is not None: - df_batch = self.batch_manager.get_current_batch(as_dataframe=True) - df_batch = self._processor_runner.run_post_batch(df_batch, current_batch_number=current_batch_number) - self._write_processed_batch(df_batch) - self.batch_manager.finish_batch(on_batch_complete) + try: + usage_deltas = self._resource_provider.model_registry.get_usage_deltas(pre_batch_snapshot) + self._emit_batch_inference_events(batch_mode, usage_deltas, group_id) + except Exception: + pass + + if current_batch_number is not None: + df_batch = self.batch_manager.get_current_batch(as_dataframe=True) + df_batch = self._processor_runner.run_post_batch(df_batch, current_batch_number=current_batch_number) + self._write_processed_batch(df_batch) + self.batch_manager.finish_batch(on_batch_complete) + finally: + if start_offset_token is not None: + current_row_group_start_offset.reset(start_offset_token) + if row_group_token is not None: + current_row_group.reset(row_group_token) def _run_from_scratch_column_generator(self, generator: ColumnGenerator) -> None: df = generator.generate_from_scratch(self.batch_manager.num_records_batch) diff --git a/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py b/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py index 01da14cbd..026d7b13b 100644 --- a/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py +++ b/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py @@ -19,6 +19,7 @@ SeedDatasetColumnGenerator, ) from data_designer.engine.column_generators.utils.errors import SeedDatasetError +from data_designer.engine.context import current_row_group_start_offset from data_designer.engine.dataset_builders.multi_column_configs import SeedDatasetMultiColumnConfig from data_designer.engine.resources.resource_provider import ResourceProvider from data_designer.engine.resources.seed_reader import LocalFileSeedReader @@ -242,6 +243,48 @@ def test_seed_dataset_column_generator_reset_batch_reader_forwards_index_range( assert gen._batch_reader == mock_batch_reader +def test_seed_dataset_column_generator_reset_batch_reader_applies_record_offset( + stub_seed_dataset_generator, +) -> None: + gen = stub_seed_dataset_generator + mock_batch_reader = Mock() + gen._index_range = IndexRange(start=4, end=8) + gen.resource_provider.seed_reader.create_batch_reader.return_value = mock_batch_reader + + gen._reset_batch_reader(100, record_offset=3) + + gen.resource_provider.seed_reader.create_batch_reader.assert_called_once_with( + batch_size=100, + index_range=IndexRange(start=7, end=8), + shuffle=False, + ) + assert gen._batch_reader == mock_batch_reader + + +def test_seed_dataset_column_generator_ordered_generation_uses_row_group_offset( + stub_seed_dataset_generator, +) -> None: + gen = stub_seed_dataset_generator + mock_batch = Mock() + mock_batch.to_pandas.return_value = lazy.pd.DataFrame({"col1": [3]}) + mock_batch_reader = Mock() + mock_batch_reader.read_next_batch.return_value = mock_batch + gen.resource_provider.seed_reader.create_batch_reader.return_value = mock_batch_reader + + token = current_row_group_start_offset.set(3) + try: + result = gen.generate_from_scratch(1) + finally: + current_row_group_start_offset.reset(token) + + assert result["col1"].tolist() == [3] + gen.resource_provider.seed_reader.create_batch_reader.assert_called_once_with( + batch_size=1, + index_range=IndexRange(start=3, end=999), + shuffle=False, + ) + + def test_seed_dataset_column_generator_sample_records_simple(stub_seed_dataset_generator): gen = stub_seed_dataset_generator diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index a8dd7d9c5..c0d52c136 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -34,6 +34,7 @@ FromScratchColumnGenerator, ) from data_designer.engine.column_generators.generators.custom import CustomColumnGenerator +from data_designer.engine.context import current_row_group_start_offset from data_designer.engine.dataset_builders.async_scheduler import AsyncTaskScheduler from data_designer.engine.dataset_builders.errors import DatasetGenerationError from data_designer.engine.dataset_builders.scheduling.completion import CompletionTracker, FrontierDelta @@ -563,6 +564,48 @@ async def test_scheduler_multiple_row_groups() -> None: assert tracker.is_row_group_complete(2, 1, ["seed", "cell_out"]) +@pytest.mark.asyncio(loop_scope="session") +async def test_scheduler_sets_row_group_start_offsets_for_generators() -> None: + """Ordered generators can seek by planned row-group offset during async resume.""" + + class OffsetSeedGenerator(FromScratchColumnGenerator[ExpressionColumnConfig]): + @staticmethod + def get_generation_strategy() -> GenerationStrategy: + return GenerationStrategy.FULL_COLUMN + + def generate(self, data: lazy.pd.DataFrame) -> lazy.pd.DataFrame: + return data + + def generate_from_scratch(self, num_records: int) -> lazy.pd.DataFrame: + offset = current_row_group_start_offset.get() + assert offset is not None + return lazy.pd.DataFrame({"seed": list(range(offset, offset + num_records))}) + + provider = _mock_provider() + configs = [SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]})] + strategies = {"seed": GenerationStrategy.FULL_COLUMN} + generators = {"seed": OffsetSeedGenerator(config=_expr_config("seed"), resource_provider=provider)} + row_groups = [(1, 1), (3, 1)] + + graph = ExecutionGraph.create(configs, strategies) + tracker = CompletionTracker.with_graph(graph, row_groups) + storage = _make_storage() + buffer_manager = RowGroupBufferManager(storage) + + scheduler = AsyncTaskScheduler( + generators=generators, + graph=graph, + tracker=tracker, + row_groups=row_groups, + buffer_manager=buffer_manager, + row_group_start_offsets={1: 1, 3: 3}, + ) + await scheduler.run() + + assert buffer_manager.get_row(1, 0)["seed"] == 1 + assert buffer_manager.get_row(3, 0)["seed"] == 3 + + @pytest.mark.asyncio(loop_scope="session") async def test_scheduler_non_retryable_failure_drops_row() -> None: """Non-retryable failure drops the row.""" diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index bc6328a96..df201e8bc 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -12,12 +12,18 @@ import data_designer.engine.dataset_builders.dataset_builder as builder_mod import data_designer.lazy_heavy_imports as lazy from data_designer.config.base import SkipConfig -from data_designer.config.column_configs import CustomColumnConfig, LLMTextColumnConfig, SamplerColumnConfig +from data_designer.config.column_configs import ( + CustomColumnConfig, + ExpressionColumnConfig, + LLMTextColumnConfig, + SamplerColumnConfig, +) from data_designer.config.config_builder import DataDesignerConfigBuilder from data_designer.config.custom_column import custom_column_generator from data_designer.config.processors import DropColumnsProcessorConfig from data_designer.config.run_config import RunConfig from data_designer.config.sampler_params import SamplerType, UUIDSamplerParams +from data_designer.config.seed import IndexRange, SamplingStrategy from data_designer.config.seed_source import LocalFileSeedSource from data_designer.config.seed_source_dataframe import DataFrameSeedSource from data_designer.engine.column_generators.generators.base import GenerationStrategy @@ -1595,6 +1601,64 @@ def _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_p ) +def test_build_resume_ordered_seed_dataset_continues_from_next_planned_row(stub_resource_provider, tmp_path): + """Regression for issue #709: resume must not replay ordered seed rows.""" + + class StopAfterFirstBatch(RuntimeError): + pass + + seed_source = DataFrameSeedSource(df=lazy.pd.DataFrame({"name": ["alpha", "beta", "gamma"]})) + seed_reader = DataFrameSeedReader() + seed_reader.attach(seed_source, Mock()) + + config_builder = DataDesignerConfigBuilder() + config_builder.with_seed_dataset( + seed_source, + sampling_strategy=SamplingStrategy.ORDERED, + selection_strategy=IndexRange(start=0, end=2), + ) + config_builder.add_column(ExpressionColumnConfig(name="copy", expr="{{ name }}")) + + storage = _ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=ResumeMode.NEVER) + stub_resource_provider.artifact_storage = storage + stub_resource_provider.seed_reader = seed_reader + stub_resource_provider.run_config = RunConfig(disable_early_shutdown=True, buffer_size=1) + + builder = DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + + def stop(_path: _Path) -> None: + raise StopAfterFirstBatch("simulated interruption") + + with pytest.raises(StopAfterFirstBatch, match="simulated interruption"): + builder.build(num_records=3, on_batch_complete=stop, resume=ResumeMode.NEVER) + + resumed_seed_reader = DataFrameSeedReader() + resumed_seed_reader.attach(seed_source, Mock()) + stub_resource_provider.seed_reader = resumed_seed_reader + stub_resource_provider.artifact_storage = _ArtifactStorage( + artifact_path=tmp_path, + dataset_name="dataset", + resume=ResumeMode.ALWAYS, + ) + + resumed_builder = DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + + final_path = resumed_builder.build(num_records=3, resume=ResumeMode.ALWAYS) + result = lazy.pd.concat( + [lazy.pd.read_parquet(path) for path in sorted(final_path.glob("batch_*.parquet"))], + ignore_index=True, + ) + + assert result["name"].tolist() == ["alpha", "beta", "gamma"] + assert result["copy"].tolist() == ["alpha", "beta", "gamma"] + + def test_build_resume_starts_fresh_without_metadata(stub_resource_provider, stub_test_config_builder, tmp_path, caplog): """resume=True when only the folder exists (no metadata.json) logs an info message and starts fresh. @@ -2199,6 +2263,48 @@ def capturing_prepare(*args, **kwargs): assert captured["initial_total_num_batches"] == 2 +def test_build_async_resume_passes_planned_offsets_for_remaining_row_groups( + stub_resource_provider, + stub_test_config_builder, + tmp_path, +): + """Async resume tells generators each remaining row group's original planned start offset.""" + import asyncio as stdlib_asyncio + + dataset_dir = tmp_path / "dataset" + _write_metadata(dataset_dir, target_num_records=4, buffer_size=1, num_completed_batches=2, actual_num_records=2) + _write_parquet_files(dataset_dir / "parquet-files", [0, 2], row_counts={0: 1, 2: 1}) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=1) + captured: dict = {} + + def capturing_prepare(*args, **kwargs): + captured["precomputed_row_groups"] = kwargs.get("precomputed_row_groups") + captured["row_group_start_offsets"] = kwargs.get("row_group_start_offsets") + mock_scheduler = Mock() + mock_scheduler.traces = [] + mock_scheduler.early_shutdown = False + mock_scheduler.partial_row_groups = () + mock_scheduler.first_non_retryable_error = None + mock_buffer_manager = Mock() + mock_buffer_manager.actual_num_records = 4 + return mock_scheduler, mock_buffer_manager + + mock_future = Mock() + mock_future.result = Mock(return_value=None) + + with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): + with patch.object(builder_mod, "asyncio", stdlib_asyncio, create=True): + with patch.object(builder_mod, "ensure_async_engine_loop", Mock(return_value=Mock()), create=True): + with patch.object(stdlib_asyncio, "run_coroutine_threadsafe", return_value=mock_future): + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder, "_prepare_async_run", side_effect=capturing_prepare): + builder.build(num_records=4, resume=ResumeMode.ALWAYS) + + assert captured["precomputed_row_groups"] == [(1, 1), (3, 1)] + assert captured["row_group_start_offsets"] == {1: 1, 3: 3} + + def test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group( stub_resource_provider, stub_test_config_builder, tmp_path ): From 9783618a984c7af6fa6e5126bb934b348a7f9abb Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Thu, 28 May 2026 09:55:03 -0600 Subject: [PATCH 2/6] simplify async resume offset planning test --- .../dataset_builders/dataset_builder.py | 76 +++--- .../dataset_builders/test_dataset_builder.py | 238 +++++++----------- 2 files changed, 144 insertions(+), 170 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 9275416b1..0a12e863c 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -119,6 +119,44 @@ class _ResumeState: completed_row_groups: dict[int, int] +@dataclass +class RowGroupResumePlan: + total_row_groups: int + remaining_row_groups: list[tuple[int, int]] + row_group_start_offsets: dict[int, int] + + +def build_row_group_resume_plan( + *, + original_target: int, + num_records: int, + buffer_size: int, + completed_ids: set[int], +) -> RowGroupResumePlan: + num_original_groups = -(-original_target // buffer_size) + extension_records = num_records - original_target + total_row_groups = num_original_groups + -(-extension_records // buffer_size) + + def _rg_size(rg_id: int) -> int: + if rg_id < num_original_groups: + return min(buffer_size, original_target - rg_id * buffer_size) + ext_group_idx = rg_id - num_original_groups + return min(buffer_size, extension_records - ext_group_idx * buffer_size) + + all_start_offsets: dict[int, int] = {} + next_offset = 0 + for rg_id in range(total_row_groups): + all_start_offsets[rg_id] = next_offset + next_offset += _rg_size(rg_id) + + remaining_row_groups = [(rg_id, _rg_size(rg_id)) for rg_id in range(total_row_groups) if rg_id not in completed_ids] + return RowGroupResumePlan( + total_row_groups=total_row_groups, + remaining_row_groups=remaining_row_groups, + row_group_start_offsets={rg_id: all_start_offsets[rg_id] for rg_id, _ in remaining_row_groups}, + ) + + class DatasetBuilder: def __init__( self, @@ -866,22 +904,15 @@ def _build_async( # non-aligned run gets its true size, not buffer_size. original_target = state.original_target_num_records - num_original_groups = -(-original_target // buffer_size) # ceil(original_target/buffer_size) - - def _rg_size(rg_id: int) -> int: - if rg_id < num_original_groups: - return min(buffer_size, original_target - rg_id * buffer_size) - ext_group_idx = rg_id - num_original_groups - return min(buffer_size, (num_records - original_target) - ext_group_idx * buffer_size) - self.artifact_storage.clear_partial_results() - # Original groups are immutable; any extension always needs new groups beyond - # num_original_groups β€” ceil(num_records/bs) gives the wrong count when the - # original run was non-aligned and the extension fits in the last group's slack. - extension_records = num_records - original_target - total_row_groups = num_original_groups + -(-extension_records // buffer_size) - if len(completed_ids) >= total_row_groups: + resume_plan = build_row_group_resume_plan( + original_target=original_target, + num_records=num_records, + buffer_size=buffer_size, + completed_ids=completed_ids, + ) + if len(completed_ids) >= resume_plan.total_row_groups: logger.warning( "⚠️ Dataset is already complete β€” all row groups were found in the existing artifact " "directory. Nothing to resume. Use resume=ResumeMode.NEVER if you want to generate a new dataset." @@ -889,23 +920,12 @@ def _rg_size(rg_id: int) -> int: return False logger.info( - f"▢️ Resuming async run: {len(completed_ids)} of {total_row_groups} row group(s) already " + f"▢️ Resuming async run: {len(completed_ids)} of {resume_plan.total_row_groups} row group(s) already " f"complete ({initial_actual_num_records} records), skipping them." ) - all_row_group_start_offsets: dict[int, int] = {} - next_offset = 0 - for rg_id in range(total_row_groups): - all_row_group_start_offsets[rg_id] = next_offset - next_offset += _rg_size(rg_id) - - # Pre-compute the full row-group list with correct per-group sizes so that - # non-aligned skipped groups deduct their actual on-disk record count rather - # than buffer_size, keeping extension group sizes accurate. - precomputed_row_groups = [ - (rg_id, _rg_size(rg_id)) for rg_id in range(total_row_groups) if rg_id not in completed_ids - ] - row_group_start_offsets = {rg_id: all_row_group_start_offsets[rg_id] for rg_id, _ in precomputed_row_groups} + precomputed_row_groups = resume_plan.remaining_row_groups + row_group_start_offsets = resume_plan.row_group_start_offsets def finalize_row_group(rg_id: int) -> None: def on_complete(final_path: Path | str | None) -> None: diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index df201e8bc..207f40961 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -3,7 +3,9 @@ from __future__ import annotations +import json import logging +from pathlib import Path from typing import TYPE_CHECKING from unittest.mock import Mock, patch @@ -27,7 +29,7 @@ from data_designer.config.seed_source import LocalFileSeedSource from data_designer.config.seed_source_dataframe import DataFrameSeedSource from data_designer.engine.column_generators.generators.base import GenerationStrategy -from data_designer.engine.dataset_builders.dataset_builder import DatasetBuilder, _ConfigCompatibility +from data_designer.engine.dataset_builders.dataset_builder import DatasetBuilder, build_row_group_resume_plan from data_designer.engine.dataset_builders.errors import DatasetGenerationError, DatasetProcessingError from data_designer.engine.models.errors import ( FormattedLLMErrorMessage, @@ -39,7 +41,7 @@ from data_designer.engine.processing.processors.base import Processor from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry from data_designer.engine.resources.seed_reader import DataFrameSeedReader -from data_designer.engine.storage.artifact_storage import ResumeMode +from data_designer.engine.storage.artifact_storage import ArtifactStorage, ResumeMode if TYPE_CHECKING: import pandas as pd @@ -1067,7 +1069,7 @@ def test_resume_rejects_allow_resize_columns(stub_resource_provider, stub_model_ actual_num_records=2, ) - stub_resource_provider.artifact_storage = _ArtifactStorage(artifact_path=artifact_path, resume=ResumeMode.ALWAYS) + stub_resource_provider.artifact_storage = ArtifactStorage(artifact_path=artifact_path, resume=ResumeMode.ALWAYS) columns = _resize_columns("cell_x2") builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) @@ -1097,13 +1099,16 @@ def test_if_possible_allows_allow_resize_when_config_is_incompatible( sentinel = dataset_dir / "important_file.txt" sentinel.write_text("precious data") - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) + storage = ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) stub_resource_provider.artifact_storage = storage columns = _resize_columns("cell_x2") builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) + _write_incompatible_config_metadata( + dataset_dir, + builder.data_designer_config.fingerprint()["config_hash_version"], + ) - with patch.object(builder, "_check_resume_config_compatibility", return_value=_ConfigCompatibility.INCOMPATIBLE): - final_path = builder.build(num_records=5, resume=ResumeMode.IF_POSSIBLE) + final_path = builder.build(num_records=5, resume=ResumeMode.IF_POSSIBLE) assert storage.resume == ResumeMode.NEVER assert sentinel.exists() @@ -1559,22 +1564,23 @@ def test_skip_row_count_preserved_across_pipeline(stub_resource_provider, stub_m # --------------------------------------------------------------------------- -import json as _json -from pathlib import Path as _Path - -from data_designer.engine.storage.artifact_storage import ArtifactStorage as _ArtifactStorage - - -def _write_metadata(dataset_dir: _Path, **fields) -> None: +def _write_metadata(dataset_dir: Path, **fields) -> None: """Write a metadata.json into an existing dataset folder.""" dataset_dir.mkdir(parents=True, exist_ok=True) (dataset_dir / "sentinel.txt").write_text("x") # make folder non-empty for resolved_dataset_name - (dataset_dir / "metadata.json").write_text(_json.dumps(fields)) + (dataset_dir / "metadata.json").write_text(json.dumps(fields)) -def _write_parquet_files( - parquet_dir: _Path, row_group_ids: list[int], row_counts: dict[int, int] | None = None -) -> None: +def _write_incompatible_config_metadata(dataset_dir: Path, config_hash_version: str, **fields) -> None: + _write_metadata( + dataset_dir, + **fields, + config_hash="different-config", + config_hash_version=config_hash_version, + ) + + +def _write_parquet_files(parquet_dir: Path, row_group_ids: list[int], row_counts: dict[int, int] | None = None) -> None: """Create batch_*.parquet files for the given row group IDs. Both engines now derive ``num_completed_batches`` and ``actual_num_records`` from @@ -1592,7 +1598,7 @@ def _write_parquet_files( def _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, *, buffer_size: int = 2): """Return a DatasetBuilder whose ArtifactStorage has resume=ResumeMode.ALWAYS.""" - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.ALWAYS) + storage = ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.ALWAYS) stub_resource_provider.artifact_storage = storage stub_resource_provider.run_config = RunConfig(buffer_size=buffer_size) return DatasetBuilder( @@ -1601,6 +1607,30 @@ def _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_p ) +def _make_sampler_only_builder( + stub_resource_provider: Mock, + tmp_path: Path, + *, + resume: ResumeMode = ResumeMode.IF_POSSIBLE, +) -> tuple[DatasetBuilder, ArtifactStorage]: + """Create a builder that can run end-to-end without model or MCP stubs.""" + storage = ArtifactStorage(artifact_path=tmp_path, resume=resume) + stub_resource_provider.artifact_storage = storage + stub_resource_provider.run_config = RunConfig(buffer_size=2) + + config_builder = DataDesignerConfigBuilder() + config_builder.add_column( + SamplerColumnConfig(name="some_id", sampler_type=SamplerType.UUID, params=UUIDSamplerParams()) + ) + return ( + DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ), + storage, + ) + + def test_build_resume_ordered_seed_dataset_continues_from_next_planned_row(stub_resource_provider, tmp_path): """Regression for issue #709: resume must not replay ordered seed rows.""" @@ -1619,7 +1649,7 @@ class StopAfterFirstBatch(RuntimeError): ) config_builder.add_column(ExpressionColumnConfig(name="copy", expr="{{ name }}")) - storage = _ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=ResumeMode.NEVER) + storage = ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=ResumeMode.NEVER) stub_resource_provider.artifact_storage = storage stub_resource_provider.seed_reader = seed_reader stub_resource_provider.run_config = RunConfig(disable_early_shutdown=True, buffer_size=1) @@ -1629,7 +1659,7 @@ class StopAfterFirstBatch(RuntimeError): resource_provider=stub_resource_provider, ) - def stop(_path: _Path) -> None: + def stop(_path: Path) -> None: raise StopAfterFirstBatch("simulated interruption") with pytest.raises(StopAfterFirstBatch, match="simulated interruption"): @@ -1638,7 +1668,7 @@ def stop(_path: _Path) -> None: resumed_seed_reader = DataFrameSeedReader() resumed_seed_reader.attach(seed_source, Mock()) stub_resource_provider.seed_reader = resumed_seed_reader - stub_resource_provider.artifact_storage = _ArtifactStorage( + stub_resource_provider.artifact_storage = ArtifactStorage( artifact_path=tmp_path, dataset_name="dataset", resume=ResumeMode.ALWAYS, @@ -1797,7 +1827,7 @@ def test_build_if_possible_starts_fresh_on_dropped_column_artifact_policy_mismat preserve_dropped_columns=True, ) - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) + storage = ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) stub_resource_provider.artifact_storage = storage stub_resource_provider.run_config = RunConfig(buffer_size=2, preserve_dropped_columns=False) builder = DatasetBuilder( @@ -1830,12 +1860,17 @@ def test_build_resume_raises_on_corrupt_metadata(stub_resource_provider, stub_te def test_build_resume_always_raises_on_config_mismatch(stub_resource_provider, stub_test_config_builder, tmp_path): """resume=ALWAYS raises DatasetGenerationError when the stored config fingerprint differs.""" dataset_dir = tmp_path / "dataset" - _write_metadata(dataset_dir, target_num_records=4, buffer_size=2, num_completed_batches=1, actual_num_records=2) - + _write_incompatible_config_metadata( + dataset_dir, + stub_test_config_builder.build().fingerprint()["config_hash_version"], + target_num_records=4, + buffer_size=2, + num_completed_batches=1, + actual_num_records=2, + ) builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path) - with patch.object(builder, "_check_resume_config_compatibility", return_value=_ConfigCompatibility.INCOMPATIBLE): - with pytest.raises(DatasetGenerationError, match="does not match the config used"): - builder.build(num_records=4, resume=ResumeMode.ALWAYS) + with pytest.raises(DatasetGenerationError, match="does not match the config used"): + builder.build(num_records=4, resume=ResumeMode.ALWAYS) def test_build_resume_logs_warning_when_already_complete( @@ -1980,7 +2015,7 @@ def test_build_marks_post_generation_started_before_running_processors( with pytest.raises(RuntimeError, match="boom"): builder.build(num_records=4, resume=ResumeMode.ALWAYS) - metadata = _json.loads((dataset_dir / "metadata.json").read_text()) + metadata = json.loads((dataset_dir / "metadata.json").read_text()) assert metadata["post_generation_state"] == "started" assert metadata["post_generation_processed"] is False @@ -2015,7 +2050,7 @@ def test_build_resume_complete_dataset_runs_after_generation_when_no_marker( builder.build(num_records=4, resume=ResumeMode.ALWAYS) after_gen_processor.process_after_generation.assert_called_once() - metadata = _json.loads((dataset_dir / "metadata.json").read_text()) + metadata = json.loads((dataset_dir / "metadata.json").read_text()) assert metadata["post_generation_state"] == "complete" assert metadata["post_generation_processed"] is True @@ -2263,46 +2298,18 @@ def capturing_prepare(*args, **kwargs): assert captured["initial_total_num_batches"] == 2 -def test_build_async_resume_passes_planned_offsets_for_remaining_row_groups( - stub_resource_provider, - stub_test_config_builder, - tmp_path, -): - """Async resume tells generators each remaining row group's original planned start offset.""" - import asyncio as stdlib_asyncio - - dataset_dir = tmp_path / "dataset" - _write_metadata(dataset_dir, target_num_records=4, buffer_size=1, num_completed_batches=2, actual_num_records=2) - _write_parquet_files(dataset_dir / "parquet-files", [0, 2], row_counts={0: 1, 2: 1}) - - builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=1) - captured: dict = {} - - def capturing_prepare(*args, **kwargs): - captured["precomputed_row_groups"] = kwargs.get("precomputed_row_groups") - captured["row_group_start_offsets"] = kwargs.get("row_group_start_offsets") - mock_scheduler = Mock() - mock_scheduler.traces = [] - mock_scheduler.early_shutdown = False - mock_scheduler.partial_row_groups = () - mock_scheduler.first_non_retryable_error = None - mock_buffer_manager = Mock() - mock_buffer_manager.actual_num_records = 4 - return mock_scheduler, mock_buffer_manager - - mock_future = Mock() - mock_future.result = Mock(return_value=None) - - with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): - with patch.object(builder_mod, "asyncio", stdlib_asyncio, create=True): - with patch.object(builder_mod, "ensure_async_engine_loop", Mock(return_value=Mock()), create=True): - with patch.object(stdlib_asyncio, "run_coroutine_threadsafe", return_value=mock_future): - with patch.object(builder, "_run_model_health_check_if_needed"): - with patch.object(builder, "_prepare_async_run", side_effect=capturing_prepare): - builder.build(num_records=4, resume=ResumeMode.ALWAYS) +def test_row_group_resume_plan_keeps_original_offsets_for_remaining_groups() -> None: + """Async resume uses these planned offsets when completed row-group IDs have holes.""" + plan = build_row_group_resume_plan( + original_target=4, + num_records=4, + buffer_size=1, + completed_ids={0, 2}, + ) - assert captured["precomputed_row_groups"] == [(1, 1), (3, 1)] - assert captured["row_group_start_offsets"] == {1: 1, 3: 3} + assert plan.total_row_groups == 4 + assert plan.remaining_row_groups == [(1, 1), (3, 1)] + assert plan.row_group_start_offsets == {1: 1, 3: 3} def test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group( @@ -2619,9 +2626,7 @@ def capturing_prepare(*args, **kwargs): mock_prepare.assert_called_once() -def test_if_possible_incompatible_config_does_not_overwrite_existing_dataset( - stub_resource_provider, stub_test_config_builder, tmp_path -): +def test_if_possible_incompatible_config_does_not_overwrite_existing_dataset(stub_resource_provider, tmp_path): """IF_POSSIBLE + incompatible config must NOT resolve to the existing dataset directory. Bug: _check_resume_config_compatibility() used base_dataset_path, triggering the @@ -2638,39 +2643,25 @@ def test_if_possible_incompatible_config_does_not_overwrite_existing_dataset( sentinel = dataset_dir / "important_file.txt" sentinel.write_text("precious data") - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) - stub_resource_provider.artifact_storage = storage - - builder = DatasetBuilder( - data_designer_config=stub_test_config_builder.build(), - resource_provider=stub_resource_provider, + builder, storage = _make_sampler_only_builder(stub_resource_provider, tmp_path) + _write_incompatible_config_metadata( + dataset_dir, + builder.data_designer_config.fingerprint()["config_hash_version"], ) - # Simulate incompatible config and mock out all I/O so build() does not actually generate data - with patch.object(builder, "_check_resume_config_compatibility", return_value=_ConfigCompatibility.INCOMPATIBLE): - with patch.object(builder, "_run_model_health_check_if_needed"): - with patch.object(builder, "_run_mcp_tool_check_if_needed"): - with patch.object(builder, "_write_builder_config"): - with patch.object(builder, "_initialize_generators_and_graph", return_value=([], None)): - with patch.object(builder.batch_manager, "start"): - with patch.object(builder.batch_manager, "finish"): - with patch.object(builder._processor_runner, "run_after_generation"): - builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) + final_path = builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) # artifact_storage.resume must be downgraded to NEVER so resolved_dataset_name uses NEVER semantics assert storage.resume == ResumeMode.NEVER - # resolved_dataset_name has not been cached yet (compat check bypassed base_dataset_path, - # _write_builder_config was mocked). Accessing it now must give a timestamped name. assert sentinel.exists(), "Existing dataset directory must not be touched" + assert final_path != dataset_dir / "parquet-files" assert storage.resolved_dataset_name != "dataset", ( "resolved_dataset_name must be a new timestamped directory, not the existing one" ) -def test_if_possible_incompatible_config_refreshes_media_storage_path( - stub_resource_provider, stub_test_config_builder, tmp_path -): +def test_if_possible_incompatible_config_refreshes_media_storage_path(stub_resource_provider, tmp_path): """After IF_POSSIBLE β†’ NEVER downgrade, _media_storage must point to the new timestamped dir. Bug: validate_folder_names initialises MediaStorage with base_dataset_path at Pydantic @@ -2685,27 +2676,18 @@ def test_if_possible_incompatible_config_refreshes_media_storage_path( dataset_dir.mkdir() (dataset_dir / "existing_file.parquet").write_text("data") # non-empty dir triggers NEVERβ†’timestamp - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) - stub_resource_provider.artifact_storage = storage + builder, storage = _make_sampler_only_builder(stub_resource_provider, tmp_path) # Trigger validate_folder_names so _media_storage is initialised with IF_POSSIBLE semantics # (non-empty dir + IF_POSSIBLE β†’ resolved_dataset_name returns "dataset", not timestamped) original_media_base = storage.media_storage.base_path - builder = DatasetBuilder( - data_designer_config=stub_test_config_builder.build(), - resource_provider=stub_resource_provider, + _write_incompatible_config_metadata( + dataset_dir, + builder.data_designer_config.fingerprint()["config_hash_version"], ) - with patch.object(builder, "_check_resume_config_compatibility", return_value=_ConfigCompatibility.INCOMPATIBLE): - with patch.object(builder, "_run_model_health_check_if_needed"): - with patch.object(builder, "_run_mcp_tool_check_if_needed"): - with patch.object(builder, "_write_builder_config"): - with patch.object(builder, "_initialize_generators_and_graph", return_value=([], None)): - with patch.object(builder.batch_manager, "start"): - with patch.object(builder.batch_manager, "finish"): - with patch.object(builder._processor_runner, "run_after_generation"): - builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) + builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) new_media_base = storage.media_storage.base_path assert new_media_base != original_media_base, ( @@ -2716,9 +2698,7 @@ def test_if_possible_incompatible_config_refreshes_media_storage_path( ) -def test_if_possible_starts_fresh_when_no_existing_directory( - stub_resource_provider, stub_test_config_builder, tmp_path -): +def test_if_possible_starts_fresh_when_no_existing_directory(stub_resource_provider, tmp_path): """IF_POSSIBLE on a first-ever run (no dataset directory) must start fresh, not raise. Bug: _check_resume_config_compatibility returned True when config_path did not exist, @@ -2727,27 +2707,14 @@ def test_if_possible_starts_fresh_when_no_existing_directory( Fix: return False when the dataset directory itself is absent. """ - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) - stub_resource_provider.artifact_storage = storage - - builder = DatasetBuilder( - data_designer_config=stub_test_config_builder.build(), - resource_provider=stub_resource_provider, - ) - - with patch.object(builder, "_run_model_health_check_if_needed"): - with patch.object(builder, "_run_mcp_tool_check_if_needed"): - with patch.object(builder, "_write_builder_config"): - with patch.object(builder, "_initialize_generators_and_graph", return_value=([], None)): - with patch.object(builder.batch_manager, "start"): - with patch.object(builder.batch_manager, "finish"): - with patch.object(builder._processor_runner, "run_after_generation"): - builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) + builder, storage = _make_sampler_only_builder(stub_resource_provider, tmp_path) + final_path = builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) assert storage.resume == ResumeMode.NEVER + assert final_path.exists() -def test_if_possible_starts_fresh_when_directory_is_empty(stub_resource_provider, stub_test_config_builder, tmp_path): +def test_if_possible_starts_fresh_when_directory_is_empty(stub_resource_provider, tmp_path): """IF_POSSIBLE on an empty dataset directory must start fresh, not raise. Edge case: a prior run crashed in the window between mkdir and the first file write @@ -2760,21 +2727,8 @@ def test_if_possible_starts_fresh_when_directory_is_empty(stub_resource_provider dataset_dir = tmp_path / "dataset" dataset_dir.mkdir() # empty β€” no files written yet - storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) - stub_resource_provider.artifact_storage = storage - - builder = DatasetBuilder( - data_designer_config=stub_test_config_builder.build(), - resource_provider=stub_resource_provider, - ) - - with patch.object(builder, "_run_model_health_check_if_needed"): - with patch.object(builder, "_run_mcp_tool_check_if_needed"): - with patch.object(builder, "_write_builder_config"): - with patch.object(builder, "_initialize_generators_and_graph", return_value=([], None)): - with patch.object(builder.batch_manager, "start"): - with patch.object(builder.batch_manager, "finish"): - with patch.object(builder._processor_runner, "run_after_generation"): - builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) + builder, storage = _make_sampler_only_builder(stub_resource_provider, tmp_path) + final_path = builder.build(num_records=2, resume=ResumeMode.IF_POSSIBLE) assert storage.resume == ResumeMode.NEVER + assert final_path.exists() From 537ca605832a64d8f0b78a396020185dd083c224 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Thu, 28 May 2026 11:07:49 -0600 Subject: [PATCH 3/6] address review comments on seed resume offset fix - Simplify _run_batch context-var setup so current_row_group is set consistently in fresh and resumed sync runs (matches the (x/X) log prefix the async path already emits) and add a docstring spelling out which ContextVars the function owns. - Document RowGroupResumePlan and build_row_group_resume_plan, and make the plan dataclass frozen+slots since it is a one-shot value. - Comment the modulo cycling logic in _index_range_at_offset. - Add a scheduler test verifying fresh async runs auto-derive the per-row-group offsets from row-group sizes (no caller-supplied offsets) so ordered generators stay parallel-safe across row groups. - Add a wraparound regression test that resumes past a full seed cycle, exercising the relative_offset == 0 branch the original #709 regression test missed. --- .../generators/seed_dataset.py | 7 ++ .../dataset_builders/dataset_builder.py | 52 +++++++++++++- .../dataset_builders/test_async_scheduler.py | 68 +++++++++++++++---- .../dataset_builders/test_dataset_builder.py | 58 ++++++++++++++++ 4 files changed, 168 insertions(+), 17 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py index 593f186a1..5c53b242e 100644 --- a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py +++ b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py @@ -94,6 +94,13 @@ def _reset_batch_reader(self, num_records: int, *, record_offset: int = 0) -> No ) def _index_range_at_offset(self, record_offset: int) -> IndexRange | None: + # ORDERED sampling cycles through the index range when more records are + # requested than the selection contains. ``record_offset`` is the count + # of records already produced for prior row groups, so it may exceed + # ``selected_size`` after one or more full cycles. Modulo by selection + # size gives the next read position within the current cycle; when it + # lands at 0 we fall back to the original range so the next read starts + # at ``selected_start`` like a fresh cycle. if record_offset <= 0: return self._index_range diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 0a12e863c..0b58f60f1 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -119,8 +119,19 @@ class _ResumeState: completed_row_groups: dict[int, int] -@dataclass +@dataclass(frozen=True, slots=True) class RowGroupResumePlan: + """Plan describing the row groups left to generate when resuming an async run. + + Attributes: + total_row_groups: Total row group count for the full target (original + extension). + remaining_row_groups: ``(rg_id, rg_size)`` for groups not yet on disk, in id order. + row_group_start_offsets: ``rg_id -> planned start offset`` for the remaining + groups, computed from the original plan so completed-group sizes are + preserved (offsets are not recomputed from the remaining list, which + would shift them when there are holes). + """ + total_row_groups: int remaining_row_groups: list[tuple[int, int]] row_group_start_offsets: dict[int, int] @@ -133,6 +144,27 @@ def build_row_group_resume_plan( buffer_size: int, completed_ids: set[int], ) -> RowGroupResumePlan: + """Compute the remaining row-group plan for an async resume. + + Original groups are immutable: their per-group sizes were fixed by the first + run's ``original_target_num_records`` and ``buffer_size``. Any extension + (``num_records > original_target``) always adds new groups beyond the + original count β€” ``ceil(num_records/buffer_size)`` would give the wrong + total when the original run was non-aligned and the extension fits in the + last original group's slack. + + Args: + original_target: Target record count from the first run (immutable). + num_records: Current target record count (may extend ``original_target``). + buffer_size: Records per row group. + completed_ids: Row-group IDs already persisted on disk. + + Returns: + A ``RowGroupResumePlan`` whose ``row_group_start_offsets`` are taken from + the full original plan, so the offset for ``rg_id`` is the same whether + or not earlier groups have completed. This is what lets ordered seed + generators seek to the correct row when resuming with holes. + """ num_original_groups = -(-original_target // buffer_size) extension_records = num_records - original_target total_row_groups = num_original_groups + -(-extension_records // buffer_size) @@ -1162,11 +1194,25 @@ def _run_batch( on_batch_complete: Callable[[Path], None] | None = None, row_group_start_offset: int | None = None, ) -> None: + """Run one batch of generators in the sync engine. + + Sets two ContextVars for the duration of the batch so order-dependent + generators (e.g. seed dataset under ORDERED sampling) and log helpers + can observe the row group's place in the run: + + - ``current_row_group`` is set whenever ``current_batch_number`` is known + (both fresh and resumed sync runs), so ``format_row_group_tag()`` + produces a consistent ``(x/X)`` log prefix in either path. + - ``current_row_group_start_offset`` is set only when the caller supplies + the planned start offset (sync resume passes it; fresh sync and preview + do not), so generators can seek into the correct seed slice without + replaying already-consumed rows. + """ row_group_token = None start_offset_token = None + if current_batch_number is not None: + row_group_token = current_row_group.set((current_batch_number, self.batch_manager.num_batches)) if row_group_start_offset is not None: - if current_batch_number is not None: - row_group_token = current_row_group.set((current_batch_number, self.batch_manager.num_batches)) start_offset_token = current_row_group_start_offset.set(row_group_start_offset) pre_batch_snapshot = self._resource_provider.model_registry.get_model_usage_snapshot() diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index c0d52c136..ca32b499f 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -564,27 +564,29 @@ async def test_scheduler_multiple_row_groups() -> None: assert tracker.is_row_group_complete(2, 1, ["seed", "cell_out"]) -@pytest.mark.asyncio(loop_scope="session") -async def test_scheduler_sets_row_group_start_offsets_for_generators() -> None: - """Ordered generators can seek by planned row-group offset during async resume.""" +class _OffsetSeedGenerator(FromScratchColumnGenerator[ExpressionColumnConfig]): + """Synthetic seed generator that emits ``[offset, offset + n)`` for a row group.""" - class OffsetSeedGenerator(FromScratchColumnGenerator[ExpressionColumnConfig]): - @staticmethod - def get_generation_strategy() -> GenerationStrategy: - return GenerationStrategy.FULL_COLUMN + @staticmethod + def get_generation_strategy() -> GenerationStrategy: + return GenerationStrategy.FULL_COLUMN - def generate(self, data: lazy.pd.DataFrame) -> lazy.pd.DataFrame: - return data + def generate(self, data: lazy.pd.DataFrame) -> lazy.pd.DataFrame: + return data + + def generate_from_scratch(self, num_records: int) -> lazy.pd.DataFrame: + offset = current_row_group_start_offset.get() + assert offset is not None + return lazy.pd.DataFrame({"seed": list(range(offset, offset + num_records))}) - def generate_from_scratch(self, num_records: int) -> lazy.pd.DataFrame: - offset = current_row_group_start_offset.get() - assert offset is not None - return lazy.pd.DataFrame({"seed": list(range(offset, offset + num_records))}) +@pytest.mark.asyncio(loop_scope="session") +async def test_scheduler_sets_row_group_start_offsets_for_generators() -> None: + """Ordered generators can seek by planned row-group offset during async resume.""" provider = _mock_provider() configs = [SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]})] strategies = {"seed": GenerationStrategy.FULL_COLUMN} - generators = {"seed": OffsetSeedGenerator(config=_expr_config("seed"), resource_provider=provider)} + generators = {"seed": _OffsetSeedGenerator(config=_expr_config("seed"), resource_provider=provider)} row_groups = [(1, 1), (3, 1)] graph = ExecutionGraph.create(configs, strategies) @@ -606,6 +608,44 @@ def generate_from_scratch(self, num_records: int) -> lazy.pd.DataFrame: assert buffer_manager.get_row(3, 0)["seed"] == 3 +@pytest.mark.asyncio(loop_scope="session") +async def test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs() -> None: + """Fresh async runs (no caller-supplied offsets) auto-derive offsets from row-group sizes. + + This locks in the per-row-group seek behavior for ordered generators on fresh + runs. Previously the scheduler relied on a single shared seed reader whose + state advanced under a stateful lock; now each row group seeks to its own + planned offset, which is parallel-safe and order-independent. + """ + provider = _mock_provider() + configs = [SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]})] + strategies = {"seed": GenerationStrategy.FULL_COLUMN} + generators = {"seed": _OffsetSeedGenerator(config=_expr_config("seed"), resource_provider=provider)} + row_groups = [(0, 2), (1, 2), (2, 1)] # non-aligned last group exercises offset accumulation + + graph = ExecutionGraph.create(configs, strategies) + tracker = CompletionTracker.with_graph(graph, row_groups) + storage = _make_storage() + buffer_manager = RowGroupBufferManager(storage) + + scheduler = AsyncTaskScheduler( + generators=generators, + graph=graph, + tracker=tracker, + row_groups=row_groups, + buffer_manager=buffer_manager, + # row_group_start_offsets intentionally omitted β€” scheduler should derive + # {0: 0, 1: 2, 2: 4} from the row-group sizes. + ) + await scheduler.run() + + assert buffer_manager.get_row(0, 0)["seed"] == 0 + assert buffer_manager.get_row(0, 1)["seed"] == 1 + assert buffer_manager.get_row(1, 0)["seed"] == 2 + assert buffer_manager.get_row(1, 1)["seed"] == 3 + assert buffer_manager.get_row(2, 0)["seed"] == 4 + + @pytest.mark.asyncio(loop_scope="session") async def test_scheduler_non_retryable_failure_drops_row() -> None: """Non-retryable failure drops the row.""" diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index 207f40961..8921e795f 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -1689,6 +1689,64 @@ def stop(_path: Path) -> None: assert result["copy"].tolist() == ["alpha", "beta", "gamma"] +def test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary(stub_resource_provider, tmp_path): + """Resume that extends past a full seed cycle hits the modulo == 0 branch. + + Companion to the basic #709 regression: when the resumed run's first new + row group starts at an offset that is a non-zero multiple of the seed + selection size, ``_index_range_at_offset`` returns the full original + ``_index_range`` so reads restart at ``_index_range.start`` like a fresh + cycle (instead of producing a degenerate empty range). + """ + seed_source = DataFrameSeedSource(df=lazy.pd.DataFrame({"name": ["alpha", "beta", "gamma"]})) + seed_reader = DataFrameSeedReader() + seed_reader.attach(seed_source, Mock()) + + config_builder = DataDesignerConfigBuilder() + config_builder.with_seed_dataset( + seed_source, + sampling_strategy=SamplingStrategy.ORDERED, + selection_strategy=IndexRange(start=0, end=2), + ) + config_builder.add_column(ExpressionColumnConfig(name="copy", expr="{{ name }}")) + + storage = ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=ResumeMode.NEVER) + stub_resource_provider.artifact_storage = storage + stub_resource_provider.seed_reader = seed_reader + stub_resource_provider.run_config = RunConfig(disable_early_shutdown=True, buffer_size=1) + + builder = DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + # Run 1: target=3 fills exactly one full cycle through the 3-row selection. + builder.build(num_records=3, resume=ResumeMode.NEVER) + + resumed_seed_reader = DataFrameSeedReader() + resumed_seed_reader.attach(seed_source, Mock()) + stub_resource_provider.seed_reader = resumed_seed_reader + stub_resource_provider.artifact_storage = ArtifactStorage( + artifact_path=tmp_path, + dataset_name="dataset", + resume=ResumeMode.ALWAYS, + ) + + resumed_builder = DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + # Run 2: extend to 6. The first extension row group has start offset 3, + # which is exactly one selection cycle (3 % 3 == 0) β€” the wrap branch. + final_path = resumed_builder.build(num_records=6, resume=ResumeMode.ALWAYS) + result = lazy.pd.concat( + [lazy.pd.read_parquet(path) for path in sorted(final_path.glob("batch_*.parquet"))], + ignore_index=True, + ) + + assert result["name"].tolist() == ["alpha", "beta", "gamma", "alpha", "beta", "gamma"] + assert result["copy"].tolist() == ["alpha", "beta", "gamma", "alpha", "beta", "gamma"] + + def test_build_resume_starts_fresh_without_metadata(stub_resource_provider, stub_test_config_builder, tmp_path, caplog): """resume=True when only the folder exists (no metadata.json) logs an info message and starts fresh. From b22ca1129c3dabae72a855a62dbe75cc05adefa0 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Thu, 28 May 2026 13:23:10 -0600 Subject: [PATCH 4/6] address review suggestions on seed resume offset fix - update current_row_group ContextVar comment to reflect that both the async scheduler and the sync engine's _run_batch set it - move pre_batch_snapshot capture (and ran_pre_batch flag) inside the try/finally in _run_batch so a failure between ContextVar.set and the snapshot call still resets the tokens - add a direct unit test for the relative_offset == 0 wraparound branch in _index_range_at_offset to lock in the fresh-cycle restart behavior --- .../src/data_designer/engine/context.py | 3 ++- .../dataset_builders/dataset_builder.py | 4 +-- .../generators/test_seed_dataset.py | 25 +++++++++++++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/context.py b/packages/data-designer-engine/src/data_designer/engine/context.py index 002a4b95c..621391617 100644 --- a/packages/data-designer-engine/src/data_designer/engine/context.py +++ b/packages/data-designer-engine/src/data_designer/engine/context.py @@ -5,7 +5,8 @@ from contextvars import ContextVar -# Set by the async scheduler before executing each task. +# Set per row group by both engines: the async scheduler sets it before each +# task executes, and the sync engine's ``_run_batch`` sets it for each batch. # Value: (current_rg_index, total_rg_count) or None. current_row_group: ContextVar[tuple[int, int] | None] = ContextVar("current_row_group", default=None) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 0b58f60f1..9a8670599 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -1215,9 +1215,9 @@ def _run_batch( if row_group_start_offset is not None: start_offset_token = current_row_group_start_offset.set(row_group_start_offset) - pre_batch_snapshot = self._resource_provider.model_registry.get_model_usage_snapshot() - ran_pre_batch = False try: + pre_batch_snapshot = self._resource_provider.model_registry.get_model_usage_snapshot() + ran_pre_batch = False for generator in generators: generator.log_pre_generation() try: diff --git a/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py b/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py index 026d7b13b..8c4432bda 100644 --- a/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py +++ b/packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py @@ -261,6 +261,31 @@ def test_seed_dataset_column_generator_reset_batch_reader_applies_record_offset( assert gen._batch_reader == mock_batch_reader +def test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary( + stub_seed_dataset_generator, +) -> None: + """Direct unit test for the ``relative_offset == 0`` branch in ``_index_range_at_offset``. + + Selection range ``[4, 8]`` has size 5, so ``record_offset=5`` is exactly one full + cycle: modulo lands on 0 and the helper must hand back the original full range + so the next read restarts at ``selected_start`` like a fresh cycle (not a + degenerate empty range). + """ + gen = stub_seed_dataset_generator + mock_batch_reader = Mock() + gen._index_range = IndexRange(start=4, end=8) + gen.resource_provider.seed_reader.create_batch_reader.return_value = mock_batch_reader + + gen._reset_batch_reader(100, record_offset=5) + + gen.resource_provider.seed_reader.create_batch_reader.assert_called_once_with( + batch_size=100, + index_range=IndexRange(start=4, end=8), + shuffle=False, + ) + assert gen._batch_reader == mock_batch_reader + + def test_seed_dataset_column_generator_ordered_generation_uses_row_group_offset( stub_seed_dataset_generator, ) -> None: From c15a9008bf42f4b139c91b0a185d56e0035b20d1 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Thu, 28 May 2026 13:46:42 -0600 Subject: [PATCH 5/6] add ORDERED + PartitionBlock resume regression test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to the existing IndexRange resume test. Locks in correct behavior when the seed selection comes from PartitionBlock β€” its to_index_range produces a contiguous range today, but nothing else asserts that contract. The test crosses a cycle boundary inside the partition (4 records over a 2-row partition) so it exercises both the offset-into-partition branch and the relative_offset == 0 wraparound branch end-to-end. --- .../dataset_builders/test_dataset_builder.py | 72 ++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index 8921e795f..5d66b6c48 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -25,7 +25,7 @@ from data_designer.config.processors import DropColumnsProcessorConfig from data_designer.config.run_config import RunConfig from data_designer.config.sampler_params import SamplerType, UUIDSamplerParams -from data_designer.config.seed import IndexRange, SamplingStrategy +from data_designer.config.seed import IndexRange, PartitionBlock, SamplingStrategy from data_designer.config.seed_source import LocalFileSeedSource from data_designer.config.seed_source_dataframe import DataFrameSeedSource from data_designer.engine.column_generators.generators.base import GenerationStrategy @@ -1747,6 +1747,76 @@ def test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary(stu assert result["copy"].tolist() == ["alpha", "beta", "gamma", "alpha", "beta", "gamma"] +def test_build_resume_ordered_seed_dataset_with_partition_block_continues_within_partition( + stub_resource_provider, tmp_path +): + """Resume must seek into the partition slice, not just the full dataset. + + Companion to the basic #709 regression that uses ``IndexRange``: this exercises + the same offset machinery with ``PartitionBlock``, which resolves to a + contiguous ``IndexRange`` only because of ``PartitionBlock.to_index_range``. + The resumed run also crosses a cycle boundary inside the partition, hitting + both the offset-into-partition branch and the wraparound (``relative_offset == 0``) + branch end-to-end. + """ + + class StopAfterFirstBatch(RuntimeError): + pass + + seed_source = DataFrameSeedSource(df=lazy.pd.DataFrame({"name": ["a", "b", "c", "d", "e", "f"]})) + seed_reader = DataFrameSeedReader() + seed_reader.attach(seed_source, Mock()) + + # PartitionBlock(index=1, num_partitions=3) over 6 rows -> IndexRange(2, 3), + # i.e. a 2-row cycle of ["c", "d"]. With buffer_size=1 and num_records=4, a + # full continuous run would emit ["c", "d", "c", "d"]. + config_builder = DataDesignerConfigBuilder() + config_builder.with_seed_dataset( + seed_source, + sampling_strategy=SamplingStrategy.ORDERED, + selection_strategy=PartitionBlock(index=1, num_partitions=3), + ) + config_builder.add_column(ExpressionColumnConfig(name="copy", expr="{{ name }}")) + + storage = ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=ResumeMode.NEVER) + stub_resource_provider.artifact_storage = storage + stub_resource_provider.seed_reader = seed_reader + stub_resource_provider.run_config = RunConfig(disable_early_shutdown=True, buffer_size=1) + + builder = DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + + def stop(_path: Path) -> None: + raise StopAfterFirstBatch("simulated interruption") + + with pytest.raises(StopAfterFirstBatch, match="simulated interruption"): + builder.build(num_records=4, on_batch_complete=stop, resume=ResumeMode.NEVER) + + resumed_seed_reader = DataFrameSeedReader() + resumed_seed_reader.attach(seed_source, Mock()) + stub_resource_provider.seed_reader = resumed_seed_reader + stub_resource_provider.artifact_storage = ArtifactStorage( + artifact_path=tmp_path, + dataset_name="dataset", + resume=ResumeMode.ALWAYS, + ) + + resumed_builder = DatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + final_path = resumed_builder.build(num_records=4, resume=ResumeMode.ALWAYS) + result = lazy.pd.concat( + [lazy.pd.read_parquet(path) for path in sorted(final_path.glob("batch_*.parquet"))], + ignore_index=True, + ) + + assert result["name"].tolist() == ["c", "d", "c", "d"] + assert result["copy"].tolist() == ["c", "d", "c", "d"] + + def test_build_resume_starts_fresh_without_metadata(stub_resource_provider, stub_test_config_builder, tmp_path, caplog): """resume=True when only the folder exists (no metadata.json) logs an info message and starts fresh. From f5905def04cab40471b0ea06e4896947ab0b2422 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Thu, 28 May 2026 22:46:37 -0600 Subject: [PATCH 6/6] fix async resume progress accounting --- .../dataset_builders/async_scheduler.py | 6 ++++- .../dataset_builders/dataset_builder.py | 1 + .../utils/async_progress_reporter.py | 7 ++++-- .../utils/progress_tracker.py | 11 +++++--- .../dataset_builders/test_async_scheduler.py | 25 +++++++++++++++++++ 5 files changed, 44 insertions(+), 6 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index d4376c1b7..53b83b220 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -171,6 +171,7 @@ def __init__( scheduler_event_sink: SchedulerAdmissionEventSink | None = None, run_id: str | None = None, row_group_start_offsets: dict[int, int] | None = None, + initial_completed_records: int = 0, adaptive_row_group_admission: bool = False, adaptive_row_group_initial_target: int = 1, request_pressure_provider: RequestPressureSnapshotProvider | None = None, @@ -297,6 +298,8 @@ def __init__( self._max_model_task_admission = max_model_task_admission self._num_records = num_records self._buffer_size = buffer_size + self._scheduled_records = sum(size for _, size in row_groups) + self._initial_completed_records = initial_completed_records self._observed_max_row_groups_in_flight = 0 self._observed_max_task_leases_by_resource: dict[str, int] = {} self._observed_max_queued_by_group: dict[str, int] = {} @@ -355,6 +358,7 @@ def _setup_async_progress_reporter( total_records=task_counts[col], label=f"column '{col}'", quiet=True, + initial_completed=self._initial_completed_records, ) if not trackers: @@ -1030,7 +1034,7 @@ async def run(self) -> None: with self._progress_bar or contextlib.nullcontext(): if self._reporter: - self._reporter.log_start(num_row_groups=num_rgs) + self._reporter.log_start(num_row_groups=num_rgs, scheduled_records=self._scheduled_records) self._emit_scheduler_event("scheduler_job_started", diagnostics=self._scheduler_job_diagnostics()) self._emit_scheduler_health_snapshot("start") diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 9a8670599..43f5d7e71 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -1144,6 +1144,7 @@ def on_before_checkpoint(rg_id: int, rg_size: int) -> None: num_records=num_records, buffer_size=buffer_size, row_group_start_offsets=row_group_start_offsets, + initial_completed_records=initial_actual_num_records, progress_interval=self._resource_provider.run_config.progress_interval, progress_bar=self._resource_provider.run_config.progress_bar, request_pressure_provider=self._resource_provider.model_registry.request_admission, diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py index c394ae613..7208bbb78 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py @@ -42,9 +42,12 @@ def __init__( for col, tracker in trackers.items(): self._bar.add_bar(col, f"column '{col}'", tracker.total_records) - def log_start(self, num_row_groups: int) -> None: + def log_start(self, num_row_groups: int, scheduled_records: int | None = None) -> None: cols = ", ".join(self._trackers) - total = sum(t.total_records for t in self._trackers.values()) + if scheduled_records is None: + total = sum(max(0, t.total_records - t.completed) for t in self._trackers.values()) + else: + total = scheduled_records * len(self._trackers) logger.info( "⚑️ Async generation: %d column(s) (%s), %d tasks across %d row group(s)", len(self._trackers), diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py index 73afa2e26..ebcdd88b2 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py @@ -44,19 +44,23 @@ def __init__( quiet: bool = False, progress_bar: StickyProgressBar | None = None, progress_bar_key: str | None = None, + initial_completed: int = 0, ): self.total_records = total_records self.label = label self.quiet = quiet - self.completed = 0 - self.success = 0 + self.completed = min(max(0, initial_completed), total_records) + self.success = self.completed self.failed = 0 self.skipped = 0 + self._initial_completed = self.completed interval_fraction = max(1, log_interval_percent) / 100.0 self.log_interval = max(1, int(total_records * interval_fraction)) if total_records > 0 else 1 self.next_log_at = self.log_interval + while self.next_log_at <= self.completed: + self.next_log_at += self.log_interval self.start_time = time.perf_counter() self.lock = Lock() @@ -130,7 +134,8 @@ def _record_completion(self, *, success: bool | None) -> None: def _get_snapshot_unlocked(self, elapsed: float | None = None) -> tuple[int, int, int, int, int, float, float, str]: current_elapsed = time.perf_counter() - self.start_time if elapsed is None else elapsed - rate = self.completed / current_elapsed if current_elapsed > 0 else 0.0 + run_completed = max(0, self.completed - self._initial_completed) + rate = run_completed / current_elapsed if current_elapsed > 0 else 0.0 percent = (self.completed / self.total_records) * 100 if self.total_records else 100.0 emoji = self._random_emoji.progress(percent) return self.completed, self.total_records, self.success, self.failed, self.skipped, percent, rate, emoji diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index ca32b499f..d705f674b 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -41,7 +41,9 @@ from data_designer.engine.dataset_builders.scheduling.task_admission import TaskAdmissionConfig, TaskAdmissionLease from data_designer.engine.dataset_builders.scheduling.task_model import Task from data_designer.engine.dataset_builders.scheduling.task_policies import BoundedBorrowTaskAdmissionPolicyConfig +from data_designer.engine.dataset_builders.utils.async_progress_reporter import AsyncProgressReporter from data_designer.engine.dataset_builders.utils.execution_graph import ExecutionGraph +from data_designer.engine.dataset_builders.utils.progress_tracker import ProgressTracker from data_designer.engine.dataset_builders.utils.row_group_buffer import RowGroupBufferManager from data_designer.engine.models.errors import ( RETRYABLE_MODEL_ERRORS, @@ -1111,6 +1113,29 @@ async def test_scheduler_eager_row_drop_skips_downstream_of_failed_column() -> N assert scheduler._reporter._trackers["downstream"].completed == 2 +def test_resume_progress_reporter_starts_from_completed_records(caplog: pytest.LogCaptureFixture) -> None: + """Resume progress should include persisted records while logging only remaining scheduled work.""" + trackers = { + "cell_a": ProgressTracker(total_records=1000, label="column 'cell_a'", quiet=True, initial_completed=252), + "cell_b": ProgressTracker(total_records=1000, label="column 'cell_b'", quiet=True, initial_completed=252), + } + completed, total, _success, _failed, _skipped, _pct, rate, _emoji = trackers["cell_a"].get_snapshot(elapsed=1.0) + assert completed == 252 + assert total == 1000 + assert rate == 0.0 + + trackers["cell_a"].record_success() + completed, _total, _success, _failed, _skipped, _pct, rate, _emoji = trackers["cell_a"].get_snapshot(elapsed=1.0) + assert completed == 253 + assert rate == 1.0 + + reporter = AsyncProgressReporter(trackers) + with caplog.at_level(logging.INFO): + reporter.log_start(num_row_groups=2, scheduled_records=128) + + assert "256 tasks across 2 row group(s)" in caplog.text + + @pytest.mark.asyncio(loop_scope="session") async def test_scheduler_non_retryable_seed_failure_no_keyerror_on_downstream() -> None: """Non-retryable seed failure does not cause KeyError on vacuously-ready downstream.