diff --git a/README.md b/README.md index bd04999b..c2c45c9a 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ The library now supports reasoning traces through the `reasoning_content` field - [Using the library](#using-the-library) - [Data format](#data-format) - [Reasoning content support](#reasoning-content-support-1) +- [Continual pretraining mode](#continual-pretraining-mode) - [Documentation](#documentation) - [Learning about the training arguments](#learning-about-training-arguments) - [`TrainingArgs`](#trainingargs) @@ -122,6 +123,46 @@ The library now supports an optional `reasoning_content` field in addition to th } ``` +## Continual pretraining mode + +In addition to instruction tuning, the library can run document-style continual pretraining on raw text corpora. +Enable this by supplying a block size when invoking `main_ds.py`: + +```bash +torchrun main_ds.py \ + --model_name_or_path mistralai/Mistral-7B-v0.1 \ + --data_path /data/documents.jsonl \ + --ckpt_output_dir ./checkpoints \ + --effective_batch_size 128 \ + --max_batch_len 60000 \ + --block-size 8192 \ + --document-column-name text # optional, defaults to "document" +``` + +- `--block-size` (required) toggles continual pretraining and controls how many tokens are packed into each block. +- `--document-column-name` (optional) specifies which JSONL field contains the raw document text. + +The same options are available programmatically via `TrainingArgs.pretraining_config`: + +```python +from instructlab.training import TrainingArgs, PretrainingConfig + +train_args = TrainingArgs( + model_name_or_path="mistralai/Mistral-7B-v0.1", + data_path="documents.jsonl", + ckpt_output_dir="./checkpoints", + max_seq_len=4096, + max_batch_len=40000, + effective_batch_size=128, + pretraining_config=PretrainingConfig( + block_size=2048, + document_column_name="text", # optional + ), +) +``` + +When a pretraining config is provided, `process_documents_for_pretraining()` is invoked under the hood to tokenize raw documents before training. + **Standard message structure:** ```json @@ -139,7 +180,7 @@ The library now supports an optional `reasoning_content` field in addition to th } ``` -#### Important Notes +### Important Notes 1. **Automatic reasoning content processing**: If `reasoning_content` exists in a message, it will always be processed and unmasked as long as the message role is targeted for unmasking. This ensures that reasoning traces are properly included in the training data. 
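To make the on-disk contract of this mode concrete, here is a minimal sketch, assuming the default `document` column; the token IDs below are illustrative only (BOS=1/EOS=2 is just one common convention). The raw corpus is plain JSONL with one document per record, and `process_documents_for_pretraining()` writes a `data.jsonl` with one tokenized record per document; labels and fixed-size blocking are deferred to training time:

```
# documents.jsonl (raw input): one document per line
{"document": "First raw document text ..."}
{"document": "Second raw document text ..."}

# data.jsonl (processed output): input_ids and len only, no labels
{"input_ids": [1, 9038, 2501, 28723, 2], "len": 5}
{"input_ids": [1, 6562, 5509, 2], "len": 4}
```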
diff --git a/src/instructlab/training/__init__.py b/src/instructlab/training/__init__.py index 78ba2bfd..136d1384 100644 --- a/src/instructlab/training/__init__.py +++ b/src/instructlab/training/__init__.py @@ -10,6 +10,7 @@ "FSDPOptions", "ShardingStrategies", "DistributedBackend", + "PretrainingConfig", ) # First Party @@ -23,6 +24,7 @@ DistributedBackend, FSDPOptions, LoraOptions, + PretrainingConfig, QuantizeDataType, ShardingStrategies, TorchrunArgs, diff --git a/src/instructlab/training/accelerator.py b/src/instructlab/training/accelerator.py index 4baa7c0e..49fa52a8 100644 --- a/src/instructlab/training/accelerator.py +++ b/src/instructlab/training/accelerator.py @@ -63,6 +63,9 @@ def __init__( self.lr_scheduler = None if self.distributed_framework == DistributedBackend.DEEPSPEED: # Standard + cpu_offload_optimizer_ratio = ( + self.deepspeed_cpu_offload_optimizer_ratio or 0.0 + ) accel_args = { "deepspeed_plugin": self.get_ds_plugin( world_size=torch.distributed.get_world_size(), @@ -70,7 +73,7 @@ def __init__( grad_accum=grad_accum, opts=DeepSpeedOptions( cpu_offload_optimizer=deepspeed_cpu_offload_optimizer, - cpu_offload_optimizer_ratio=self.deepspeed_cpu_offload_optimizer_ratio, + cpu_offload_optimizer_ratio=cpu_offload_optimizer_ratio, cpu_offload_optimizer_pin_memory=self.deepspeed_cpu_offload_optimizer_pin_memory, save_samples=save_samples, ), diff --git a/src/instructlab/training/config.py b/src/instructlab/training/config.py index e603b55e..3f2dd810 100644 --- a/src/instructlab/training/config.py +++ b/src/instructlab/training/config.py @@ -63,10 +63,34 @@ class DataProcessArgs(BaseModel): description="this is the number of CPU procs we use for data processing parallelization", ) + # Pretraining mode flag + is_pretraining: bool = Field( + default=False, + description="Enable pretraining mode: tokenizes raw documents without chat templates or chunking", + ) + pretraining_column_name: str = Field( + default="document", + description="the name of the column containing the text to pretrain on", + ) + # disable the protected namespace for the model_config field model_config = ConfigDict(protected_namespaces=()) +class PretrainingConfig(BaseModel): + """ + Configuration for pretraining mode. + """ + + block_size: int = Field( + description="Size of each block in tokens for pretraining datasets." + ) + document_column_name: str = Field( + default="document", + description="Name of the column containing raw documents for pretraining.", + ) + + # public API class TorchrunArgs(BaseModel): """ @@ -266,6 +290,14 @@ class TrainingArgs(BaseModel): # "last_epoch". This works alongside the '--checkpoint_at_epoch' flag. keep_last_checkpoint_only: Optional[bool] = False + pretraining_config: Optional[PretrainingConfig] = Field( + default=None, + description=( + "Pretraining configuration. When provided, enables block-based sampling " + "for raw document pretraining datasets." + ), + ) + # TODO(osilkin): # we are only exposing this here because `run_training` today is implicitly coupled # with `process_data`. 
Since we don't have a specific field for data processing arguments,
diff --git a/src/instructlab/training/data_process.py b/src/instructlab/training/data_process.py
index 2b93a48f..b0eaa268 100644
--- a/src/instructlab/training/data_process.py
+++ b/src/instructlab/training/data_process.py
@@ -412,7 +412,10 @@ def process_messages_into_input_ids_with_chat_template(args: DataProcessArgs):
     logger.info("Tokenizing the dataset with %s tokenizer...", args.model_path)
     data_with_input_ids = data.map(
         lambda x: {
-            "input_ids": tokenizer.apply_chat_template(x["messages"], tokenize=True),
+            # newer versions of transformers have `return_dict=True` by default
+            "input_ids": tokenizer.apply_chat_template(
+                x["messages"], tokenize=True, return_dict=False
+            ),
             "unmask": bool(x["unmask"]) if "unmask" in x else False,
         },
         num_proc=NUM_PROC,
@@ -687,7 +690,8 @@ def unmask_messages(
         if regions:
             message_regions_map[idx] = regions
 
-    input_ids = tokenizer.apply_chat_template(msgs_with_unmasking)
+    # newer versions of transformers have `return_dict=True` by default
+    input_ids = tokenizer.apply_chat_template(msgs_with_unmasking, return_dict=False)
 
     # Get token IDs for all unmask tokens
     unmask_begin_token_id = tokenizer.encode(
@@ -1133,6 +1137,109 @@ def process_messages_into_input_ids(
     save_dataset(final_dataset, data_output_path, num_cpu_procs)
 
 
+def process_documents_for_pretraining(
+    data_path: str,
+    data_output_path: str,
+    model_path: str,
+    num_cpu_procs: int,
+    document_column_name: str = "document",
+) -> None:
+    """
+    Process raw documents for pretraining by tokenizing without chunking.
+
+    Outputs one JSONL record per document with only input_ids (no labels).
+    Blocking/chunking happens later during training.
+
+    Pattern: Each document → [BOS][tokens][EOS], where BOS appears only for
+    tokenizers that add one.
+
+    Args:
+        data_path: Path to input JSONL with {"document": "text"} format
+        data_output_path: Directory for processed data output
+        model_path: Path to model/tokenizer
+        num_cpu_procs: Number of parallel processes
+        document_column_name: Name of the column containing the documents
+    """
+    ensure_can_write_to_directory(data_output_path)
+
+    # Load and validate dataset
+    try:
+        data = load_dataset("json", data_files=data_path, split="train")
+    except Exception as e:
+        raise ValueError(
+            "Malformed or missing data, please ensure your dataset is correctly formatted"
+        ) from e
+
+    if data.num_rows == 0:
+        raise ValueError("The provided dataset is empty")
+
+    if document_column_name not in data.column_names:
+        raise ValueError(
+            f"Pretraining data must have '{document_column_name}' field. Found: {data.column_names}"
+        )
+
+    logger.info("Loading tokenizer from %s", model_path)
+    tokenizer = AutoTokenizer.from_pretrained(model_path)
+
+    if tokenizer.eos_token_id is None:
+        raise ValueError("Tokenizer must have an EOS token defined for pretraining")
+
+    logger.info("Tokenizing %d documents for pretraining...", data.num_rows)
+
+    # Tokenize each document: encode() adds BOS when the tokenizer defines one;
+    # we then append EOS
+    def tokenize_document(sample):
+        input_ids = tokenizer.encode(
+            sample[document_column_name], add_special_tokens=True
+        )
+
+        # Ensure the EOS token is present without double-adding it.
+ if input_ids[-1] != tokenizer.eos_token_id: + input_ids.append(tokenizer.eos_token_id) + + return { + "input_ids": input_ids, + "len": len(input_ids), + } + + # Filter out empty documents before tokenization + def filter_empty_documents(batch): + return [bool(doc) for doc in batch[document_column_name]] + + filtered_data = data.filter( + filter_empty_documents, + batched=True, + num_proc=num_cpu_procs, + desc="Filtering empty documents", + ) + + dropped_count = data.num_rows - filtered_data.num_rows + if dropped_count > 0: + logger.info(f"Dropped {dropped_count:,} empty documents") + tokenized_data = filtered_data.map( + tokenize_document, + num_proc=num_cpu_procs, + desc="Tokenizing documents", + remove_columns=filtered_data.column_names, + ) + + # Calculate statistics + total_tokens = sum(tokenized_data["len"]) + avg_tokens = total_tokens / len(tokenized_data) + logger.info(f"Processed {len(tokenized_data):,} documents") + logger.info(f"Total tokens: {total_tokens:,}") + logger.info(f"Average tokens per document: {avg_tokens:.1f}") + + # Save to JSONL (one record per document) + os.makedirs(data_output_path, exist_ok=True) + output_file = Path(data_output_path) / "data.jsonl" + + tokenized_data.to_json( + output_file, num_proc=num_cpu_procs, lines=True, orient="records" + ) + + logger.info(f"Saved tokenized documents to {output_file}") + logger.info("Note: Blocking into fixed-size chunks will happen during training") + + def ensure_can_write_to_directory(output_dir: str) -> None: """ Ensure that we can write to the output directory. diff --git a/src/instructlab/training/main_ds.py b/src/instructlab/training/main_ds.py index d08afc91..ff5cea7d 100644 --- a/src/instructlab/training/main_ds.py +++ b/src/instructlab/training/main_ds.py @@ -6,7 +6,6 @@ import logging import os import subprocess -import sys import time import warnings @@ -47,6 +46,7 @@ from instructlab.training.config import ( DistributedBackend, ModelTypes, + PretrainingConfig, TorchrunArgs, TrainingArgs, ) @@ -364,6 +364,7 @@ def main(args): batch_size = args.effective_batch_size pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0 + train_loader = get_data_loader( data_path=args.data_path, batch_size=batch_size, @@ -374,6 +375,7 @@ def main(args): num_workers=8, # I don't like this but am setting it for consistency flash_enabled=flash_enabled, pad_token_id=pad_token_id, + pretraining_config=getattr(args, "pretraining_config", None), ) if args.local_rank == 0: @@ -469,18 +471,27 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: ) if train_args.process_data: - # TODO(osilkin): - # Decouple the data processing logic from training. - # Now that we've decided that repos will be less tethered to the - # design choices of the `ilab` CLI, we can make this change. - dp.process_data( - data_output_path=train_args.data_output_dir, - model_path=train_args.model_path, - data_path=train_args.data_path, - max_seq_len=train_args.max_seq_len, - chat_tmpl_path=train_args.chat_tmpl_path, - num_cpu_procs=train_args.data_process_num_cpu_procs, - ) + if train_args.pretraining_config is not None: + dp.process_documents_for_pretraining( + data_path=train_args.data_path, + data_output_path=train_args.data_output_dir, + model_path=train_args.model_path, + num_cpu_procs=train_args.data_process_num_cpu_procs, + document_column_name=train_args.pretraining_config.document_column_name, + ) + else: + # TODO(osilkin): + # Decouple the data processing logic from training. 
+ # Now that we've decided that repos will be less tethered to the + # design choices of the `ilab` CLI, we can make this change. + dp.process_data( + data_output_path=train_args.data_output_dir, + model_path=train_args.model_path, + data_path=train_args.data_path, + max_seq_len=train_args.max_seq_len, + chat_tmpl_path=train_args.chat_tmpl_path, + num_cpu_procs=train_args.data_process_num_cpu_procs, + ) if not os.path.exists(train_args.ckpt_output_dir): os.makedirs(train_args.ckpt_output_dir, exist_ok=True) @@ -537,6 +548,12 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: ] ) + if train_args.pretraining_config is not None: + command.append(f"--block-size={train_args.pretraining_config.block_size}") + command.append( + f"--document-column-name={train_args.pretraining_config.document_column_name}" + ) + if train_args.chat_tmpl_path is not None: command.append(f"--chat-tmpl-path={train_args.chat_tmpl_path}") @@ -554,8 +571,8 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: if train_args.mock_data: command.append("--mock_data") - if train_args.mock_len: - command.append(f"--mock_len={train_args.mock_len}") + if train_args.mock_data_len: + command.append(f"--mock_len={train_args.mock_data_len}") if train_args.disable_flash_attn: command.append("--disable_flash_attn") @@ -784,6 +801,18 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: help="Which modules we should target for injecting LoRA layers. Defaults to selecting all projection layers when no values are provided.", ) parser.add_argument("--max_batch_len", type=int, default=60000) + parser.add_argument( + "--block-size", + type=int, + default=None, + help="When provided, enables pretraining mode with the given token block size.", + ) + parser.add_argument( + "--document-column-name", + type=str, + default=None, + help="Column name containing raw documents for continual pretraining data.", + ) parser.add_argument( "--cpu_offload_optimizer", action="store_true", @@ -856,6 +885,18 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: help="Epsilon for numerical stability in AdamW optimizer.", ) args = parser.parse_args() + if args.document_column_name is not None and args.block_size is None: + parser.error("--document-column-name requires --block-size to be specified.") + + if args.block_size is not None: + pretraining_kwargs = {} + if args.document_column_name is not None: + pretraining_kwargs["document_column_name"] = args.document_column_name + args.pretraining_config = PretrainingConfig( + block_size=args.block_size, **pretraining_kwargs + ) + else: + args.pretraining_config = None set_random_seed(args.seed) main(args) diff --git a/src/instructlab/training/sampler.py b/src/instructlab/training/sampler.py index be2b3f4b..35ba41bc 100644 --- a/src/instructlab/training/sampler.py +++ b/src/instructlab/training/sampler.py @@ -2,8 +2,10 @@ # Standard from typing import Optional +import logging # Third Party +from datasets import Dataset as HFDataset from datasets import load_dataset from torch.utils.data import DataLoader, Dataset, Sampler import numpy as np @@ -12,11 +14,14 @@ # First Party from instructlab.training.batch_packer import batch_lengths_to_minibatches_lpt +from instructlab.training.config import PretrainingConfig from instructlab.training.padded_batch_packer import ( batch_lengths_to_minibatches_padded, ) from instructlab.training.type_definitions import CollatedItem +logger = logging.getLogger(__name__) + 
class EpochSampler(Sampler): """ @@ -291,6 +296,99 @@ def __call__(self, batch: list[dict]): return all_minibatches +class PretrainingBlockDataset(Dataset): + """Dataset that concatenates documents and exposes fixed-size blocks.""" + + def __init__(self, dataset: HFDataset, block_size: int, pad_token_id: int): + if block_size <= 0: + raise ValueError(f"block_size must be positive, got {block_size}") + if "input_ids" not in dataset.column_names: + raise ValueError("Pretraining data must provide an 'input_ids' column.") + if pad_token_id < 0: + raise ValueError("pad_token_id must be a non-negative integer.") + + self.block_size = block_size + self.pad_token_id = pad_token_id + + all_input_ids: list[int] = [] + for sample in dataset: + ids = sample["input_ids"] + if isinstance(ids, torch.Tensor): + ids = ids.tolist() + all_input_ids.extend(ids) + + total_tokens = len(all_input_ids) + if total_tokens == 0: + raise ValueError("Pretraining dataset is empty after concatenation.") + + num_blocks, remainder = divmod(total_tokens, block_size) + if remainder: + num_blocks += 1 + + self.all_input_ids = all_input_ids + self.num_blocks = num_blocks + self.last_block_len = remainder if remainder else block_size + self.total_tokens = total_tokens + + logger.info( + "Pretraining dataset: %s tokens → %s block(s) (block_size=%s, remainder=%s)", + f"{total_tokens:,}", + f"{self.num_blocks:,}", + block_size, + remainder, + ) + + def __len__(self) -> int: + return self.num_blocks + + def __getitem__(self, index: int): + if index < 0 or index >= self.num_blocks: + raise IndexError( + f"Index {index} out of range for {self.num_blocks} blocks." + ) + + start = index * self.block_size + end = start + self.block_size + is_last_block = index == self.num_blocks - 1 + is_partial = is_last_block and self.last_block_len != self.block_size + + if is_partial: + actual_tokens = self.all_input_ids[start:] + actual_len = len(actual_tokens) + pad_len = self.block_size - actual_len + + input_ids = actual_tokens + [self.pad_token_id] * pad_len + labels = actual_tokens + [-100] * pad_len + loss_tokens = max(actual_len - 1, 0) + else: + input_ids = self.all_input_ids[start:end] + labels = list(input_ids) + loss_tokens = self.block_size - 1 + + return { + "input_ids": torch.tensor(input_ids, dtype=torch.long), + "labels": torch.tensor(labels, dtype=torch.long), + "len": self.block_size, + "num_loss_counted_tokens": loss_tokens, + } + + @classmethod + def from_jsonl_file( + cls, + data_path: str, + block_size: int, + pad_token_id: int, + ) -> "PretrainingBlockDataset": + dataset = load_dataset("json", data_files=data_path, split="train") + return cls(dataset, block_size, pad_token_id) + + def get_lengths(self) -> np.ndarray: + lengths = np.full(self.num_blocks, self.block_size, dtype=np.int64) + if self.num_blocks and self.last_block_len != self.block_size: + lengths[-1] = self.last_block_len + return lengths + + class TokenDataset(Dataset): """Dataset for loading tokenized data from JSONL files. @@ -346,6 +444,7 @@ def get_data_loader( num_workers: int = 0, flash_enabled: bool = True, pad_token_id: int = 0, + pretraining_config: Optional[PretrainingConfig] = None, ): """Create a data loader with epoch-based sampling and batch packing. 
@@ -360,11 +459,23 @@ def get_data_loader( num_workers: Number of data loading workers flash_enabled: Whether flash attention is enabled (affects collation strategy) pad_token_id: Token ID to use for padding (only used when flash_enabled=False) + pretraining_config: When provided, enables block-based pretraining dataset loading Returns: DataLoader configured with appropriate collator based on flash_enabled """ - dataset = TokenDataset(data_path) + if pretraining_config is not None: + dataset = PretrainingBlockDataset.from_jsonl_file( + data_path, pretraining_config.block_size, pad_token_id + ) + logger.info( + "Using pretraining dataset with block_size=%s and %s block(s)", + pretraining_config.block_size, + f"{len(dataset):,}", + ) + else: + dataset = TokenDataset(data_path) + sampler = EpochSampler(len(dataset), seed=seed) # Create unified collator with appropriate mode diff --git a/tests/unit/test_data_process.py b/tests/unit/test_data_process.py index d0d36f6c..c69cc9f8 100644 --- a/tests/unit/test_data_process.py +++ b/tests/unit/test_data_process.py @@ -67,7 +67,9 @@ def _mock_apply_chat_template( messages: t.List[Message], tokenize: bool = True, add_special_tokens: bool = True, - ) -> t.Union[str, t.List[int]]: + return_dict: bool = False, + **kwargs, + ) -> t.Union[str, t.List[int], t.Dict[str, t.Any]]: """Mock implementation of apply_chat_template.""" template_tokens = [] @@ -91,10 +93,14 @@ def _mock_apply_chat_template( ] template_tokens.extend(reasoning_tokens) - if tokenize: - return template_tokens - else: - return " ".join([f"token_{t}" for t in template_tokens]) + result = ( + template_tokens + if tokenize + else " ".join([f"token_{t}" for t in template_tokens]) + ) + if return_dict: + return {"input_ids": result} + return result def test_single_turn_assistant_only_content(self): """Test basic single-turn conversation with assistant content only.""" @@ -555,7 +561,9 @@ def _mock_apply_chat_template( messages: t.List[Message], tokenize: bool = True, add_special_tokens: bool = True, - ) -> t.Union[str, t.List[int]]: + return_dict: bool = False, + **kwargs, + ) -> t.Union[str, t.List[int], t.Dict[str, t.Any]]: """Mock implementation of apply_chat_template.""" template_str = "" for msg in messages: @@ -566,10 +574,14 @@ def _mock_apply_chat_template( template_str += msg["reasoning_content"] template_str += "\n" - if tokenize: - return [hash(template_str) % 1000 for _ in range(len(template_str.split()))] - else: - return template_str + result = ( + [hash(template_str) % 1000 for _ in range(len(template_str.split()))] + if tokenize + else template_str + ) + if return_dict: + return {"input_ids": result} + return result def test_wrap_masked_messages_with_reasoning_content(self): """Test that wrap_masked_messages correctly wraps both content and reasoning_content.""" diff --git a/tests/unit/test_pretraining_data_process.py b/tests/unit/test_pretraining_data_process.py new file mode 100644 index 00000000..e3651e00 --- /dev/null +++ b/tests/unit/test_pretraining_data_process.py @@ -0,0 +1,500 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for pretraining data processing functionality.""" + +# Standard +from unittest.mock import MagicMock, mock_open, patch +import json +import os +import tempfile + +# Third Party +from transformers import AutoTokenizer +import pytest + +# First Party +from instructlab.training.data_process import process_documents_for_pretraining + + +class TestProcessDocumentsForPretraining: + """Test suite for process_documents_for_pretraining 
function.""" + + @pytest.fixture + def mock_tokenizer(self): + """Mock AutoTokenizer with BOS/EOS behavior.""" + mock_tok = MagicMock() + mock_tok.bos_token_id = 1 + mock_tok.eos_token_id = 2 + + # Mock encode to add BOS automatically and generate predictable tokens + def mock_encode(text, add_special_tokens=True): + # Simple hash-based encoding for predictability + tokens = [hash(text) % 1000 + 100] + if add_special_tokens: + return [mock_tok.bos_token_id] + tokens + return tokens + + mock_tok.encode = mock_encode + return mock_tok + + @pytest.fixture + def temp_pretraining_jsonl(self, tmp_path): + """Create temp JSONL with 'documents' field.""" + data_file = tmp_path / "pretraining.jsonl" + samples = [ + {"documents": "This is document one."}, + {"documents": "This is document two with more text."}, + {"documents": "Short doc."}, + ] + + with open(data_file, "w") as f: + for sample in samples: + json.dump(sample, f) + f.write("\n") + + return str(data_file) + + @pytest.fixture + def temp_output_dir(self, tmp_path): + """Create temporary output directory.""" + output_dir = tmp_path / "output" + output_dir.mkdir() + return str(output_dir) + + @patch("instructlab.training.data_process.AutoTokenizer.from_pretrained") + @patch("instructlab.training.data_process.load_dataset") + def test_basic_tokenization_with_bos_eos( + self, + mock_load_dataset, + mock_from_pretrained, + mock_tokenizer, + temp_pretraining_jsonl, + temp_output_dir, + ): + """Verify basic tokenization adds BOS and EOS tokens correctly.""" + # Setup mocks + mock_from_pretrained.return_value = mock_tokenizer + + # Create mock dataset + mock_ds = MagicMock() + mock_ds.num_rows = 1 + mock_ds.column_names = ["documents"] + + # Mock single document + mock_ds.__iter__ = lambda self: iter([{"documents": "Test document"}]) + + # Create filtered dataset mock + filtered_ds = MagicMock() + filtered_ds.num_rows = 1 + filtered_ds.column_names = ["documents"] + + # Mock filter to return the filtered dataset + mock_ds.filter = MagicMock(return_value=filtered_ds) + + # Make map return a dataset with tokenized data + def map_side_effect(func, **kwargs): + result = func({"documents": "Test document"}) + mapped_ds = MagicMock() + mapped_ds.__getitem__ = lambda self, key: [result[key]] + mapped_ds.__len__ = lambda self: 1 + mapped_ds.to_json = MagicMock() + return mapped_ds + + filtered_ds.map = MagicMock(side_effect=map_side_effect) + mock_load_dataset.return_value = mock_ds + + # Run function + process_documents_for_pretraining( + data_path=temp_pretraining_jsonl, + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + # Verify tokenizer was loaded + mock_from_pretrained.assert_called_once_with("test-model") + + # Verify dataset filter and map were called + assert mock_ds.filter.called + + @patch("instructlab.training.data_process.AutoTokenizer.from_pretrained") + @patch("instructlab.training.data_process.load_dataset") + def test_multiple_documents_separate_records( + self, mock_load_dataset, mock_from_pretrained, mock_tokenizer, temp_output_dir + ): + """Ensure each document gets its own JSONL record.""" + # Setup + mock_from_pretrained.return_value = mock_tokenizer + + # Create mock dataset with 3 documents + mock_ds = MagicMock() + mock_ds.num_rows = 3 + mock_ds.column_names = ["documents"] + + docs = [{"documents": "Doc 1"}, {"documents": "Doc 2"}, {"documents": "Doc 3"}] + + # Create filtered dataset mock + filtered_ds = MagicMock() + filtered_ds.num_rows = 3 + 
filtered_ds.column_names = ["documents"] + + # Mock filter to return the filtered dataset + mock_ds.filter = MagicMock(return_value=filtered_ds) + + # Mock map to process all documents + def map_side_effect(func, **kwargs): + results = [func(doc) for doc in docs] + mapped_ds = MagicMock() + mapped_ds.__len__ = lambda self: len(results) + mapped_ds.__getitem__ = lambda self, key: [r[key] for r in results] + mapped_ds.to_json = MagicMock() + return mapped_ds + + filtered_ds.map = MagicMock(side_effect=map_side_effect) + mock_load_dataset.return_value = mock_ds + + # Run + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + # Verify filter and map were called (which processes each document) + assert mock_ds.filter.called + + @patch("instructlab.training.data_process.load_dataset") + def test_empty_dataset_raises_error(self, mock_load_dataset, temp_output_dir): + """Validate error handling for empty input.""" + # Create empty dataset + mock_ds = MagicMock() + mock_ds.num_rows = 0 + mock_load_dataset.return_value = mock_ds + + # Should raise ValueError + with pytest.raises(ValueError, match="empty"): + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + ) + + @patch("instructlab.training.data_process.load_dataset") + def test_missing_documents_field_raises_error( + self, mock_load_dataset, temp_output_dir + ): + """Validate schema enforcement.""" + # Create dataset with wrong field name + mock_ds = MagicMock() + mock_ds.num_rows = 1 + mock_ds.column_names = ["text"] # Wrong field name + mock_load_dataset.return_value = mock_ds + + # Should raise ValueError + with pytest.raises(ValueError, match="must have.*field"): + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + @patch("instructlab.training.data_process.AutoTokenizer.from_pretrained") + @patch("instructlab.training.data_process.load_dataset") + def test_tokenizer_without_eos_raises_error( + self, mock_load_dataset, mock_from_pretrained, temp_output_dir + ): + """Validate tokenizer requirements.""" + # Create valid dataset + mock_ds = MagicMock() + mock_ds.num_rows = 1 + mock_ds.column_names = ["documents"] + mock_load_dataset.return_value = mock_ds + + # Create tokenizer without EOS token + mock_tok = MagicMock(spec=AutoTokenizer) + mock_tok.eos_token_id = None # No EOS token + mock_from_pretrained.return_value = mock_tok + + # Should raise ValueError + with pytest.raises(ValueError, match="must have an EOS token"): + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + @patch("instructlab.training.data_process.logger") + @patch("instructlab.training.data_process.AutoTokenizer.from_pretrained") + @patch("instructlab.training.data_process.load_dataset") + def test_statistics_logging( + self, + mock_load_dataset, + mock_from_pretrained, + mock_logger, + mock_tokenizer, + temp_output_dir, + ): + """Verify statistics are calculated correctly.""" + # Setup + mock_from_pretrained.return_value = mock_tokenizer + + # Create dataset with known token counts + mock_ds = MagicMock() + mock_ds.num_rows = 2 + mock_ds.column_names = ["documents"] + + # Create 
filtered dataset mock + filtered_ds = MagicMock() + filtered_ds.num_rows = 2 + filtered_ds.column_names = ["documents"] + + # Mock filter to return the filtered dataset + mock_ds.filter = MagicMock(return_value=filtered_ds) + + # Mock map to return known lengths + def map_side_effect(func, **kwargs): + # Simulate 2 documents with 5 and 10 tokens each + mapped_ds = MagicMock() + mapped_ds.__getitem__ = lambda self, key: [5, 10] if key == "len" else None + mapped_ds.__len__ = lambda self: 2 + mapped_ds.to_json = MagicMock() + return mapped_ds + + filtered_ds.map = MagicMock(side_effect=map_side_effect) + mock_load_dataset.return_value = mock_ds + + # Run + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + # Verify logging was called (check info was called multiple times) + assert mock_logger.info.call_count >= 3 + + @patch("instructlab.training.data_process.AutoTokenizer.from_pretrained") + @patch("instructlab.training.data_process.load_dataset") + def test_parallel_processing( + self, mock_load_dataset, mock_from_pretrained, mock_tokenizer, temp_output_dir + ): + """Ensure num_cpu_procs parameter works.""" + # Setup + mock_from_pretrained.return_value = mock_tokenizer + + mock_ds = MagicMock() + mock_ds.num_rows = 1 + mock_ds.column_names = ["documents"] + + # Create filtered dataset mock + filtered_ds = MagicMock() + filtered_ds.num_rows = 1 + filtered_ds.column_names = ["documents"] + + # Mock filter to return the filtered dataset + mock_ds.filter = MagicMock(return_value=filtered_ds) + + def map_side_effect(func, **kwargs): + mapped_ds = MagicMock() + mapped_ds.__len__ = lambda self: 1 + mapped_ds.__getitem__ = lambda self, key: [10] if key == "len" else None + mapped_ds.to_json = MagicMock() + return mapped_ds + + filtered_ds.map = MagicMock(side_effect=map_side_effect) + mock_load_dataset.return_value = mock_ds + + # Run with specific num_cpu_procs + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=4, + document_column_name="documents", + ) + + # Verify filter was called with num_proc=4 + filter_call_args = mock_ds.filter.call_args + assert filter_call_args[1]["num_proc"] == 4 + + # Verify map was also called with num_proc=4 + map_call_args = filtered_ds.map.call_args + assert map_call_args[1]["num_proc"] == 4 + + def test_output_directory_creation(self, tmp_path, mock_tokenizer): + """Verify directory is created if it doesn't exist.""" + # Use non-existent output path + output_dir = tmp_path / "nonexistent" / "nested" / "dir" + + with patch( + "instructlab.training.data_process.AutoTokenizer.from_pretrained" + ) as mock_from_pretrained: + with patch( + "instructlab.training.data_process.load_dataset" + ) as mock_load_dataset: + mock_from_pretrained.return_value = mock_tokenizer + + mock_ds = MagicMock() + mock_ds.num_rows = 1 + mock_ds.column_names = ["documents"] + + # Create filtered dataset mock + filtered_ds = MagicMock() + filtered_ds.num_rows = 1 + filtered_ds.column_names = ["documents"] + + # Mock filter to return the filtered dataset + mock_ds.filter = MagicMock(return_value=filtered_ds) + + def map_side_effect(func, **kwargs): + mapped_ds = MagicMock() + mapped_ds.__len__ = lambda self: 1 + mapped_ds.__getitem__ = ( + lambda self, key: [10] if key == "len" else None + ) + mapped_ds.to_json = MagicMock() + return mapped_ds + + filtered_ds.map = 
MagicMock(side_effect=map_side_effect) + mock_load_dataset.return_value = mock_ds + + # Run + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=str(output_dir), + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + # Verify directory was created + assert output_dir.exists() + + @patch("instructlab.training.data_process.AutoTokenizer.from_pretrained") + @patch("instructlab.training.data_process.load_dataset") + def test_output_jsonl_format( + self, mock_load_dataset, mock_from_pretrained, mock_tokenizer, temp_output_dir + ): + """Validate JSONL output format.""" + # Setup + mock_from_pretrained.return_value = mock_tokenizer + + mock_ds = MagicMock() + mock_ds.num_rows = 1 + mock_ds.column_names = ["documents"] + + # Create filtered dataset mock + filtered_ds = MagicMock() + filtered_ds.num_rows = 1 + filtered_ds.column_names = ["documents"] + + # Mock filter to return the filtered dataset + mock_ds.filter = MagicMock(return_value=filtered_ds) + + # Track what gets written + output_file_path = None + + def map_side_effect(func, **kwargs): + result = func({"documents": "Test"}) + mapped_ds = MagicMock() + mapped_ds.__len__ = lambda self: 1 + mapped_ds.__getitem__ = lambda self, key: [result[key]] + + def to_json_side_effect(path, **kw): + nonlocal output_file_path + output_file_path = path + # Write actual JSON to verify format + with open(path, "w") as f: + json.dump(result, f) + f.write("\n") + + mapped_ds.to_json = to_json_side_effect + return mapped_ds + + filtered_ds.map = MagicMock(side_effect=map_side_effect) + mock_load_dataset.return_value = mock_ds + + # Run + process_documents_for_pretraining( + data_path="dummy.jsonl", + data_output_path=temp_output_dir, + model_path="test-model", + num_cpu_procs=1, + document_column_name="documents", + ) + + # Verify file was created + assert output_file_path is not None + assert os.path.exists(output_file_path) + + # Verify format + with open(output_file_path, "r") as f: + line = f.readline() + data = json.loads(line) + + # Should have input_ids and len fields + assert "input_ids" in data + assert "len" in data + + # Should NOT have labels field + assert "labels" not in data + + # input_ids should be a list starting with BOS + assert isinstance(data["input_ids"], list) + assert data["input_ids"][0] == 1 # BOS token + assert data["input_ids"][-1] == 2 # EOS token + + @pytest.mark.slow + def test_integration_with_real_tokenizer(self, temp_output_dir): + """Integration test with actual GPT2 tokenizer.""" + # Create real input file + input_file = os.path.join(temp_output_dir, "input.jsonl") + with open(input_file, "w") as f: + json.dump( + {"documents": "This is a test document for GPT2 tokenization."}, f + ) + f.write("\n") + + # Run with real tokenizer + process_documents_for_pretraining( + data_path=input_file, + data_output_path=temp_output_dir, + model_path="gpt2", + num_cpu_procs=1, + document_column_name="documents", + ) + + # Verify output + output_file = os.path.join(temp_output_dir, "data.jsonl") + assert os.path.exists(output_file) + + with open(output_file, "r") as f: + line = f.readline() + data = json.loads(line) + + # Verify structure + assert "input_ids" in data + assert "len" in data + assert len(data["input_ids"]) == data["len"] + + # Load tokenizer to verify tokens + tokenizer = AutoTokenizer.from_pretrained("gpt2") + + # Verify EOS is present at the end + # Note: GPT2's encode() with add_special_tokens=True doesn't add BOS + # (GPT2 uses the same token for BOS and 
EOS) + # The implementation manually appends EOS if not present + assert data["input_ids"][-1] == tokenizer.eos_token_id + + # Verify token count is reasonable (should have content tokens + EOS) + assert data["len"] > 5 diff --git a/tests/unit/test_pretraining_mode.py b/tests/unit/test_pretraining_mode.py new file mode 100644 index 00000000..92056eac --- /dev/null +++ b/tests/unit/test_pretraining_mode.py @@ -0,0 +1,160 @@ +# SPDX-License-Identifier: Apache-2.0 + +# Standard +from pathlib import Path +from unittest.mock import patch +import json +import os +import tempfile +import unittest + +# Third Party +from datasets import Dataset as HFDataset +import torch + +# First Party +from instructlab.training.config import PretrainingConfig +from instructlab.training.data_process import process_documents_for_pretraining +from instructlab.training.sampler import ( + PretrainingBlockDataset, + get_data_loader, +) + + +class TestPretrainingBlockDataset(unittest.TestCase): + """Tests for the PretrainingBlockDataset behavior.""" + + def test_blocks_are_padded_and_loss_counts_tracked(self): + dataset = HFDataset.from_dict({"input_ids": [[1, 2, 3], [4, 5, 6, 7]]}) + block_ds = PretrainingBlockDataset(dataset, block_size=4, pad_token_id=0) + + self.assertEqual(len(block_ds), 2) + + first = block_ds[0] + self.assertTrue( + torch.equal( + first["input_ids"], torch.tensor([1, 2, 3, 4], dtype=torch.long) + ) + ) + self.assertTrue( + torch.equal(first["labels"], torch.tensor([1, 2, 3, 4], dtype=torch.long)) + ) + self.assertEqual(first["num_loss_counted_tokens"], 3) + self.assertEqual(first["len"], 4) + + second = block_ds[1] + self.assertTrue( + torch.equal( + second["input_ids"], torch.tensor([5, 6, 7, 0], dtype=torch.long) + ) + ) + self.assertTrue( + torch.equal( + second["labels"], torch.tensor([5, 6, 7, -100], dtype=torch.long) + ) + ) + self.assertEqual(second["num_loss_counted_tokens"], 2) + self.assertEqual(second["len"], 4) + + lengths = block_ds.get_lengths() + self.assertEqual(lengths.tolist(), [4, 3]) + + +class TestPretrainingDataLoader(unittest.TestCase): + """Tests for the pretraining-specific data loader integration.""" + + def test_pretraining_loader_returns_packed_batches(self): + cfg = PretrainingConfig(block_size=4) + + with tempfile.TemporaryDirectory() as tmpdir: + data_path = Path(tmpdir) / "data.jsonl" + records = [ + {"input_ids": [1, 2, 3, 4]}, + {"input_ids": [5, 6, 7, 8]}, + ] + with data_path.open("w", encoding="utf-8") as fh: + for record in records: + fh.write(json.dumps(record) + "\n") + + loader = get_data_loader( + data_path=str(data_path), + batch_size=2, + max_tokens_per_gpu=8, + seed=42, + rank=0, + world_size=1, + pretraining_config=cfg, + pad_token_id=0, + ) + + self.assertIsInstance(loader.dataset, PretrainingBlockDataset) + self.assertEqual(len(loader.dataset), 2) + + step = next(iter(loader)) + self.assertIsInstance(step, list) + self.assertEqual(len(step), 1) + + microbatch = step[0] + self.assertIn("input_ids", microbatch) + self.assertTrue(torch.is_tensor(microbatch["input_ids"])) + self.assertEqual(microbatch["input_ids"].shape, (1, 8)) + self.assertEqual(microbatch["num_samples"], 2) + self.assertEqual(microbatch["num_loss_counted_tokens"], 6) + self.assertEqual(microbatch["batch_num_loss_counted_tokens"], 6) + + +class TestPretrainingDataProcessing(unittest.TestCase): + """Tests for the pretraining data processing pipeline.""" + + def test_process_documents_for_pretraining_outputs_expected_fields(self): + class StubTokenizer: + eos_token_id = 999 + + def 
encode(self, text, add_special_tokens=True): + base = [ord(ch) % 50 + 1 for ch in text] + return base if add_special_tokens else base[1:] + + documents = [ + {"document": "alpha"}, + {"document": "beta"}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + data_path = Path(tmpdir) / "raw.jsonl" + with data_path.open("w", encoding="utf-8") as fh: + for record in documents: + fh.write(json.dumps(record) + "\n") + + output_dir = Path(tmpdir) / "processed" + + with patch( + "instructlab.training.data_process.AutoTokenizer.from_pretrained", + return_value=StubTokenizer(), + ) as mock_auto: + process_documents_for_pretraining( + data_path=str(data_path), + data_output_path=str(output_dir), + model_path="stub-model", + num_cpu_procs=1, + ) + + mock_auto.assert_called_once_with("stub-model") + + output_file = output_dir / "data.jsonl" + self.assertTrue(output_file.exists()) + + with output_file.open("r", encoding="utf-8") as fh: + rows = [json.loads(line) for line in fh if line.strip()] + + self.assertEqual(len(rows), len(documents)) + for row in rows: + self.assertIn("input_ids", row) + self.assertIn("len", row) + self.assertIsInstance(row["input_ids"], list) + self.assertIsInstance(row["len"], int) + self.assertEqual(len(row["input_ids"]), row["len"]) + self.assertEqual(row["input_ids"][-1], StubTokenizer.eos_token_id) + + +if __name__ == "__main__": # pragma: no cover + unittest.main() diff --git a/tests/unit/test_pretraining_sampler.py b/tests/unit/test_pretraining_sampler.py new file mode 100644 index 00000000..76b4b16d --- /dev/null +++ b/tests/unit/test_pretraining_sampler.py @@ -0,0 +1,516 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for pretraining sampler functionality.""" + +# Standard +from unittest.mock import MagicMock, patch +import json + +# Third Party +import pytest +import torch + +# First Party +from instructlab.training.config import PretrainingConfig +from instructlab.training.sampler import PretrainingBlockDataset, get_data_loader + + +class TestPretrainingBlockDataset: + """Test suite for PretrainingBlockDataset class.""" + + @pytest.fixture + def sample_pretraining_data(self): + """Sample tokenized data (14 total tokens).""" + return [ + {"input_ids": [1, 2, 3, 4, 5], "len": 5}, + {"input_ids": [6, 7, 8, 9, 10, 11], "len": 6}, + {"input_ids": [12, 13, 14], "len": 3}, + ] + + @pytest.fixture + def mock_hf_dataset(self, sample_pretraining_data): + """Mock HuggingFace dataset.""" + mock_ds = MagicMock() + mock_ds.column_names = ["input_ids", "len"] + mock_ds.__len__ = lambda self: len(sample_pretraining_data) + mock_ds.__iter__ = lambda self: iter(sample_pretraining_data) + return mock_ds + + def test_dataset_initialization(self, mock_hf_dataset): + """Test basic initialization of PretrainingBlockDataset.""" + dataset = PretrainingBlockDataset( + dataset=mock_hf_dataset, block_size=5, pad_token_id=0 + ) + + # Verify basic attributes + assert dataset.block_size == 5 + assert dataset.pad_token_id == 0 + assert dataset.num_blocks == 3 # 14 tokens / 5 = 2 complete + 1 partial + assert dataset.last_block_len == 4 # 14 % 5 = 4 + assert len(dataset.all_input_ids) == 14 # All tokens concatenated + + def test_concatenation_of_documents(self, mock_hf_dataset): + """Verify documents are concatenated in the correct order.""" + dataset = PretrainingBlockDataset( + dataset=mock_hf_dataset, block_size=5, pad_token_id=0 + ) + + # Check concatenation order + expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] + assert dataset.all_input_ids == expected + + def 
test_num_blocks_calculation_with_partial(self, mock_hf_dataset): + """Test num_blocks calculation with partial block.""" + dataset = PretrainingBlockDataset( + dataset=mock_hf_dataset, block_size=5, pad_token_id=0 + ) + + # 14 tokens / 5 = 2 complete + 1 partial + assert dataset.num_blocks == 3 + assert dataset.last_block_len == 4 + + def test_num_blocks_calculation_exact_multiple(self, sample_pretraining_data): + """Test num_blocks calculation when tokens exactly divide by block_size.""" + # Add one more token to make it 15 (exact multiple of 5) + data = sample_pretraining_data + [{"input_ids": [15], "len": 1}] + + mock_ds = MagicMock() + mock_ds.column_names = ["input_ids", "len"] + mock_ds.__len__ = lambda self: len(data) + mock_ds.__iter__ = lambda self: iter(data) + + dataset = PretrainingBlockDataset(dataset=mock_ds, block_size=5, pad_token_id=0) + + # 15 tokens / 5 = 3 complete blocks + assert dataset.num_blocks == 3 + assert dataset.last_block_len == 5 # Last block is complete + + def test_getitem_complete_block(self, mock_hf_dataset): + """Test __getitem__ for a complete block.""" + dataset = PretrainingBlockDataset( + dataset=mock_hf_dataset, block_size=5, pad_token_id=0 + ) + + # Get first block (indices 0-4) + block = dataset[0] + + assert block["input_ids"].shape == (5,) + assert block["labels"].shape == (5,) + assert block["len"] == 5 + assert block["num_loss_counted_tokens"] == 4 # block_size - 1 (causal shift) + + # Check actual token values + assert torch.equal( + block["input_ids"], torch.tensor([1, 2, 3, 4, 5], dtype=torch.long) + ) + assert torch.equal( + block["labels"], torch.tensor([1, 2, 3, 4, 5], dtype=torch.long) + ) + + def test_getitem_partial_block_with_padding(self, mock_hf_dataset): + """Test __getitem__ for partial last block with padding.""" + dataset = PretrainingBlockDataset( + dataset=mock_hf_dataset, block_size=5, pad_token_id=0 + ) + + # Get last block (index 2) - should have 4 real tokens + 1 padding + block = dataset[2] + + assert block["input_ids"].shape == (5,) + assert block["labels"].shape == (5,) + assert block["len"] == 5 + + # 4 real tokens - 1 for causal shift = 3 + assert block["num_loss_counted_tokens"] == 3 + + # Last token should be pad_token_id (0) + assert block["input_ids"][-1].item() == 0 + + # Last label should be masked (-100) + assert block["labels"][-1].item() == -100 + + # First 4 tokens should be real data [11, 12, 13, 14] from position 10-13 + assert block["input_ids"][0].item() == 11 + assert block["input_ids"][1].item() == 12 + assert block["input_ids"][2].item() == 13 + assert block["input_ids"][3].item() == 14 + + def test_labels_are_copy_not_reference(self, mock_hf_dataset): + """Test that labels are a copy, not a reference to input_ids.""" + dataset = PretrainingBlockDataset( + dataset=mock_hf_dataset, block_size=5, pad_token_id=0 + ) + + block = dataset[0] + + # Tensors should not be the same object + assert block["input_ids"] is not block["labels"] + + # But values should be equal for complete blocks + assert torch.equal(block["input_ids"], block["labels"]) + + # Modify labels to verify they're independent + original_labels = block["labels"].clone() + block["labels"][0] = 999 + + # input_ids should remain unchanged + assert block["input_ids"][0].item() != 999 + assert block["input_ids"][0].item() == 1 + + def test_num_loss_counted_tokens_complete_block(self): + """Test num_loss_counted_tokens for complete blocks with various block sizes.""" + for block_size in [5, 10, 20]: + # Create data with at least 2 complete blocks 
+            num_tokens = block_size * 2
+            data = [{"input_ids": list(range(num_tokens)), "len": num_tokens}]
+
+            mock_ds = MagicMock()
+            mock_ds.column_names = ["input_ids", "len"]
+            mock_ds.__len__ = lambda self: len(data)
+            mock_ds.__iter__ = lambda self: iter(data)
+
+            dataset = PretrainingBlockDataset(
+                dataset=mock_ds, block_size=block_size, pad_token_id=0
+            )
+
+            # Check first complete block
+            block = dataset[0]
+            assert block["num_loss_counted_tokens"] == block_size - 1
+
+    def test_num_loss_counted_tokens_partial_block(self, mock_hf_dataset):
+        """Test num_loss_counted_tokens for partial blocks."""
+        dataset = PretrainingBlockDataset(
+            dataset=mock_hf_dataset, block_size=5, pad_token_id=0
+        )
+
+        # Last block has 4 real tokens
+        block = dataset[2]
+
+        # Should be actual_length - 1 = 4 - 1 = 3
+        assert block["num_loss_counted_tokens"] == 3
+
+    def test_index_out_of_range(self, mock_hf_dataset):
+        """Test that accessing beyond num_blocks raises IndexError."""
+        dataset = PretrainingBlockDataset(
+            dataset=mock_hf_dataset, block_size=5, pad_token_id=0
+        )
+
+        # Try to access block beyond num_blocks (which is 3)
+        with pytest.raises(IndexError) as exc_info:
+            _ = dataset[3]
+
+        assert "out of range" in str(exc_info.value).lower()
+
+    def test_missing_input_ids_field_raises_error(self):
+        """Test that missing input_ids field raises ValueError."""
+        # Create dataset without input_ids field
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["len"]  # Missing input_ids
+
+        with pytest.raises(ValueError) as exc_info:
+            _ = PretrainingBlockDataset(dataset=mock_ds, block_size=5, pad_token_id=0)
+
+        assert "input_ids" in str(exc_info.value)
+
+    def test_tensor_dtype_correct(self, mock_hf_dataset):
+        """Test that all tensors use torch.long dtype."""
+        dataset = PretrainingBlockDataset(
+            dataset=mock_hf_dataset, block_size=5, pad_token_id=0
+        )
+
+        block = dataset[0]
+
+        assert block["input_ids"].dtype == torch.long
+        assert block["labels"].dtype == torch.long
+
+
+class TestGetDataLoaderPretraining:
+    """Test suite for get_data_loader with pretraining mode."""
+
+    @pytest.fixture
+    def temp_pretraining_file(self, tmp_path):
+        """Create temp JSONL with pretraining data."""
+        data_file = tmp_path / "pretraining_data.jsonl"
+        samples = [
+            {"input_ids": list(range(100, 150)), "len": 50},
+            {"input_ids": list(range(200, 280)), "len": 80},
+            {"input_ids": list(range(300, 370)), "len": 70},
+        ]
+
+        with open(data_file, "w") as f:
+            for sample in samples:
+                json.dump(sample, f)
+                f.write("\n")
+
+        return str(data_file)
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_pretraining_mode_creates_block_dataset(
+        self, mock_load_dataset, temp_pretraining_file
+    ):
+        """Test that passing a pretraining_config creates PretrainingBlockDataset."""
+        # Create mock dataset
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 3
+        mock_ds.__iter__ = lambda self: iter(
+            [
+                {"input_ids": [1, 2, 3], "len": 3},
+                {"input_ids": [4, 5, 6], "len": 3},
+                {"input_ids": [7, 8, 9], "len": 3},
+            ]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        # Call with pretraining mode
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=100,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(block_size=128),
+        )
+
+        # Verify load_dataset was called
+        mock_load_dataset.assert_called_once()
+
+        # Verify dataset is PretrainingBlockDataset
+        assert isinstance(loader.dataset, PretrainingBlockDataset)
+
+    def test_instruction_tuning_mode_creates_token_dataset(self, temp_pretraining_file):
+        """Test that omitting pretraining_config falls back to TokenDataset."""
+        # Create a valid instruction tuning JSONL file
+        # Standard
+        from pathlib import Path
+
+        inst_file = Path(temp_pretraining_file).parent / "inst_data.jsonl"
+        samples = [
+            {"input_ids": [1, 2, 3], "labels": [1, 2, 3], "len": 3},
+            {"input_ids": [4, 5, 6], "labels": [4, 5, 6], "len": 3},
+        ]
+        with open(inst_file, "w") as f:
+            for sample in samples:
+                json.dump(sample, f)
+                f.write("\n")
+
+        # Call with instruction tuning mode (default)
+        loader = get_data_loader(
+            data_path=str(inst_file),
+            batch_size=2,
+            max_tokens_per_gpu=100,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=None,
+        )
+
+        # Verify dataset is TokenDataset (not PretrainingBlockDataset)
+        # First Party
+        from instructlab.training.sampler import TokenDataset
+
+        assert isinstance(loader.dataset, TokenDataset)
+        assert not isinstance(loader.dataset, PretrainingBlockDataset)
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_pretraining_block_size_parameter(
+        self, mock_load_dataset, temp_pretraining_file
+    ):
+        """Test that block_size parameter is correctly passed."""
+        # Create mock dataset
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 1
+        mock_ds.__iter__ = lambda self: iter(
+            [{"input_ids": list(range(100)), "len": 100}]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        # Call with specific block_size
+        block_size = 256
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=1000,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(block_size=block_size),
+        )
+
+        # Verify dataset has correct block_size
+        assert loader.dataset.block_size == block_size
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_pretraining_pad_token_id_used(
+        self, mock_load_dataset, temp_pretraining_file
+    ):
+        """Test that pad_token_id is correctly passed to PretrainingBlockDataset."""
+        # Create mock dataset
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 1
+        mock_ds.__iter__ = lambda self: iter(
+            [{"input_ids": list(range(10)), "len": 10}]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        # Call with specific pad_token_id
+        pad_token_id = 99
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=100,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(
+                block_size=7
+            ),  # Will create partial block
+            pad_token_id=pad_token_id,
+        )
+
+        # Verify dataset has correct pad_token_id
+        assert loader.dataset.pad_token_id == pad_token_id
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_data_loader_returns_correct_structure(
+        self, mock_load_dataset, temp_pretraining_file
+    ):
+        """Test that get_data_loader returns a properly configured DataLoader."""
+        # Create mock dataset
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 2
+        mock_ds.__iter__ = lambda self: iter(
+            [
+                {"input_ids": list(range(50)), "len": 50},
+                {"input_ids": list(range(50, 100)), "len": 50},
+            ]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        # Call get_data_loader
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=100,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(block_size=25),
+        )
+
+        # Verify it's a DataLoader
+        # Third Party
+        from torch.utils.data import DataLoader
+
+        assert isinstance(loader, DataLoader)
+
+        # Verify batch_size
+        assert loader.batch_size == 2
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_epoch_sampler_created(self, mock_load_dataset, temp_pretraining_file):
+        """Test that EpochSampler is created with correct parameters."""
+        # Create mock dataset with known length
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 1
+        mock_ds.__iter__ = lambda self: iter(
+            [{"input_ids": list(range(100)), "len": 100}]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        seed = 123
+        block_size = 25
+
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=100,
+            seed=seed,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(block_size=block_size),
+        )
+
+        # Verify sampler is EpochSampler
+        # First Party
+        from instructlab.training.sampler import EpochSampler
+
+        assert isinstance(loader.sampler, EpochSampler)
+
+        # Verify seed is set correctly
+        assert loader.sampler.seed == seed
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_collator_configuration(self, mock_load_dataset, temp_pretraining_file):
+        """Test that MaxTokensPerRankCollator is configured correctly."""
+        # Create mock dataset
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 1
+        mock_ds.__iter__ = lambda self: iter(
+            [{"input_ids": list(range(50)), "len": 50}]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        flash_enabled = False
+        pad_token_id = 42
+        max_tokens = 200
+
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=max_tokens,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(block_size=25),
+            flash_enabled=flash_enabled,
+            pad_token_id=pad_token_id,
+        )
+
+        # Verify collate_fn is MaxTokensPerRankCollator
+        # First Party
+        from instructlab.training.sampler import MaxTokensPerRankCollator
+
+        assert isinstance(loader.collate_fn, MaxTokensPerRankCollator)
+
+        # Verify collator configuration
+        assert loader.collate_fn.max_tokens_per_rank == max_tokens
+        assert loader.collate_fn.flash_enabled == flash_enabled
+        assert loader.collate_fn.pad_token_id == pad_token_id
+
+    @patch("instructlab.training.sampler.load_dataset")
+    def test_num_workers_parameter(self, mock_load_dataset, temp_pretraining_file):
+        """Test that num_workers parameter is correctly applied."""
+        # Create mock dataset
+        mock_ds = MagicMock()
+        mock_ds.column_names = ["input_ids", "len"]
+        mock_ds.__len__ = lambda self: 1
+        mock_ds.__iter__ = lambda self: iter(
+            [{"input_ids": list(range(50)), "len": 50}]
+        )
+        mock_load_dataset.return_value = mock_ds
+
+        num_workers = 4
+
+        loader = get_data_loader(
+            data_path=temp_pretraining_file,
+            batch_size=2,
+            max_tokens_per_gpu=100,
+            seed=42,
+            rank=0,
+            world_size=1,
+            pretraining_config=PretrainingConfig(block_size=25),
+            num_workers=num_workers,
+        )
+
+        # Verify num_workers is set
+        assert loader.num_workers == num_workers
+
+        # When num_workers > 0, persistent_workers should be True
+        assert loader.persistent_workers is True
diff --git a/tox.ini b/tox.ini
index 0794c417..8ce92518 100644
--- a/tox.ini
+++ b/tox.ini
@@ -104,6 +104,7 @@ deps =
     types-tqdm
     types-PyYAML
    pytest
+    pydantic
 commands =
     mypy {posargs:src}
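For readers skimming the sampler changes, a standalone sketch of the blocking rules that `PretrainingBlockDataset` implements (and that the tests above assert) may help. This is plain Python with illustrative values, not part of the library: concatenate all tokenized documents into one stream, slice it into fixed-size blocks, pad the final partial block, and mask padded label positions with -100 so they carry no loss.

```python
# Minimal sketch of the block-packing arithmetic used in continual pretraining.
# Assumes a toy token stream; the real class returns torch tensors instead of lists.

def make_blocks(all_input_ids: list[int], block_size: int, pad_token_id: int):
    blocks = []
    for start in range(0, len(all_input_ids), block_size):
        chunk = all_input_ids[start : start + block_size]
        pad_len = block_size - len(chunk)
        blocks.append(
            {
                "input_ids": chunk + [pad_token_id] * pad_len,
                "labels": chunk + [-100] * pad_len,
                # one position is lost to the causal shift; padding never counts
                "num_loss_counted_tokens": max(len(chunk) - 1, 0),
            }
        )
    return blocks


if __name__ == "__main__":
    # 14 tokens with block_size=5 -> 2 full blocks + 1 partial, padded block,
    # matching the expectations in TestPretrainingBlockDataset above.
    stream = list(range(1, 15))
    blocks = make_blocks(stream, block_size=5, pad_token_id=0)
    assert len(blocks) == 3
    assert blocks[2]["input_ids"] == [11, 12, 13, 14, 0]
    assert blocks[2]["labels"] == [11, 12, 13, 14, -100]
    assert [b["num_loss_counted_tokens"] for b in blocks] == [4, 4, 3]
```

Padding only the final block (rather than dropping the remainder) keeps every token in the corpus trainable, at the cost of one mostly-padded block per epoch.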