From 7749b4d87f3d21bd162b9c9e533020d6caed2a62 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 10:01:41 +0200
Subject: [PATCH 01/13] introducing GenericPipelineRow

---
 datafast/schema/data_rows.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/datafast/schema/data_rows.py b/datafast/schema/data_rows.py
index 922a1e0..29c07d9 100644
--- a/datafast/schema/data_rows.py
+++ b/datafast/schema/data_rows.py
@@ -103,5 +103,31 @@ class PreferenceRow(BaseModel):
     chosen_response_assessment: str | None = None
     rejected_response_assessment: str | None = None
 
+    uuid: UUID = Field(default_factory=uuid4)
+    metadata: dict[str, str] = Field(default_factory=dict)
+
+
+class GenericPipelineSource(str, Enum):
+    SYNTHETIC = "synthetic"
+    VERIFIED = "verified"
+    HUMAN = "human"
+    CONSENSUS = "consensus"
+
+
+class GenericPipelineRow(BaseModel):
+    """Row for storing generic pipeline processing results."""
+
+    # Input data (forwarded from source)
+    input_data: dict[str, str] = Field(default_factory=dict)
+
+    # Generated outputs
+    generated_outputs: dict[str, str] = Field(default_factory=dict)
+
+    # Processing metadata
+    model_id: str | None = None
+    pipeline_source: GenericPipelineSource = GenericPipelineSource.SYNTHETIC
+    language: str | None = None
+
+    # System fields
     uuid: UUID = Field(default_factory=uuid4)
     metadata: dict[str, str] = Field(default_factory=dict)
\ No newline at end of file
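A quick sanity check of the new row schema (a sketch, not part of any patch; the field values are invented):

    from datafast.schema.data_rows import GenericPipelineRow, GenericPipelineSource

    # Only the payload dicts are supplied; uuid and metadata come from default factories
    row = GenericPipelineRow(
        input_data={"persona": "A retired astronaut who teaches high-school physics"},
        generated_outputs={"tweet": "Gravity never retires. Neither do I."},
        model_id="gpt-4o-mini",  # hypothetical model id
        language="en",
    )
    assert row.pipeline_source is GenericPipelineSource.SYNTHETIC  # enum default
    print(row.model_dump_json(indent=2))  # uuid is auto-generated, metadata starts empty

In practice the dict-based GenericPipelineRow is superseded at generation time by the dynamic per-column row models introduced in PATCH 05.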
From 60a87b0fd60905c80f28d0696e9fb843fd87b269 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 10:02:27 +0200
Subject: [PATCH 02/13] Introducing a draft config for the new dataset
 pipeline: GenericPipelineDatasetConfig

---
 datafast/schema/config.py | 138 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 137 insertions(+), 1 deletion(-)

diff --git a/datafast/schema/config.py b/datafast/schema/config.py
index fb7f736..cc9d907 100644
--- a/datafast/schema/config.py
+++ b/datafast/schema/config.py
@@ -1,5 +1,5 @@
 from pydantic import BaseModel, Field, field_validator, model_validator
-from typing import Optional
+from typing import Optional, Callable, Any
 import warnings
 
 
@@ -536,3 +536,139 @@ def validate_judge_prompt(cls, v, info):
         required_placeholders = ["{document}", "{question}", "{response}"]
 
         return validate_prompt_placeholders(v, required_placeholders, "judge_prompt")
+
+
+class GenericPipelineDatasetConfig(BaseModel):
+    """
+    Configuration for generic pipeline dataset generation.
+
+    This config allows processing any dataset with custom prompts and flexible column mapping.
+    Supports both Hugging Face datasets and local files (CSV, TXT, PARQUET, JSONL).
+    """
+    dataset_type: str = Field(default="generic_pipeline_dataset")
+
+    # Dataset source information
+    hf_dataset_name: str | None = Field(
+        default=None,
+        description="Name of a Hugging Face dataset to use as data source"
+    )
+
+    local_file_path: str | None = Field(
+        default=None,
+        description="Path to a local file (CSV, TXT, PARQUET, or JSONL) to use as data source"
+    )
+
+    input_columns: list[str] = Field(
+        description="List of column names to use as input for the processing pipeline"
+    )
+
+    forward_columns: list[str] | None = Field(
+        default=None,
+        description="List of column names to forward to the output"
+    )
+
+    output_columns: list[str] | None = Field(
+        default=None,
+        description="List of column names to use as output for the pipeline"
+    )
+
+    prompts: list[str] = Field(
+        description="List of custom prompt templates"
+    )
+
+    num_samples_per_prompt: int = Field(
+        default=1,
+        description="Number of samples to generate for each input"
+    )
+
+    # Where to save the output
+    output_file: str = Field(
+        default="generic_pipeline_dataset.jsonl",
+        description="Path to save generic pipeline dataset results"
+    )
+
+    skip_function: Callable[[dict[str, Any]], bool] | None = Field(
+        default=None,
+        description="Optional function that takes a dataset row and returns True if the row should be skipped"
+    )
+
+    sample_count: int | None = Field(
+        default=None,
+        description="Optional number of samples to process from the dataset"
+    )
+
+    # Standard config options
+    expansion: PromptExpansionConfig = PromptExpansionConfig()
+
+    languages: dict[str, str] = Field(
+        default={"en": "English"},
+        description="Language ISO codes and their corresponding names"
+    )
+
+
+    @field_validator("input_columns")
+    def validate_input_columns(cls, v):
+        if not v or len(v) == 0:
+            raise ValueError("input_columns must contain at least one column name")
+        return v
+
+
+    @field_validator("num_samples_per_prompt")
+    def validate_num_samples_per_prompt(cls, v):
+        if v > 10:
+            warnings.warn(
+                f"num_samples_per_prompt is set to {v}. Values above 10 are generally not recommended "
+                "as they may lead to excessive API costs and processing time, and reduced overall quality of the output."
+            )
+        return v
+
+
+    @field_validator("prompts")
+    def validate_prompts(cls, prompts, info):
+        # Get input_columns from the validation context
+        input_columns = info.data.get('input_columns', [])
+
+        for i, prompt in enumerate(prompts):
+            # Check for required placeholders
+            required_placeholders = ["{num_samples}", "{language}"]
+            missing_required = [p for p in required_placeholders if p not in prompt]
+            if missing_required:
+                raise ValueError(
+                    f"Prompt at index {i} is missing required placeholders: {', '.join(missing_required)}. "
+                    f"All prompts must contain: {', '.join(required_placeholders)}"
+                )
+
+            # Check that at least one input_column is used as placeholder
+            input_column_placeholders = [f"{{{col}}}" for col in input_columns]
+            used_input_columns = [p for p in input_column_placeholders if p in prompt]
+
+            if not used_input_columns:
+                raise ValueError(
+                    f"Prompt at index {i} must contain at least one column for processing from input_columns: "
+                    f"{', '.join(input_column_placeholders)}"
+                )
+
+            # Warn about unused input columns
+            unused_input_columns = [p for p in input_column_placeholders if p not in prompt]
+            if unused_input_columns:
+                warnings.warn(
+                    f"Prompt at index {i} does not use the following input_columns as placeholders: "
+                    f"{', '.join(unused_input_columns)}"
+                )
+
+        return prompts
+
+
+    @model_validator(mode='after')
+    def validate_optional_placeholders_model(self):
+        # Validate optional placeholders after the model is fully constructed
+        if self.prompts:
+            validate_optional_placeholders(self.prompts, self.expansion, "prompts")
+        return self
+
+
+    @model_validator(mode='after')
+    def validate_data_source_exists(self):
+        if not self.hf_dataset_name and not self.local_file_path:
+            raise ValueError("Either hf_dataset_name or local_file_path must be provided")
+        return self
\ No newline at end of file
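To see how the validators above are meant to fire, here is a sketch (not in the patch; the file and column names are invented):

    from datafast.schema.config import GenericPipelineDatasetConfig

    # Valid: the prompt carries {num_samples}, {language} and the input column {text}
    config = GenericPipelineDatasetConfig(
        local_file_path="reviews.jsonl",  # hypothetical local source
        input_columns=["text"],
        prompts=["Write {num_samples} summaries of {text} in {language}."],
    )

    # Invalid: required placeholders missing, so validate_prompts raises
    try:
        GenericPipelineDatasetConfig(
            local_file_path="reviews.jsonl",
            input_columns=["text"],
            prompts=["Summarize {text}."],  # no {num_samples}, no {language}
        )
    except ValueError as err:  # pydantic's ValidationError subclasses ValueError
        print(err)

Leaving out both hf_dataset_name and local_file_path trips validate_data_source_exists the same way.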
From 9e0289320c9b02ab41595dc0b0e8518925f1a5a9 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 10:03:02 +0200
Subject: [PATCH 03/13] New utils and a shared load_dataset_from_source for
 GenericPipeline and MCQ datasets (still needs testing)

---
 datafast/utils.py | 146 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 144 insertions(+), 2 deletions(-)

diff --git a/datafast/utils.py b/datafast/utils.py
index f0f7dc6..78aa5d9 100644
--- a/datafast/utils.py
+++ b/datafast/utils.py
@@ -1,5 +1,7 @@
-from datafast.schema.config import PromptExpansionConfig, ClassificationDatasetConfig, RawDatasetConfig, UltrachatDatasetConfig, MCQDatasetConfig, PreferenceDatasetConfig
+from datafast.schema.config import PromptExpansionConfig, ClassificationDatasetConfig, RawDatasetConfig, UltrachatDatasetConfig, MCQDatasetConfig, PreferenceDatasetConfig, GenericPipelineDatasetConfig
 from datafast.llms import LLMProvider
+from datasets import Dataset, load_dataset
+from pydantic import BaseModel, Field, create_model
 
 def calculate_num_prompt_expansions(base_prompts: list[str], expansion_config: PromptExpansionConfig) -> int:
     """Calculate the number of prompt expansions based on the expansion configuration.
@@ -154,4 +156,144 @@ def _get_preference_num_expected_rows(config: PreferenceDatasetConfig, llms: lis
         len(config.languages or {"en": "English"}) *
         num_questions *
         num_expanded_prompts
-    )
\ No newline at end of file
+    )
+
+
+def load_dataset_from_source(hf_dataset_name: str | None = None,
+                             local_file_path: str | None = None,
+                             sample_count: int | None = None,
+                             text_column: str = "text") -> list[dict]:
+    """Shared utility to load dataset from Hugging Face or local file.
+
+    Args:
+        hf_dataset_name: Name of HuggingFace dataset
+        local_file_path: Path to local file
+        sample_count: Optional limit on number of samples
+        text_column: Column name for text files (used by MCQDataset)
+
+    Returns:
+        List of dictionaries representing dataset rows
+    """
+    try:
+        if hf_dataset_name:
+            # Load from Hugging Face
+            hf_dataset = load_dataset(hf_dataset_name)
+            # Most datasets have a 'train' split, but fallback to first available split
+            split_names = list(hf_dataset.keys())
+            if not split_names:
+                raise ValueError(f"No splits found in dataset {hf_dataset_name}")
+
+            main_split = "train" if "train" in split_names else split_names[0]
+            dataset = hf_dataset[main_split]
+            # Convert to list of dicts for consistent interface
+            dataset = [dict(row) for row in dataset]
+
+        elif local_file_path:
+            # Load from local file based on extension
+            file_ext = local_file_path.lower().split('.')[-1]
+
+            if file_ext == 'csv':
+                import pandas as pd
+                df = pd.read_csv(local_file_path)
+                dataset = df.to_dict('records')
+
+            elif file_ext == 'txt':
+                # For TXT files, use provided text_column name
+                with open(local_file_path, 'r', encoding='utf-8') as f:
+                    lines = [line.strip() for line in f if line.strip()]
+                dataset = [{text_column: line} for line in lines]
+
+            elif file_ext == 'parquet':
+                import pandas as pd
+                df = pd.read_parquet(local_file_path)
+                dataset = df.to_dict('records')
+
+            elif file_ext in ['jsonl', 'json']:
+                import json
+                dataset = []
+                with open(local_file_path, 'r', encoding='utf-8') as f:
+                    for line in f:
+                        if line.strip():
+                            dataset.append(json.loads(line))
+
+            else:
+                raise ValueError(f"Unsupported file extension: {file_ext}. Supported extensions are: csv, txt, parquet, jsonl, json")
+        else:
+            raise ValueError("Either hf_dataset_name or local_file_path must be specified")
+
+        # Limit the number of samples if specified
+        if sample_count is not None:
+            dataset = dataset[:min(sample_count, len(dataset))]
+
+        return dataset
+
+    except Exception as e:
+        raise ValueError(f"Error loading dataset: {str(e)}")
+
+
+def _get_generic_pipeline_specific_factors(config: GenericPipelineDatasetConfig) -> dict[str, int]:
+    return {}  # There are no generic pipeline specific multipliers. Method here for consistency.
+
+
+def _get_generic_pipeline_num_expected_rows(config: GenericPipelineDatasetConfig, llms: list[LLMProvider]) -> int:
+    """Calculate expected rows for GenericPipelineDataset including prompt expansions."""
+    # Load source dataset to get row count
+    source_dataset = load_dataset_from_source(
+        hf_dataset_name=config.hf_dataset_name,
+        local_file_path=config.local_file_path,
+        sample_count=config.sample_count
+    )
+    source_data_num_rows = len(source_dataset)
+    # Note: sample_count limit already applied in load_dataset_from_source()
+    num_llms = len(llms)
+
+    # Calculate prompt expansions
+    if config.prompts is None:
+        num_expanded_prompts = 1
+    else:
+        num_expanded_prompts = calculate_num_prompt_expansions(config.prompts, config.expansion)
+
+    return (
+        num_llms *
+        len(config.languages or {"en": "English"}) *
+        config.num_samples_per_prompt *
+        source_data_num_rows *
+        num_expanded_prompts
+    )
+
+
+def build_generic_pipeline_response_format_model(config: GenericPipelineDatasetConfig) -> type[BaseModel]:
+    """Build a dynamic Pydantic model for GenericPipelineDataset response format.
+
+    Creates a model with fields based on output_columns configuration.
+    If output_columns is None/empty, defaults to a single 'generated_text' field.
+
+    Args:
+        config: GenericPipelineDatasetConfig with output_columns specification
+
+    Returns:
+        Pydantic BaseModel class for structured LLM responses
+    """
+    from typing import Any
+
+    # Determine output fields
+    if config.output_columns and len(config.output_columns) > 0:
+        output_fields = config.output_columns
+    else:
+        output_fields = ["generated_text"]
+
+    # Create field definitions for individual entry model
+    entry_fields = {}
+    for field_name in output_fields:
+        entry_fields[field_name] = (str, Field(..., description=f"Generated content for {field_name}"))
+
+    # Create the entry model
+    EntryModel = create_model("GenericPipelineEntry", **entry_fields)
+
+    # Create the response model with entries list
+    ResponseModel = create_model(
+        "GenericPipelineResponse",
+        entries=(list[EntryModel], Field(..., description="List of generated entries"))
+    )
+
+    return ResponseModel
\ No newline at end of file
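The row estimate in _get_generic_pipeline_num_expected_rows is a plain product, so it is easy to sanity-check by hand (hypothetical numbers):

    # providers x languages x samples-per-prompt x source rows x expanded prompts
    num_llms = 2
    num_languages = 3             # e.g. en, fr, de
    num_samples_per_prompt = 1
    source_rows = 100             # after the sample_count cap
    num_expanded_prompts = 4      # 2 base prompts x 2 expansion variations
    print(num_llms * num_languages * num_samples_per_prompt
          * source_rows * num_expanded_prompts)  # 2400

Note the estimate assumes every call succeeds: a failed LLM call drops an entire batch of entries from the final count.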
From 532d6ae3058d9f995e1fcb75498468080093f812 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 14:48:43 +0200
Subject: [PATCH 04/13] Implemented new GenericPipelineDataset

---
 datafast/datasets.py | 204 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 151 insertions(+), 53 deletions(-)

diff --git a/datafast/datasets.py b/datafast/datasets.py
index 6d50fb5..634e51c 100644
--- a/datafast/datasets.py
+++ b/datafast/datasets.py
@@ -18,6 +18,7 @@
     UltrachatDatasetConfig,
     MCQDatasetConfig,
     PreferenceDatasetConfig,
+    GenericPipelineDatasetConfig,
 )
 from datafast.schema.data_rows import (
     ChatRow,
@@ -29,11 +30,14 @@
     MCQSource,
     PreferenceRow,
     PreferenceSource,
+    GenericPipelineRow,
+    GenericPipelineSource,
 )
 from datafast.expanders import expand_prompts
 import os
 from datafast import utils
+
 
 ### Model for Raw Text Examples Generation
 
 class Example(BaseModel):
@@ -759,60 +763,14 @@ def generate(self, llms: list[LLMProvider]) -> "MCQDataset":
         if not llms:
             raise ValueError("At least one LLM provider must be supplied")
 
-        # Load the dataset from Hugging Face or local file
+        # Load the dataset using shared utility
         try:
-            if self.config.hf_dataset_name:
-                # Load from Hugging Face
-                hf_dataset = load_dataset(self.config.hf_dataset_name)
-                # Most datasets have a 'train' split, but fallback to first available split
-                split_names = list(hf_dataset.keys())
-                if not split_names:
-                    raise ValueError(f"No splits found in dataset {self.config.hf_dataset_name}")
-
-                main_split = "train" if "train" in split_names else split_names[0]
-                dataset = hf_dataset[main_split]
-
-            elif self.config.local_file_path:
-                # Load from local file based on extension
-                file_path = self.config.local_file_path
-                file_ext = file_path.lower().split('.')[-1]
-
-                if file_ext == 'csv':
-                    # Load CSV file
-                    import pandas as pd
-                    df = pd.read_csv(file_path)
-                    dataset = df.to_dict('records')
-
-                elif file_ext == 'txt':
-                    # For TXT files, create a dataset with one record per line
-                    # and use the text_column as the key
-                    with open(file_path, 'r', encoding='utf-8') as f:
-                        lines = [line.strip() for line in f if line.strip()]
-                    dataset = [{self.config.text_column: line} for line in lines]
-
-                elif file_ext == 'parquet':
-                    # Load Parquet file
-                    import pandas as pd
-                    df = pd.read_parquet(file_path)
-                    dataset = df.to_dict('records')
-
-                elif file_ext in ['jsonl', 'json']:
-                    # Load JSONL file (one JSON object per line)
-                    import json
-                    dataset = []
-                    with open(file_path, 'r', encoding='utf-8') as f:
-                        for line in f:
-                            if line.strip():
-                                dataset.append(json.loads(line))
-
-                else:
-                    raise ValueError(f"Unsupported file extension: {file_ext}. Supported extensions are: csv, txt, parquet, jsonl, json")
-            else:
-                raise ValueError("Either hf_dataset_name or local_file_path must be specified")
-
-            # Limit the number of samples if specified
-            if self.config.sample_count is not None:
-                dataset = dataset[:min(self.config.sample_count, len(dataset))]
+            dataset = utils.load_dataset_from_source(
+                hf_dataset_name=self.config.hf_dataset_name,
+                local_file_path=self.config.local_file_path,
+                sample_count=self.config.sample_count,
+                text_column=self.config.text_column
+            )
 
         except Exception as e:
             source = self.config.hf_dataset_name or self.config.local_file_path
@@ -1300,3 +1258,143 @@ def _get_default_judge_prompt(self) -> str:
         from datafast.prompts import preference_prompts
 
         return preference_prompts.JUDGE_PROMPT
+
+class GenericPipelineDataset(DatasetBase):
+    def __init__(self, config: GenericPipelineDatasetConfig):
+        super().__init__(config)
+        self.config = config
+
+    def get_num_expected_rows(self, llms: list[LLMProvider]) -> int:
+        """Calculate the expected number of rows that will be generated.
+
+        Args:
+            llms: List of LLM providers that will be used for generation.
+
+        Returns:
+            int: The expected number of rows that will be generated.
+        """
+        if not llms:
+            raise ValueError("At least one LLM provider must be supplied")
+        return utils._get_generic_pipeline_num_expected_rows(self.config, llms)
+
+    def _load_source_dataset(self):
+        """Load dataset from Hugging Face or local file using shared utility."""
+        return utils.load_dataset_from_source(
+            hf_dataset_name=self.config.hf_dataset_name,
+            local_file_path=self.config.local_file_path,
+            sample_count=self.config.sample_count
+        )
+
+    def generate(self, llms: list[LLMProvider]) -> "GenericPipelineDataset":
+        """Generate data by processing source dataset through custom prompts.
+
+        Args:
+            llms: List of LLM providers to use for generation.
+
+        Returns:
+            Self for method chaining.
+        """
+        if not llms:
+            raise ValueError("At least one LLM provider must be supplied")
+
+        # Load source dataset
+        source_dataset = self._load_source_dataset()
+        print(f"Loaded source dataset with {len(source_dataset)} rows")
+
+        # Apply sample limit if specified
+        if self.config.sample_count:
+            source_dataset = source_dataset[:min(self.config.sample_count, len(source_dataset))]
+            print(f"Limited to {len(source_dataset)} rows")
+
+        # Get languages from config
+        languages = self.config.languages or {"en": "English"}
+
+        # Process each row in the source dataset
+        for row_idx, source_row in enumerate(source_dataset):
+            # Apply skip function if provided
+            if self.config.skip_function and self.config.skip_function(source_row):
+                print(f"Skipping row {row_idx} due to skip_function")
+                continue
+
+            # Extract input data based on input_columns
+            input_data = {col: str(source_row.get(col, "")) for col in self.config.input_columns}
+
+            # Extract forward data if specified
+            forward_data = {}
+            if self.config.forward_columns:
+                forward_data = {col: str(source_row.get(col, "")) for col in self.config.forward_columns}
+
+            # Process for each language
+            for lang_code, language_name in languages.items():
+                # Process each prompt
+                for prompt_idx, prompt_template in enumerate(self.config.prompts):
+                    # Format prompt with input data and required placeholders
+                    formatted_prompt = prompt_template.format(
+                        num_samples=self.config.num_samples_per_prompt,
+                        language=language_name,
+                        **input_data
+                    )
+
+                    # Expand prompts with configured variations
+                    expansions = expand_prompts(
+                        prompt_templates=[formatted_prompt],
+                        **self.config.expansion.model_dump()
+                    )
+
+                    # Process each expanded prompt
+                    for expanded_prompt, meta in expansions:
+                        # Process with each LLM
+                        for llm in llms:
+                            try:
+                                # Create dynamic response model based on output_columns configuration
+                                response_model = utils.create_response_model(self.config)
+
+                                # Create dynamic row model based on output_columns configuration
+                                row_model = utils.create_generic_pipeline_row_model(self.config)
+
+                                # Generate response using the LLM with proper response format
+                                response = llm.generate(expanded_prompt, response_format=response_model)
+
+                                # Create rows for each generated sample
+                                new_rows = []
+                                for entry in response.entries:
+                                    # Prepare row data with all columns as separate top-level fields
+                                    row_data = {
+                                        "model_id": llm.model_id,
+                                        "pipeline_source": GenericPipelineSource.SYNTHETIC,
+                                        "language": lang_code,
+                                        "metadata": {
+                                            "prompt_index": str(prompt_idx),
+                                            "source_row_index": str(row_idx),
+                                        }
+                                    }
+
+                                    # Add input data as individual top-level fields
+                                    for column, value in input_data.items():
+                                        row_data[column] = value
+
+                                    # Add forward data as individual top-level fields
+                                    for column, value in forward_data.items():
+                                        row_data[column] = value
+
+                                    # Add each output column as a separate field
+                                    if self.config.output_columns:
+                                        for column in self.config.output_columns:
+                                            row_data[column] = getattr(entry, column, "")
+                                    else:
+                                        row_data["generated_text"] = getattr(entry, "generated_text", "")
+
+                                    # Create the dynamic row
+                                    row = row_model(**row_data)
+                                    self.data_rows.append(row)
+                                    new_rows.append(row)
+
+                                # Save this batch
+                                self.to_jsonl(self.config.output_file, new_rows, append=True)
+                                print(f"Generated and saved {len(self.data_rows)} examples total")
+
+                            except Exception as e:
+                                print(f"Error with llm provider {llm.provider_name} on row {row_idx}: {e}")
+                                continue
+
+        return self
\ No newline at end of file
From c024ef4a79835e2795bc761ea253c7dff06d343a Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 14:49:00 +0200
Subject: [PATCH 05/13] utils for dynamic model creation

---
 datafast/utils.py | 71 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 61 insertions(+), 10 deletions(-)

diff --git a/datafast/utils.py b/datafast/utils.py
index 78aa5d9..576b78e 100644
--- a/datafast/utils.py
+++ b/datafast/utils.py
@@ -262,17 +262,16 @@ def _get_generic_pipeline_num_expected_rows(config: GenericPipelineDatasetConfig
     )
 
 
-def build_generic_pipeline_response_format_model(config: GenericPipelineDatasetConfig) -> type[BaseModel]:
-    """Build a dynamic Pydantic model for GenericPipelineDataset response format.
-
-    Creates a model with fields based on output_columns configuration.
-    If output_columns is None/empty, defaults to a single 'generated_text' field.
-
+def create_response_model(config: GenericPipelineDatasetConfig) -> type[BaseModel]:
+    """
+    Build a dynamic Pydantic model for GenericPipelineDataset response format.
+
     Args:
-        config: GenericPipelineDatasetConfig with output_columns specification
-
+        config (GenericPipelineDatasetConfig): Config with output_columns specification.
+
     Returns:
-        Pydantic BaseModel class for structured LLM responses
+        type[BaseModel]: Pydantic BaseModel class for structured LLM responses.
+            If output_columns is None/empty, defaults to a single 'generated_text' field.
     """
     from typing import Any
 
@@ -296,4 +295,56 @@ def create_response_model(config: GenericPipelineDatasetConfig) -> type[BaseModel]:
         entries=(list[EntryModel], Field(..., description="List of generated entries"))
     )
 
-    return ResponseModel
\ No newline at end of file
+    return ResponseModel
+
+
+def create_generic_pipeline_row_model(config: GenericPipelineDatasetConfig) -> type[BaseModel]:
+    """
+    Build a dynamic Pydantic model for GenericPipelineRow based on configuration.
+
+    Args:
+        config (GenericPipelineDatasetConfig): Config with input_columns, forward_columns, and output_columns.
+
+    Returns:
+        type[BaseModel]: Dynamic Pydantic BaseModel class with all columns as separate fields.
+    """
+    from uuid import UUID, uuid4
+    from datafast.schema.data_rows import GenericPipelineSource
+
+    # Determine output fields
+    if config.output_columns and len(config.output_columns) > 0:
+        output_fields = config.output_columns
+    else:
+        output_fields = ["generated_text"]
+
+    # Create field definitions for the row model in desired order
+    row_fields = {
+        # System fields first
+        "uuid": (UUID, Field(default_factory=uuid4)),
+    }
+
+    # Add each output column as a separate field (right after uuid)
+    for field_name in output_fields:
+        row_fields[field_name] = (str, Field(..., description=f"Generated content for {field_name}"))
+
+    # Processing metadata
+    row_fields["model_id"] = (str | None, None)
+    row_fields["pipeline_source"] = (GenericPipelineSource, GenericPipelineSource.SYNTHETIC)
+    row_fields["language"] = (str | None, None)
+
+    # Add each input column as a separate field
+    for field_name in config.input_columns:
+        row_fields[field_name] = (str, Field(..., description=f"Input data for {field_name}"))
+
+    # Add each forward column as a separate field
+    if config.forward_columns:
+        for field_name in config.forward_columns:
+            row_fields[field_name] = (str, Field(..., description=f"Forwarded data for {field_name}"))
+
+    # Metadata last
+    row_fields["metadata"] = (dict[str, str], Field(default_factory=dict))
+
+    # Create the dynamic row model
+    DynamicRowModel = create_model("DynamicGenericPipelineRow", **row_fields)
+
+    return DynamicRowModel
\ No newline at end of file
From 24fc9f85873731d2ce5407a7d6ce232a66b268df Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 14:50:00 +0200
Subject: [PATCH 06/13] mini script to check the dynamic response format
 construction

---
 .../generic_pipeline_response_format_example.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
 create mode 100644 datafast/examples/generic_pipeline_response_format_example.py

diff --git a/datafast/examples/generic_pipeline_response_format_example.py b/datafast/examples/generic_pipeline_response_format_example.py
new file mode 100644
index 0000000..627cc74
--- /dev/null
+++ b/datafast/examples/generic_pipeline_response_format_example.py
@@ -0,0 +1,17 @@
+"""Simple test for create_response_model function."""
+
+from datafast.schema.config import GenericPipelineDatasetConfig
+from datafast.utils import create_response_model
+
+# Test with multiple columns and num_samples_per_prompt = 3
+config = GenericPipelineDatasetConfig(
+    hf_dataset_name="imdb",
+    input_columns=["text"],
+    output_columns=["summary", "sentiment"],
+    prompts=["Analyze: {text}. Language: {language}. Generate {num_samples} responses."],
+    num_samples_per_prompt=3
+)
+
+ResponseModel = create_response_model(config)
+
+print(ResponseModel.model_json_schema())
From ce82735540b279000a898fa82c3a07f6a0bd0ea0 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 14:50:12 +0200
Subject: [PATCH 07/13] Example demonstrating create_generic_pipeline_row_model
 function

---
 .../generic_pipeline_row_model_example.py | 62 +++++++++++++++++++
 1 file changed, 62 insertions(+)
 create mode 100644 datafast/examples/generic_pipeline_row_model_example.py

diff --git a/datafast/examples/generic_pipeline_row_model_example.py b/datafast/examples/generic_pipeline_row_model_example.py
new file mode 100644
index 0000000..c96c318
--- /dev/null
+++ b/datafast/examples/generic_pipeline_row_model_example.py
@@ -0,0 +1,62 @@
+"""Example demonstrating create_generic_pipeline_row_model function."""
+
+from datafast.schema.config import GenericPipelineDatasetConfig
+from datafast.utils import create_generic_pipeline_row_model
+
+# Test with multiple input, forward, and output columns
+config = GenericPipelineDatasetConfig(
+    hf_dataset_name="imdb",
+    input_columns=["text"],
+    forward_columns=["label"],
+    output_columns=["translation", "question"],
+    prompts=["Analyze: {text}. Generate {num_samples} translation of the move review in {language} and a follow-up question someone may ask about it."],
+    num_samples_per_prompt=1,
+    languages={"fr": "French", "es": "Spanish"}
+)
+
+# Create the dynamic row model
+DynamicRowModel = create_generic_pipeline_row_model(config)
+
+# Print the model schema to see all fields
+print("Dynamic Row Model Schema:")
+print(DynamicRowModel.model_json_schema())
+
+# Create an example instance to show the structure
+example_row = DynamicRowModel(
+    # Input columns (from source data)
+    text="This movie was amazing!",
+    label="positive",
+
+    # Output columns (generated by LLM)
+    translation="Ce film était trop bien",
+    question="Qu'est ce qui t'a plu?",
+
+    # System fields
+    model_id="gpt-4",
+    language="fr"
+)
+
+print("\nExample row instance:")
+print(example_row.model_dump_json(indent=2))
+
+# Test with minimal configuration (no forward columns, default output)
+minimal_config = GenericPipelineDatasetConfig(
+    hf_dataset_name="imdb",
+    input_columns=["text"],
+    prompts=["Create {num_samples} summary in {language} of the movie review: {text}"],
+    num_samples_per_prompt=1,
+)
+
+MinimalRowModel = create_generic_pipeline_row_model(minimal_config)
+
+print("\nMinimal Row Model Schema:")
+print(MinimalRowModel.model_json_schema())
+
+minimal_example = MinimalRowModel(
+    text="This is a test text",
+    generated_text="This is a generated summary",
+    model_id="gpt-4"
+)
+
+print("\nMinimal example row:")
+print(minimal_example.model_dump_json(indent=2))
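None of the example scripts exercise skip_function, so for completeness, a sketch of how row filtering would plug into the config (the threshold and column name are invented):

    from datafast.schema.config import GenericPipelineDatasetConfig

    def skip_short_rows(row: dict) -> bool:
        # Return True to skip rows whose 'text' column is too short to be useful
        return len(row.get("text", "")) < 50

    config = GenericPipelineDatasetConfig(
        local_file_path="reviews.jsonl",  # hypothetical local source
        input_columns=["text"],
        prompts=["Write {num_samples} summaries of {text} in {language}."],
        skip_function=skip_short_rows,  # skipped rows are never sent to an LLM
    )

One caveat: get_num_expected_rows never invokes skip_function, so the estimate overshoots whenever rows get skipped.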
From 5648552f020f6f048c36058d59bbf7b2dbfc640b Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 14:50:46 +0200
Subject: [PATCH 08/13] Script for end-to-end testing of the
 GenericPipelineDatasetConfig. Still needs testing, but it looks promising

---
 datafast/examples/generic_pipeline_example.py | 79 +++++++++++++++++++
 1 file changed, 79 insertions(+)
 create mode 100644 datafast/examples/generic_pipeline_example.py

diff --git a/datafast/examples/generic_pipeline_example.py b/datafast/examples/generic_pipeline_example.py
new file mode 100644
index 0000000..2984464
--- /dev/null
+++ b/datafast/examples/generic_pipeline_example.py
@@ -0,0 +1,79 @@
+"""
+Example script for generating a dataset using GenericPipelineDataset.
+This example uses the patrickfleith/FinePersonas-v0.1-100k-space-filtered dataset to generate tweets and CVs for different personas.
+"""
+
+import os
+from datafast.schema.config import GenericPipelineDatasetConfig
+from datafast.datasets import GenericPipelineDataset
+from datafast.llms import OpenAIProvider, GeminiProvider, OllamaProvider
+
+
+def main():
+    # 1. Define the configuration
+    config = GenericPipelineDatasetConfig(
+        hf_dataset_name="patrickfleith/FinePersonas-v0.1-100k-space-filtered",
+        input_columns=["persona"],  # Input data for generation
+        forward_columns=["summary_label"],  # Data to forward through
+        output_columns=["tweet", "cv"],  # Generated content columns
+        sample_count=3,  # Process only 3 samples for testing
+        num_samples_per_prompt=1,  # Generate 1 set per persona
+        prompts=[
+            """Based on this persona: {persona}
+
+Generate {num_samples} texts in {language} with:
+1. A tweet that this person might write (engaging, authentic to their character)
+2. A short CV highlighting their background
+
+Make sure the content reflects their personality and background authentically.
+The CV should include higher education degree (and school/university they obtained it from), work experience (if any), and relevant skills, and a hobby.
+Your response should be formatted in valid JSON"""
+        ],
+        output_file="generic_pipeline_test_dataset.jsonl",
+    )
+
+    # 2. Initialize LLM providers
+    providers = [
+        OpenAIProvider(model_id="gpt-5-nano-2025-08-07"),
+        # AnthropicProvider(model_id="claude-3-5-haiku-latest"),
+        # GeminiProvider(model_id="gemini-2.5-flash-lite", rpm_limit=15),
+        # OllamaProvider(model_id="gemma3:4b"),
+    ]
+
+    # 3. Generate the dataset
+    dataset = GenericPipelineDataset(config)
+    num_expected_rows = dataset.get_num_expected_rows(providers)
+    print(f"\nExpected number of rows: {num_expected_rows}")
+    dataset.generate(providers)
+
+    # 4. Print results summary
+    print(f"\nGenerated {len(dataset.data_rows)} examples")
+    print(f"Results saved to {config.output_file}")
+
+    # # 5. Show a sample of the generated data
+    # if dataset.data_rows:
+    #     print("\nSample generated row:")
+    #     sample_row = dataset.data_rows[0]
+    #     print(f"UUID: {sample_row.uuid}")
+    #     print(f"Tweet: {getattr(sample_row, 'tweet', 'N/A')}")
+    #     print(f"CV: {getattr(sample_row, 'cv', 'N/A')}")
+    #     print(f"Persona: {getattr(sample_row, 'persona', 'N/A')}")
+    #     print(f"Summary Label: {getattr(sample_row, 'summary_label', 'N/A')}")
+    #     print(f"Model ID: {sample_row.model_id}")
+
+    # 6. Optional: Push to HF hub
+    USERNAME = "username"  # <--- Your hugging face username
+    DATASET_NAME = "generic_pipeline_test_dataset"  # <--- Your hugging face dataset name
+    url = dataset.push_to_hub(
+        repo_id=f"{USERNAME}/{DATASET_NAME}",
+        seed=20250816,
+        shuffle=True,
+    )
+    print(f"\nDataset pushed to Hugging Face Hub: {url}")
+
+
+if __name__ == "__main__":
+    from dotenv import load_dotenv
+
+    load_dotenv("secrets.env")
+    main()
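For the configuration above the printed estimate should come out to exactly 3 (assuming the Hugging Face dataset loads):

    # Back-of-envelope check of get_num_expected_rows for this example config
    providers = 1           # OpenAIProvider only
    languages = 1           # default {"en": "English"}
    samples_per_prompt = 1  # num_samples_per_prompt=1
    source_rows = 3         # sample_count=3
    prompts = 1             # one template, no expansion configured
    print(providers * languages * samples_per_prompt * source_rows * prompts)  # 3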
From a7b1b9791abc02a3e9c5463f50b4e2131db8f8ba Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 17:51:10 +0200
Subject: [PATCH 09/13] used proper logging for new pipeline

---
 datafast/datasets.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafast/datasets.py b/datafast/datasets.py
index 634e51c..907a38d 100644
--- a/datafast/datasets.py
+++ b/datafast/datasets.py
@@ -36,6 +36,7 @@
 from datafast.expanders import expand_prompts
 import os
 from datafast import utils
+from loguru import logger
 
 
 ### Model for Raw Text Examples Generation
@@ -1391,10 +1392,10 @@ def generate(self, llms: list[LLMProvider]) -> "GenericPipelineDataset":
 
                                 # Save this batch
                                 self.to_jsonl(self.config.output_file, new_rows, append=True)
-                                print(f"Generated and saved {len(self.data_rows)} examples total")
+                                logger.success(f"Generated and saved {len(self.data_rows)} examples total")
 
                             except Exception as e:
-                                print(f"Error with llm provider {llm.provider_name} on row {row_idx}: {e}")
+                                logger.error(f"Error with llm provider {llm.provider_name} on row {row_idx}: {e}")
                                 continue
 
         return self
\ No newline at end of file
From 0d95bbe9b6272e347f512c40dc1bb1c3196192c5 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 17:51:58 +0200
Subject: [PATCH 10/13] example

---
 datafast/examples/generic_pipeline_example.py | 37 +++++++++++--------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/datafast/examples/generic_pipeline_example.py b/datafast/examples/generic_pipeline_example.py
index 2984464..b010d41 100644
--- a/datafast/examples/generic_pipeline_example.py
+++ b/datafast/examples/generic_pipeline_example.py
@@ -9,6 +9,19 @@
 from datafast.schema.config import GenericPipelineDatasetConfig
 from datafast.datasets import GenericPipelineDataset
 from datafast.llms import OpenAIProvider, GeminiProvider, OllamaProvider
 
+PROMPT_TEMPLATE = """I will give you a persona.
+Generate {num_samples} texts in {language} with:
+1. A tweet that this person might write (engaging, authentic to their character)
+2. A short CV highlighting their background
+
+Make sure the content reflects their personality and background authentically.
+The CV should include higher education degree (and school/university they obtained it from), work experience (if any), and relevant skills, and a hobby.\
+
+Here is the persona:
+{persona}
+
+Your response should be formatted in valid JSON with {num_samples} entries and all required fields."""
+
 
 def main():
     # 1. Define the configuration
     config = GenericPipelineDatasetConfig(
@@ -16,25 +29,19 @@ def main():
         input_columns=["persona"],  # Input data for generation
         forward_columns=["summary_label"],  # Data to forward through
         output_columns=["tweet", "cv"],  # Generated content columns
-        sample_count=3,  # Process only 3 samples for testing
-        num_samples_per_prompt=1,  # Generate 1 set per persona
-        prompts=[
-            """Based on this persona: {persona}
-
-Generate {num_samples} texts in {language} with:
-1. A tweet that this person might write (engaging, authentic to their character)
-2. A short CV highlighting their background
-
-Make sure the content reflects their personality and background authentically.
-The CV should include higher education degree (and school/university they obtained it from), work experience (if any), and relevant skills, and a hobby.
-Your response should be formatted in valid JSON"""
-        ],
+        sample_count=5,  # Process only 5 samples for testing
+        num_samples_per_prompt=2,  # Generate 2 sets per persona
+        prompts=[PROMPT_TEMPLATE],  # Use the prompt template
         output_file="generic_pipeline_test_dataset.jsonl",
+        languages={"en": "English", "fr": "French"}
     )
 
     # 2. Initialize LLM providers
     providers = [
-        OpenAIProvider(model_id="gpt-5-nano-2025-08-07"),
+        OpenAIProvider(
+            model_id="gpt-5-mini-2025-08-07",
+            temperature=1
+        ),
         # AnthropicProvider(model_id="claude-3-5-haiku-latest"),
         # GeminiProvider(model_id="gemini-2.5-flash-lite", rpm_limit=15),
         # OllamaProvider(model_id="gemma3:4b"),
@@ -63,7 +70,7 @@ def main():
 
     # 6. Optional: Push to HF hub
     USERNAME = "username"  # <--- Your hugging face username
-    DATASET_NAME = "generic_pipeline_test_dataset"  # <--- Your hugging face dataset name
+    DATASET_NAME = "generic_pipeline_test_dataset_2"  # <--- Your hugging face dataset name
     url = dataset.push_to_hub(
         repo_id=f"{USERNAME}/{DATASET_NAME}",
         seed=20250816,
From 78e78e6300fa8339c3a9584d4a177bcd7803f2c5 Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sat, 16 Aug 2025 17:54:56 +0200
Subject: [PATCH 11/13] example with keywords extraction

---
 .../examples/keywords_extraction_example.py | 66 +++++++++++++++++++
 1 file changed, 66 insertions(+)
 create mode 100644 datafast/examples/keywords_extraction_example.py

diff --git a/datafast/examples/keywords_extraction_example.py b/datafast/examples/keywords_extraction_example.py
new file mode 100644
index 0000000..0225882
--- /dev/null
+++ b/datafast/examples/keywords_extraction_example.py
@@ -0,0 +1,66 @@
+"""
+Example script for generating a dataset using GenericPipelineDataset.
+This example uses the patrickfleith/FinePersonas-v0.1-100k-space-filtered dataset to generate tweets and CVs for different personas.
+"""
+
+import os
+from datafast.schema.config import GenericPipelineDatasetConfig
+from datafast.datasets import GenericPipelineDataset
+from datafast.llms import OpenAIProvider, GeminiProvider, OllamaProvider
+
+PROMPT_TEMPLATE = """I will give you a tweet.
+Generate a comma separated list of 3 keywords for the tweet. Avoid multi-word keywords.
+
+Here is the tweet:
+{tweet}
+
+Your response should be in {language} and formatted in valid JSON with {num_samples} entry and all required fields."""
+
+def main():
+    # 1. Define the configuration
+    config = GenericPipelineDatasetConfig(
+        hf_dataset_name="patrickfleith/generic_pipeline_test_dataset",
+        input_columns=["tweet"],  # Input data for generation
+        output_columns=["keywords"],  # Generated content columns
+        num_samples_per_prompt=1,  # Generate 1 set per tweet
+        prompts=[PROMPT_TEMPLATE],
+        output_file="keywords_extraction_test_dataset.jsonl",
+        languages={"en": "English", "fr": "French", "es": "Spanish", "de": "German", "it": "Italian"}
+    )
+
+    # 2. Initialize LLM providers
+    providers = [
+        OpenAIProvider(
+            model_id="gpt-5-nano-2025-08-07",
+        ),
+        # AnthropicProvider(model_id="claude-3-5-haiku-latest"),
+        # GeminiProvider(model_id="gemini-2.5-flash-lite", rpm_limit=15),
+        # OllamaProvider(model_id="gemma3:4b"),
+    ]
+
+    # 3. Generate the dataset
+    dataset = GenericPipelineDataset(config)
+    num_expected_rows = dataset.get_num_expected_rows(providers)
+    print(f"\nExpected number of rows: {num_expected_rows}")
+    dataset.generate(providers)
+
+    # 4. Print results summary
+    print(f"\nGenerated {len(dataset.data_rows)} examples")
+    print(f"Results saved to {config.output_file}")
+
+    # 6. Optional: Push to HF hub
+    USERNAME = "username"  # <--- Your hugging face username
+    DATASET_NAME = "keywords_extraction_test_dataset"  # <--- Your hugging face dataset name
+    url = dataset.push_to_hub(
+        repo_id=f"{USERNAME}/{DATASET_NAME}",
+        seed=20250816,
+        shuffle=True,
+    )
+    print(f"\nDataset pushed to Hugging Face Hub: {url}")
+
+
+if __name__ == "__main__":
+    from dotenv import load_dotenv
+
+    load_dotenv("secrets.env")
+    main()
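This script chains off the previous example: the tweet column it reads is one of the output_columns that generic_pipeline_example.py pushed to the Hub. A quick way to confirm the columns line up before spending API calls (a sketch; the repo id is the one from the config above):

    from datasets import load_dataset

    ds = load_dataset("patrickfleith/generic_pipeline_test_dataset", split="train")
    print(ds.column_names)  # expect 'tweet' among the forwarded/generated columns

With five languages configured, each source tweet yields five keyword rows per provider.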
From 73f0177634fd8c679863e3800da531ee3708fc9e Mon Sep 17 00:00:00 2001
From: Patrick
Date: Mon, 18 Aug 2025 19:25:23 +0200
Subject: [PATCH 12/13] keyword extraction example

---
 datafast/examples/keywords_extraction_example.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/datafast/examples/keywords_extraction_example.py b/datafast/examples/keywords_extraction_example.py
index 0225882..2e30687 100644
--- a/datafast/examples/keywords_extraction_example.py
+++ b/datafast/examples/keywords_extraction_example.py
@@ -24,18 +24,17 @@ def main():
         output_columns=["keywords"],  # Generated content columns
         num_samples_per_prompt=1,  # Generate 1 set per tweet
         prompts=[PROMPT_TEMPLATE],
-        output_file="keywords_extraction_test_dataset.jsonl",
+        output_file="keywords_extraction_gemma3_runpod.jsonl",
         languages={"en": "English", "fr": "French", "es": "Spanish", "de": "German", "it": "Italian"}
     )
 
     # 2. Initialize LLM providers
     providers = [
-        OpenAIProvider(
-            model_id="gpt-5-nano-2025-08-07",
-        ),
-        # AnthropicProvider(model_id="claude-3-5-haiku-latest"),
-        # GeminiProvider(model_id="gemini-2.5-flash-lite", rpm_limit=15),
-        # OllamaProvider(model_id="gemma3:4b"),
+        OllamaProvider(
+            model_id="gemma3:27b-it-qat",
+            api_base="https://xxxxxxx-11434.proxy.runpod.net",
+            temperature=1
+        )
     ]
 
     # 3. Generate the dataset
@@ -50,7 +49,7 @@ def main():
 
     # 6. Optional: Push to HF hub
     USERNAME = "username"  # <--- Your hugging face username
-    DATASET_NAME = "keywords_extraction_test_dataset"  # <--- Your hugging face dataset name
+    DATASET_NAME = "keywords_extraction_gemma3_runpod"  # <--- Your hugging face dataset name
    url = dataset.push_to_hub(
         repo_id=f"{USERNAME}/{DATASET_NAME}",
         seed=20250816,
From c219f6c231289abe4b624715424ea05bbecda22d Mon Sep 17 00:00:00 2001
From: Patrick
Date: Sun, 12 Oct 2025 10:20:48 +0200
Subject: [PATCH 13/13] fix: correct typo in movie review prompt and update
 config descriptions in examples

---
 .../generic_pipeline_row_model_example.py        |  2 +-
 datafast/examples/keywords_extraction_example.py | 24 +++++++++++--------
 datafast/schema/config.py                        |  2 +-
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/datafast/examples/generic_pipeline_row_model_example.py b/datafast/examples/generic_pipeline_row_model_example.py
index c96c318..41323b1 100644
--- a/datafast/examples/generic_pipeline_row_model_example.py
+++ b/datafast/examples/generic_pipeline_row_model_example.py
@@ -9,7 +9,7 @@
     input_columns=["text"],
     forward_columns=["label"],
     output_columns=["translation", "question"],
-    prompts=["Analyze: {text}. Generate {num_samples} translation of the move review in {language} and a follow-up question someone may ask about it."],
+    prompts=["Analyze: {text}. Generate {num_samples} translation of the movie review in {language} and a follow-up question someone may ask about it."],
     num_samples_per_prompt=1,
     languages={"fr": "French", "es": "Spanish"}
 )
diff --git a/datafast/examples/keywords_extraction_example.py b/datafast/examples/keywords_extraction_example.py
index 2e30687..c91d7bc 100644
--- a/datafast/examples/keywords_extraction_example.py
+++ b/datafast/examples/keywords_extraction_example.py
@@ -1,12 +1,12 @@
 """
 Example script for generating a dataset using GenericPipelineDataset.
-This example uses the patrickfleith/FinePersonas-v0.1-100k-space-filtered dataset to generate tweets and CVs for different personas.
+This example uses the local file output from the generic_pipeline_test_dataset example script to generate keywords for different tweets.
 """
 
 import os
 from datafast.schema.config import GenericPipelineDatasetConfig
 from datafast.datasets import GenericPipelineDataset
-from datafast.llms import OpenAIProvider, GeminiProvider, OllamaProvider
+from datafast.llms import OpenAIProvider, GeminiProvider, OllamaProvider, OpenRouterProvider
 
 PROMPT_TEMPLATE = """I will give you a tweet.
 Generate a comma separated list of 3 keywords for the tweet. Avoid multi-word keywords.
@@ -19,21 +19,25 @@
 def main():
     # 1. Define the configuration
     config = GenericPipelineDatasetConfig(
-        hf_dataset_name="patrickfleith/generic_pipeline_test_dataset",
+        local_file_path="generic_pipeline_test_dataset.jsonl",
         input_columns=["tweet"],  # Input data for generation
         output_columns=["keywords"],  # Generated content columns
         num_samples_per_prompt=1,  # Generate 1 set per tweet
         prompts=[PROMPT_TEMPLATE],
-        output_file="keywords_extraction_gemma3_runpod.jsonl",
-        languages={"en": "English", "fr": "French", "es": "Spanish", "de": "German", "it": "Italian"}
+        output_file="keywords_extraction_z-ai_glm-4.6.jsonl",
+        languages={"en": "English", "fr": "French"},
+        sample_count=10
     )
 
     # 2. Initialize LLM providers
    providers = [
-        OllamaProvider(
-            model_id="gemma3:27b-it-qat",
-            api_base="https://xxxxxxx-11434.proxy.runpod.net",
-            temperature=1
-        )
+        # OllamaProvider(
+        #     model_id="gemma3:27b-it-qat",
+        #     api_base="https://xxxxxxx-11434.proxy.runpod.net",  # if you have a runpod ollama instance
+        #     temperature=1
+        # )
+        OpenRouterProvider(
+            model_id="z-ai/glm-4.6",
+        )
     ]
 
     # 3. Generate the dataset
@@ -49,7 +53,7 @@ def main():
 
     # 6. Optional: Push to HF hub
     USERNAME = "username"  # <--- Your hugging face username
-    DATASET_NAME = "keywords_extraction_gemma3_runpod"  # <--- Your hugging face dataset name
+    DATASET_NAME = "keywords_extraction_z-ai_glm-4.6"  # <--- Your hugging face dataset name
     url = dataset.push_to_hub(
         repo_id=f"{USERNAME}/{DATASET_NAME}",
         seed=20250816,
diff --git a/datafast/schema/config.py b/datafast/schema/config.py
index cc9d907..37a41d7 100644
--- a/datafast/schema/config.py
+++ b/datafast/schema/config.py
@@ -569,7 +569,7 @@ class GenericPipelineDatasetConfig(BaseModel):
 
     output_columns: list[str] | None = Field(
         default=None,
-        description="List of column names to use as output for the pipeline"
+        description="List of column names to use as output of the pipeline"
     )
 
     prompts: list[str] = Field(