167 commits
a1573e7
Add S3-Snowflake stage operations for large-scale data transfers
abhishek-pattern Feb 5, 2026
2afeb6e
feat: implement S3 stage operations for Snowflake integration and add…
abhishek-pattern Feb 5, 2026
90be5e7
feat: enhance Snowflake to S3 operations with configuration and query…
abhishek-pattern Feb 9, 2026
4384700
feat: add S3 to Snowflake data transfer functionality
abhishek-pattern Feb 9, 2026
179c950
feat: add S3 data folder constant for Snowflake stage operations
abhishek-pattern Feb 9, 2026
e1c4a0e
refactor: remove outdated functional tests for pandas via S3 stage
abhishek-pattern Feb 9, 2026
09c172e
feat: add batch inference function for Snowflake integration and infe…
abhishek-pattern Feb 9, 2026
ed96149
fix: add missing import for listing files in S3 for batch inference
abhishek-pattern Feb 9, 2026
fa9eb51
feat: add batch size parameter to Snowflake to S3 copy query and upda…
abhishek-pattern Feb 9, 2026
3366ead
feat: enhance batch inference with multiprocessing for file processing
abhishek-pattern Feb 9, 2026
535cc71
feat: refactor file processing in batch inference to use a dedicated …
abhishek-pattern Feb 9, 2026
58b52f4
feat: refactor file processing in batch inference to inline function …
abhishek-pattern Feb 9, 2026
bac2b1d
feat: add function to download all files from S3 folder with URI vali…
abhishek-pattern Feb 9, 2026
ea0544c
feat: replace file listing with direct download of all files from S3 …
abhishek-pattern Feb 9, 2026
bb65911
feat: update input file processing to read parquet format instead of …
abhishek-pattern Feb 9, 2026
43bfb41
feat: refactor _download_all_files_in_s3_folder to use a direct S3 cl…
abhishek-pattern Feb 9, 2026
172278a
feat: optimize batch inference file processing using multiprocessing …
abhishek-pattern Feb 9, 2026
82f30af
feat: add picklability check for process_file function before multipr…
abhishek-pattern Feb 9, 2026
4b8712d
feat: refactor process_file function for improved picklability and in…
abhishek-pattern Feb 9, 2026
b7624cc
feat: replace itertools.partial with functools.partial for improved p…
abhishek-pattern Feb 9, 2026
ccc1e29
feat: replace multiprocessing.Pool with metaflow.parallel_map for imp…
abhishek-pattern Feb 9, 2026
c7f9b5b
feat: refactor process_file function for improved integration with pa…
abhishek-pattern Feb 9, 2026
70984bd
feat: replace parallel_map with concurrent.futures.ProcessPoolExecuto…
abhishek-pattern Feb 9, 2026
7fe6bf4
feat: switch from ProcessPoolExecutor to ThreadPoolExecutor for impro…
abhishek-pattern Feb 9, 2026
4bab774
feat: enhance batch inference and S3 integration with schema inferenc…
abhishek-pattern Feb 11, 2026
139fc89
feat: remove redundant import of ThreadPoolExecutor in batch_inferenc…
abhishek-pattern Feb 12, 2026
cfa7831
feat: streamline S3 file handling with temporary file management and …
abhishek-pattern Feb 12, 2026
7740a61
feat: add logging for data export process in batch inference
abhishek-pattern Feb 12, 2026
bfd8dcb
feat: add timing logs for data export and batch inference processes
abhishek-pattern Feb 12, 2026
e66fcda
feat: update batch file processing to handle single S3 file input and…
abhishek-pattern Feb 12, 2026
ea250c6
feat: fix data retrieval from S3 by removing unnecessary parquet read…
abhishek-pattern Feb 12, 2026
b23c845
feat: optimize batch inference by removing unnecessary parquet read s…
abhishek-pattern Feb 12, 2026
789b5b6
feat: update batch size handling and improve file processing in batch…
abhishek-pattern Feb 12, 2026
4172057
feat: add polars dependency for enhanced data processing capabilities
abhishek-pattern Feb 12, 2026
ad87d29
feat: switch from pandas to polars for improved performance in S3 fil…
abhishek-pattern Feb 12, 2026
a5662b7
feat: remove pandas_via_s3_stage module to streamline data processing…
abhishek-pattern Feb 12, 2026
c7d3e71
feat: remove polars dependency and revert to pandas for S3 file retri…
abhishek-pattern Feb 12, 2026
65a70bb
feat: remove polars dependency from project requirements
abhishek-pattern Feb 12, 2026
9ddef7f
feat: remove make_batch_predictions_from_snowflake_via_s3_stage impor…
abhishek-pattern Feb 12, 2026
368dcc2
feat: add Metaflow flow for publishing and querying pandas DataFrames…
abhishek-pattern Feb 12, 2026
17997ab
Refactor code structure for improved readability and maintainability
abhishek-pattern Feb 12, 2026
675ca8f
feat: add pytest-xdist for parallel test execution and update test co…
abhishek-pattern Feb 12, 2026
393abaf
fix: update flow steps in test to ensure correct execution order
abhishek-pattern Feb 12, 2026
d0a231c
fix: update table name and query for publishing and querying DataFram…
abhishek-pattern Feb 12, 2026
71b3169
fix: correct SQL syntax for creating temporary file format in Snowflake
abhishek-pattern Feb 12, 2026
d023186
fix: correct SQL syntax for creating temporary file format in Snowflake
abhishek-pattern Feb 12, 2026
46ea94d
fix: handle empty DataFrame in S3 write functions and adjust SQL quer…
abhishek-pattern Feb 12, 2026
4c96953
fix: ensure DataFrame columns are lowercase for consistent processing…
abhishek-pattern Feb 12, 2026
5a479bb
fix: add prefix to temporary file paths for S3 operations
abhishek-pattern Feb 12, 2026
15b9148
fix: update S3 file listing method to use recursive listing
abhishek-pattern Feb 12, 2026
162926e
fix: update test command and add pytest-xdist dependency for improved…
abhishek-pattern Feb 16, 2026
10e583e
fix: implement multithreading for batch inference processing to impro…
abhishek-pattern Feb 16, 2026
d29988e
feat: implement multi-threaded batch inference processing with detail…
abhishek-pattern Feb 16, 2026
eafad1b
fix: replace print statements with debug function for improved logging
abhishek-pattern Feb 16, 2026
45e882c
fix: rename snowflake_batch_transform function to batch_inference for…
abhishek-pattern Feb 16, 2026
74b6b87
fix: reduce queue maxsize in batch_inference for optimized memory usage
abhishek-pattern Feb 16, 2026
246d41e
fix: reduce queue maxsize in batch_inference for optimized memory usage
abhishek-pattern Feb 16, 2026
7501517
feat: enhance batch inference with timer context manager and improved…
abhishek-pattern Feb 18, 2026
031cd51
fix: rename NON_PROD_SCHEMA to DEV_SCHEMA for clarity
abhishek-pattern Feb 18, 2026
637eba0
fix: replace NON_PROD_SCHEMA with DEV_SCHEMA for consistency in Snowf…
abhishek-pattern Feb 18, 2026
758af9b
refactor: replace threading with ThreadPoolExecutor for batch process…
abhishek-pattern Feb 18, 2026
8bccfe6
feat: implement S3 to Snowflake and Snowflake to S3 data transfer fun…
abhishek-pattern Feb 18, 2026
48bfb2a
feat: add warning for oversized files in batch processing and make ta…
abhishek-pattern Feb 18, 2026
616be05
fix: add trailing slash to Snowflake stage path in copy_snowflake_to_…
abhishek-pattern Feb 18, 2026
3cf91c9
fix: remove unused schema parameter from copy_s3_to_snowflake function
abhishek-pattern Feb 18, 2026
4afa9ff
fix: correct Snowflake stage path syntax in S3 to Snowflake copy query
abhishek-pattern Feb 18, 2026
cff9710
fix: comment out unused SQL generation code in S3 to Snowflake copy q…
abhishek-pattern Feb 18, 2026
a54a7f9
feat: enable generation of COPY INTO query in S3 to Snowflake functio…
abhishek-pattern Feb 18, 2026
aca1ed0
feat: add logging for generated SQL queries in S3 to Snowflake copy f…
abhishek-pattern Feb 18, 2026
1f1f7e1
feat: add logging for inferred table schema in copy_s3_to_snowflake f…
abhishek-pattern Feb 18, 2026
11af568
fix: update condition to infer table schema in copy_s3_to_snowflake f…
abhishek-pattern Feb 18, 2026
6eb342e
feat: add logging for S3 upload path in copy_s3_to_snowflake function
abhishek-pattern Feb 18, 2026
90e944e
fix: update condition to infer table schema in copy_s3_to_snowflake f…
abhishek-pattern Feb 18, 2026
22db8f8
fix: validate S3 path format in copy_s3_to_snowflake function
abhishek-pattern Feb 18, 2026
da74b40
fix: update S3 path validation in copy_s3_to_snowflake function
abhishek-pattern Feb 18, 2026
db9d610
fix: update S3 path replacement logic in copy_s3_to_snowflake function
abhishek-pattern Feb 18, 2026
99ae8ec
fix: simplify S3 bucket replacement logic in copy_s3_to_snowflake fun…
abhishek-pattern Feb 18, 2026
af71a43
fix: refactor S3 configuration and update copy functions in pandas mo…
abhishek-pattern Feb 18, 2026
e66664b
fix: remove unused copy functions and update table schema definition …
abhishek-pattern Feb 18, 2026
2fb612e
fix: update import statement to use s3_stage module for copy query fu…
abhishek-pattern Feb 18, 2026
c90b043
feat: implement BatchInferencePipeline class for orchestrating batch …
abhishek-pattern Feb 18, 2026
2660c54
feat: add BatchInferencePipeline class for orchestrating batch infere…
abhishek-pattern Feb 18, 2026
3496dfa
fix: update BatchInferencePipeline to use worker_ids instead of batch…
abhishek-pattern Feb 18, 2026
f9c5622
feat: enhance BatchInferencePipeline with file splitting for worker p…
abhishek-pattern Feb 18, 2026
c2e2710
fix: correct spelling of 'table_definition' in multiple files
abhishek-pattern Feb 18, 2026
b10f061
feat: add run method to BatchInferencePipeline for streamlined proces…
abhishek-pattern Feb 18, 2026
2208b0a
fix: adjust worker file assignment to start from 1 for correct indexing
abhishek-pattern Feb 18, 2026
e171fc3
fix: adjust worker file assignment to start from 1 for correct indexing
abhishek-pattern Feb 18, 2026
fb087e7
fix: remove debug print statements and add error handling for table s…
abhishek-pattern Feb 18, 2026
7ad286c
feat: update BatchInferencePipeline and S3 stage for enhanced path ha…
abhishek-pattern Feb 19, 2026
14975a6
fix: update import from BatchInferenceManager to BatchInferencePipeli…
abhishek-pattern Feb 19, 2026
6824499
feat: enhance logging for Snowflake to S3 export process and ensure c…
abhishek-pattern Feb 19, 2026
1cab798
fix: adjust file indexing in download worker to start from 1 for accu…
abhishek-pattern Feb 19, 2026
3f96678
fix: rename timeout parameter for clarity and update usage in batch p…
abhishek-pattern Feb 19, 2026
82cc466
fix: ensure all worker futures are awaited and exceptions are propaga…
abhishek-pattern Feb 19, 2026
5efb586
feat: add execution state flags to BatchInferencePipeline for better …
abhishek-pattern Feb 23, 2026
1d88bfe
fix: suppress pylint warning for publish_results method in BatchInfer…
abhishek-pattern Feb 23, 2026
f189b80
fix: update BatchInferencePipeline initialization and enhance warning…
abhishek-pattern Feb 23, 2026
83a8332
feat: enhance README with detailed BatchInferencePipeline usage examp…
abhishek-pattern Feb 23, 2026
a654caa
Add Metaflow utilities documentation and modules for batch inference,…
abhishek-pattern Feb 23, 2026
23a5559
docs: update README and guides to reflect Outerbounds integration for…
abhishek-pattern Feb 23, 2026
dbc27d4
Refactor Snowflake integration and documentation
abhishek-pattern Feb 23, 2026
5489a16
refactor: remove batch inference module from Metaflow integration
abhishek-pattern Feb 23, 2026
a5575f2
fix: prevent re-execution of query_and_batch in BatchInferencePipelin…
abhishek-pattern Feb 23, 2026
84ffb52
Remove outdated guides and documentation files:
abhishek-pattern Feb 23, 2026
0f79a58
feat: add polars dependency and update S3 data retrieval to use polar…
abhishek-pattern Feb 24, 2026
61c4acc
fix: update S3 file retrieval to use polars for DataFrame conversion
abhishek-pattern Feb 24, 2026
0ef141f
fix: replace polars with pandas for S3 parquet file retrieval
abhishek-pattern Feb 24, 2026
374d539
fix: update parquet engine from pyarrow to fastparquet for S3 file re…
abhishek-pattern Feb 24, 2026
a7a3605
fix: update S3 DataFrame retrieval to use dtype_backend="pyarrow"
abhishek-pattern Feb 24, 2026
a9793b4
fix: update S3 DataFrame retrieval to remove dtype_backend and improv…
abhishek-pattern Feb 24, 2026
c1af0b1
fix: optimize S3 DataFrame retrieval by consolidating file reading in…
abhishek-pattern Feb 24, 2026
75a94a2
fix: update S3 DataFrame retrieval to use Polars for parquet files an…
abhishek-pattern Feb 25, 2026
c914268
fix: cast decimal columns to float64 for pandas compatibility in S3 D…
abhishek-pattern Feb 25, 2026
8cf7b71
fix: enhance S3 DataFrame retrieval by adding type mapping for decima…
abhishek-pattern Feb 26, 2026
b2e202f
feat: enhance Snowflake integration by adding query tagging for cost …
abhishek-pattern Feb 27, 2026
503e8ef
fix: remove fastparquet dependency and update polars versioning for c…
abhishek-pattern Feb 27, 2026
aed6cab
fix: remove unused import of _execute_sql from run_query.py
abhishek-pattern Feb 27, 2026
f937cd4
fix: remove unnecessary whitespace in documentation and code examples
abhishek-pattern Feb 27, 2026
7df02c1
fix: update Snowflake connection handling to include warehouse parame…
abhishek-pattern Feb 27, 2026
fe523dd
Implement structure for code changes with placeholders for future upd…
abhishek-pattern Feb 27, 2026
3f73bcf
fix: enhance test coverage for _execute_sql by adding fixtures and mo…
abhishek-pattern Feb 27, 2026
72ff942
feat: add SQL utility functions for query handling and tagging in Sno…
abhishek-pattern Feb 27, 2026
200c3f9
feat: add SQL utility functions for query handling and tagging in Sno…
abhishek-pattern Feb 27, 2026
1500e06
refactor: streamline _execute_sql function and enhance error handling…
abhishek-pattern Feb 27, 2026
fc223e7
refactor: replace warnings with print statements and enhance comment …
abhishek-pattern Feb 27, 2026
3355623
test: add test for multi-statement execution with comments in _execut…
abhishek-pattern Feb 27, 2026
ea6ddb2
feat: enhance SQL comment handling and add unit tests for comment ins…
abhishek-pattern Feb 27, 2026
2a63370
test: update multi-statement test to include comments in SQL execution
abhishek-pattern Feb 27, 2026
9aa83ee
refactor: simplify test structure for get_select_dev_query_tags and e…
abhishek-pattern Feb 27, 2026
0121904
refactor: restructure mock current object creation and enhance test c…
abhishek-pattern Feb 27, 2026
be8cda7
feat: add Metaflow flow for testing warehouse queries with dynamic pa…
abhishek-pattern Feb 27, 2026
355b2f5
fix: rename test_publish_with_warehouse_xs to test_publish_with_wareh…
abhishek-pattern Feb 27, 2026
c16fe77
feat: add additional warehouse configuration to test_publish_with_war…
abhishek-pattern Feb 27, 2026
33aaad4
feat: implement Metaflow flow for batch inference processing and resu…
abhishek-pattern Feb 27, 2026
289affc
fix: reduce query size in batch inference test for performance improv…
abhishek-pattern Feb 27, 2026
206d83c
fix: set quote_identifiers to False in publish_pandas function for im…
abhishek-pattern Feb 27, 2026
1f325ed
fix: update publish_results method to use inputs from parallel branches
abhishek-pattern Feb 27, 2026
a2d4dd4
fix: increase row count in query for batch inference testing
abhishek-pattern Feb 27, 2026
83113f5
fix: enable debug mode for query in publish_results step
abhishek-pattern Feb 27, 2026
a88e6b5
fix: update schema substitution order in query processing for consist…
abhishek-pattern Feb 27, 2026
57a276c
fix: add column aliases to query in query_and_batch step for clarity
abhishek-pattern Feb 27, 2026
d527876
Add documentation for Metaflow utilities and remove outdated files
abhishek-pattern Feb 27, 2026
13764bb
fix: ensure parallel_workers parameter is set in query_and_batch step
abhishek-pattern Feb 27, 2026
091a143
fix: add query_pandas_from_snowflake import and validate output row c…
abhishek-pattern Feb 27, 2026
321bee3
fix: improve error message for row count mismatch in query_and_batch …
abhishek-pattern Feb 27, 2026
0611896
fix: remove redundant warehouse selection in query_pandas_from_snowfl…
abhishek-pattern Feb 27, 2026
68f3984
fix: refactor test steps to streamline warehouse query and publish pr…
abhishek-pattern Feb 27, 2026
f78aaf5
fix: update table names to follow DS_PLATFORM_UTILS naming convention…
abhishek-pattern Feb 27, 2026
0b191a6
fix: specify schema in CREATE TABLE statement for warehouse publish test
abhishek-pattern Feb 27, 2026
709be05
fix: update default values for auto_create_table and overwrite in Bat…
abhishek-pattern Feb 27, 2026
faff8dd
docs: add recommendation for tuning batch_size_in_mb for cost-effecti…
abhishek-pattern Feb 27, 2026
d4754a9
fix: update parameter descriptions in BatchInferencePipeline document…
abhishek-pattern Feb 27, 2026
d41dd2e
feat: add estimate_chunk_size utility and improve chunk_size validati…
abhishek-pattern Feb 27, 2026
cb70156
refactor: rename copy functions to internal and add S3 stage path gen…
abhishek-pattern Feb 27, 2026
fa30112
fix: remove unnecessary whitespace in BatchInferencePipeline document…
abhishek-pattern Feb 27, 2026
d3fe5ec
refactor: streamline S3 stage path generation and update related func…
abhishek-pattern Feb 27, 2026
06434a4
fix: update return type description in _copy_snowflake_to_s3 function
abhishek-pattern Feb 27, 2026
958cd6c
Refactor code structure for improved readability and maintainability
abhishek-pattern Feb 27, 2026
37533be
fix: correct condition for single SQL statement validation in _genera…
abhishek-pattern Feb 27, 2026
79b20d1
fix: update command tags in Metaflow test scripts for improved tracking
abhishek-pattern Feb 27, 2026
9d94926
refactor: simplify S3 path generation and improve batch processing logic
abhishek-pattern Feb 27, 2026
9f8726b
fix: update tag syntax in Metaflow test scripts for consistency
abhishek-pattern Feb 27, 2026
c2ae896
fix: ensure proper cleanup of tags in TestWarehouseFlow to prevent ta…
abhishek-pattern Feb 27, 2026
08588f7
fix: update paths-ignore in CI/CD workflow and improve parameter tabl…
abhishek-pattern Feb 27, 2026
f071bdb
fix: update parameter table formatting in publish_pandas documentatio…
abhishek-pattern Feb 27, 2026
66acefc
lowercase warehouse name in test and clean up tag after query instead…
abhishek-pattern Feb 27, 2026
8 changes: 7 additions & 1 deletion .github/workflows/ci-cd-ds-platform-utils.yaml
@@ -5,8 +5,14 @@ on:
push:
branches:
- main
paths-ignore:
- "README.md"
- "docs/**"
pull_request:
types: [opened, synchronize]
paths-ignore:
- "README.md"
- "docs/**"

jobs:
check-version:
@@ -110,7 +116,7 @@ jobs:
uv pip install --group dev
COVERAGE_DIR="$(python -c 'import ds_platform_utils; print(ds_platform_utils.__path__[0])')"
poe clean
poe test --cov="$COVERAGE_DIR" --no-cov
poe test --cov="$COVERAGE_DIR" --no-cov -n auto

tag-version:
needs: [check-version, code-quality-checks, build-wheel, execute-tests]
12 changes: 10 additions & 2 deletions README.md
@@ -1,3 +1,11 @@
## ds-platform-utils
# ds-platform-utils

## Metaflow API Docs

- [BatchInferencePipeline](docs/metaflow/batch_inference_pipeline.md)
- [make_pydantic_parser_fn](docs/metaflow/make_pydantic_parser_fn.md)
- [publish](docs/metaflow/publish.md)
- [publish_pandas](docs/metaflow/publish_pandas.md)
- [query_pandas_from_snowflake](docs/metaflow/query_pandas_from_snowflake.md)
- [restore_step_state](docs/metaflow/restore_step_state.md)

Utility library to support Pattern's [data-science-projects](https://github.com/patterninc/data-science-projects/).
204 changes: 204 additions & 0 deletions docs/metaflow/batch_inference_pipeline.md
@@ -0,0 +1,204 @@
# `BatchInferencePipeline`

Source: `ds_platform_utils.metaflow.batch_inference_pipeline.BatchInferencePipeline`

Utility class to orchestrate batch inference with Snowflake + S3 in Metaflow steps.

## Main methods

- `query_and_batch(...)`: export source data to S3 and create worker batches.
- `process_batch(...)`: run download → inference → upload for one worker.
- `publish_results(...)`: copy prediction outputs from S3 to Snowflake.

Alternatively:

- `run(...)`: convenience method that executes the full flow sequentially.

## Detailed example (Metaflow foreach)

This example shows the intended 3-step pattern in a Metaflow `FlowSpec`:

1. `query_and_batch()` in `start`
2. `process_batch()` in `foreach`
3. `publish_results()` in `join`

```python
from metaflow import FlowSpec, step
import pandas as pd

from ds_platform_utils.metaflow import BatchInferencePipeline


def predict_fn(df: pd.DataFrame) -> pd.DataFrame:
    # Example model logic
    out = pd.DataFrame()
    out["id"] = df["id"]
    out["score"] = (df["feature_1"].fillna(0) * 0.7 + df["feature_2"].fillna(0) * 0.3).round(6)
    out["label"] = (out["score"] >= 0.5).astype(int)
    return out


class BatchPredictFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.query_and_batch)

    @step
    def query_and_batch(self):
        self.pipeline = BatchInferencePipeline()

        # Query can be inline SQL or a file path.
        # {schema} is provided by ds_platform_utils (DEV/PROD selection).
        self.worker_ids = self.pipeline.query_and_batch(
            input_query="""
                SELECT
                    id,
                    feature_1,
                    feature_2
                FROM {schema}.model_features
                WHERE ds = '2026-02-26'
            """,
            parallel_workers=8,
            warehouse="ANALYTICS_WH",
            use_utc=True,
        )

        self.next(self.process_batch, foreach="worker_ids")

    @step
    def process_batch(self):
        # In a foreach step, self.input contains one worker_id.
        self.pipeline.process_batch(
            worker_id=self.input,
            predict_fn=predict_fn,
            batch_size_in_mb=256,
            timeout_per_batch=300,
        )
        self.next(self.publish_results)

    @step
    def publish_results(self, inputs):
        # Reuse one pipeline object from the foreach branches.
        self.pipeline = inputs[0].pipeline

        self.pipeline.publish_results(
            output_table_name="MODEL_PREDICTIONS_DAILY",
            output_table_definition=[
                ("id", "NUMBER"),
                ("score", "FLOAT"),
                ("label", "NUMBER"),
            ],
            auto_create_table=True,
            overwrite=True,
            warehouse="MED",
            use_utc=True,
        )
        self.next(self.end)

    @step
    def end(self):
        print("Batch inference complete")
```

## Detailed example (single-step convenience)

Use `run()` when you do not need Metaflow foreach parallelization:

```python
from metaflow import step
import pandas as pd

from ds_platform_utils.metaflow import BatchInferencePipeline


@step
def batch_inference_step(self):
    def predict_fn(df: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame(
            {
                "id": df["id"],
                "score": (df["feature_1"] * 0.9).fillna(0),
            }
        )

    pipeline = BatchInferencePipeline()
    pipeline.run(
        input_query="""
            SELECT id, feature_1
            FROM {schema}.model_features
            WHERE ds = '2026-02-26'
        """,
        output_table_name="MODEL_PREDICTIONS_DAILY",
        predict_fn=predict_fn,
        output_table_definition=[("id", "NUMBER"), ("score", "FLOAT")],
        warehouse="XL",
    )

    self.next(self.end)
```

## Parameters

### `query_and_batch(...)`

| Parameter | Type | Required | Description |
| ------------------ | ------------- | -------: | ----------------------------------------------------------------------------------------------------------------------- |
| `input_query` | `str \| Path` | Yes | SQL query string or SQL file path used to fetch source rows. `{schema}` placeholder is resolved by `ds_platform_utils`. |
| `ctx` | `dict` | No | Optional substitution map for templated SQL; merged with the internal `{"schema": ...}` mapping before query execution. |
| `warehouse` | `str` | No | Snowflake warehouse used to execute the source query/export. |
| `use_utc` | `bool` | No | If `True`, uses UTC timestamps/paths for partitioning and run metadata. |
| `parallel_workers` | `int` | No | Number of worker partitions to create for downstream processing. |

**Returns:** `list[int]` of `worker_id` values for Metaflow `foreach`.
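
How files are divided among workers is internal to the library, but the general idea can be sketched as a round-robin assignment of exported S3 files to 1-based `worker_id` values. This is a hypothetical helper for illustration, not the library's API:

```python
def split_files_across_workers(files: list[str], parallel_workers: int) -> dict[int, list[str]]:
    """Assign exported files to worker ids (1-based) round-robin.

    Hypothetical sketch of the partitioning idea; the real implementation
    inside BatchInferencePipeline may differ.
    """
    assignments: dict[int, list[str]] = {i: [] for i in range(1, parallel_workers + 1)}
    for idx, name in enumerate(files):
        worker_id = (idx % parallel_workers) + 1  # worker ids start at 1
        assignments[worker_id].append(name)
    return assignments


batches = split_files_across_workers([f"part_{i}.parquet" for i in range(5)], 2)
# worker 1 gets part_0, part_2, part_4; worker 2 gets part_1, part_3
```

Each `worker_id` returned by `query_and_batch()` then maps to one such file group in the `foreach` fan-out.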

---

### `process_batch(...)`

| Parameter | Type | Required | Description |
| ------------------- | ---------------------------------------- | -------: | -------------------------------------------------------------------------------------------------------- |
| `worker_id` | `int` | Yes | Worker partition identifier generated by `query_and_batch()`. |
| `predict_fn` | `Callable[[pd.DataFrame], pd.DataFrame]` | Yes | Inference function applied to each input chunk. Must return a DataFrame matching expected output schema. |
| `batch_size_in_mb` | `int` | No | Target chunk size for reading/processing batch files. |
| `timeout_per_batch` | `int` | No | Maximum time in seconds allowed for processing each batch (used for internal queueing). |

**Returns:** `None`

**Recommended**: Tune `batch_size_in_mb` for Outerbounds Small tasks (3 CPU, 15 GB memory), which are about 6x more cost-effective than Medium tasks.

### `publish_results(...)`

| Parameter | Type | Required | Description |
| ------------------------- | ------------------------------- | -------: | ----------------------------------------------------------------- |
| `output_table_name` | `str` | Yes | Destination Snowflake table for predictions. |
| `output_table_definition` | `list[tuple[str, str]] \| None` | No | Optional output schema as `(column_name, snowflake_type)` tuples. |
| `auto_create_table` | `bool` | No | If `True`, creates destination table when missing. |
| `overwrite` | `bool` | No | If `True`, replaces existing table data before loading results. |
| `warehouse` | `str` | No | Snowflake warehouse used for load/publish operations. |
| `use_utc` | `bool` | No | If `True`, uses UTC for load metadata/time handling. |

**Returns:** `None`

---

### `run(...)` (convenience method)

Runs `query_and_batch()` → `process_batch()` → `publish_results()` in a single sequential call.

| Parameter | Type | Required | Description |
| ------------------------- | ---------------------------------------- | -------: | ----------------------------------------------------------------------------------------------------------------------- |
| `input_query` | `str \| Path` | Yes | SQL query string or SQL file path used to fetch source rows. `{schema}` placeholder is resolved by `ds_platform_utils`. |
| `output_table_name` | `str` | Yes | Destination Snowflake table for predictions. |
| `predict_fn` | `Callable[[pd.DataFrame], pd.DataFrame]` | Yes | Inference function applied to each input chunk. Must return a DataFrame matching expected output schema. |
| `ctx` | `dict` | No | Optional substitution map for templated SQL; merged with the internal `{"schema": ...}` mapping before query execution. |
| `output_table_definition` | `list[tuple[str, str]] \| None` | No | Optional output schema as `(column_name, snowflake_type)` tuples. |
| `batch_size_in_mb` | `int` | No | Target chunk size for reading/processing batch files. |
| `timeout_per_batch` | `int` | No | Maximum time in seconds allowed for processing each batch (used for internal queueing). |
| `auto_create_table` | `bool` | No | If `True`, creates destination table when missing. |
| `overwrite` | `bool` | No | If `True`, replaces existing table data before loading results. |
| `warehouse` | `str` | No | Snowflake warehouse used for load/publish operations. |
| `use_utc` | `bool` | No | If `True`, uses UTC for load metadata/time handling. |

**Returns:** `None`

**Recommended**: Tune `batch_size_in_mb` for Outerbounds Small tasks (3 CPU, 15 GB memory), which are about 6x more cost-effective than Medium tasks.
37 changes: 37 additions & 0 deletions docs/metaflow/make_pydantic_parser_fn.md
@@ -0,0 +1,37 @@
# `make_pydantic_parser_fn`

Source: `ds_platform_utils.metaflow.validate_config.make_pydantic_parser_fn`

Creates a Metaflow `Config(..., parser=...)` parser backed by a Pydantic model.

## Signature

```python
make_pydantic_parser_fn(
    pydantic_model: type[BaseModel],
) -> Callable[[str], dict]
```

## What it does

- Parses config content as JSON, TOML, or YAML.
- Validates and normalizes with Pydantic.
- Returns a dict with applied defaults from the model.

## Parameters

| Parameter | Type | Required | Description |
| ---------------- | ----------------- | -------: | ------------------------------------------------------------------- |
| `pydantic_model` | `type[BaseModel]` | Yes | Pydantic model class used to validate and normalize config content. |

**Returns:** `Callable[[str], dict]` parser function for Metaflow `Config(..., parser=...)`.

## Typical usage

```python
config: MyConfig = Config(
    name="config",
    default="./configs/default.yaml",
    parser=make_pydantic_parser_fn(MyConfig),
)
```
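
The core pattern, parse raw text, validate it against a model, and return a plain dict with defaults applied, can be illustrated with a self-contained sketch. This uses a stdlib dataclass standing in for the Pydantic model and handles only JSON (the real function also supports TOML and YAML); `make_parser_fn` and `MyConfig` here are hypothetical names, not the library's:

```python
import json
from dataclasses import asdict, dataclass, fields
from typing import Callable


@dataclass
class MyConfig:
    name: str
    retries: int = 3  # default applied when the config file omits it


def make_parser_fn(model: type) -> Callable[[str], dict]:
    """Sketch: parse JSON text, reject unknown keys, return a plain dict."""
    allowed = {f.name for f in fields(model)}

    def parse(raw: str) -> dict:
        data = json.loads(raw)
        unknown = set(data) - allowed
        if unknown:
            raise ValueError(f"unknown config keys: {sorted(unknown)}")
        return asdict(model(**data))  # defaults filled in by the model

    return parse


parser = make_parser_fn(MyConfig)
cfg = parser('{"name": "demo"}')
# cfg == {"name": "demo", "retries": 3}
```

Metaflow calls the parser with the raw config file contents, so the returned dict is what `Config` exposes to the flow.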
49 changes: 49 additions & 0 deletions docs/metaflow/publish.md
@@ -0,0 +1,49 @@
# `publish`

Source: `ds_platform_utils.metaflow.write_audit_publish.publish`

Publishes data to a Snowflake table using the write-audit-publish (WAP) pattern.

## Signature

```python
publish(
    table_name: str,
    query: str | Path,
    audits: list[str | Path] | None = None,
    ctx: dict[str, Any] | None = None,
    warehouse: Literal["XS", "MED", "XL"] | None = None,
    use_utc: bool = True,
) -> None
```

## What it does

- Reads SQL from a string or `.sql` path.
- Runs write/audit/publish operations through Snowflake.
- Adds operation details and table links to the Metaflow card when available.

## Parameters

| Parameter | Type | Required | Description |
| ------------ | ------------------------------------ | -------: | -------------------------------------------------------------------------- |
| `table_name` | `str` | Yes | Destination Snowflake table name for the publish operation. |
| `query` | `str \| Path` | Yes | SQL query text or path to SQL file that produces the table data. |
| `audits` | `list[str \| Path] \| None` | No | Optional SQL audits (strings or file paths) executed as validation checks. |
| `ctx` | `dict[str, Any] \| None` | No | Optional template substitution context for SQL operations. |
| `warehouse` | `Literal["XS", "MED", "XL"] \| None` | No | Snowflake warehouse override for this operation. |
| `use_utc` | `bool` | No | If `True`, uses UTC timezone for Snowflake session. |

**Returns:** `None`

## Typical usage

```python
from ds_platform_utils.metaflow import publish

publish(
    table_name="MY_TABLE",
    query="SELECT * FROM PATTERN_DB.{{schema}}.SOURCE",
    audits=["SELECT COUNT(*) > 0 FROM PATTERN_DB.{{schema}}.{{table_name}}"],
)
```
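
Conceptually, write-audit-publish stages the candidate data, validates it, and only then exposes it under the real table name. A toy illustration with stdlib `sqlite3` standing in for Snowflake (the library's actual mechanics, sessions, warehouses, statement generation, differ):

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# WRITE: build the candidate data in a staging table.
conn.execute("CREATE TABLE my_table_staging (id INTEGER, score REAL)")
conn.executemany("INSERT INTO my_table_staging VALUES (?, ?)", [(1, 0.9), (2, 0.4)])

# AUDIT: run validation queries against the staging table before going live.
(row_count,) = conn.execute("SELECT COUNT(*) FROM my_table_staging").fetchone()
assert row_count > 0, "audit failed: staging table is empty"

# PUBLISH: expose the audited data under the real name.
conn.execute("ALTER TABLE my_table_staging RENAME TO my_table")

published = conn.execute("SELECT COUNT(*) FROM my_table").fetchone()[0]
# published == 2
```

If an audit fails, the staging table is never promoted, so consumers of the published table never see invalid data.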
53 changes: 53 additions & 0 deletions docs/metaflow/publish_pandas.md
@@ -0,0 +1,53 @@
# `publish_pandas`

Source: `ds_platform_utils.metaflow.pandas.publish_pandas`

Writes a pandas DataFrame to Snowflake.

## Signature

```python
publish_pandas(
    table_name: str,
    df: pd.DataFrame,
    add_created_date: bool = False,
    chunk_size: int | None = None,
    compression: Literal["snappy", "gzip"] = "snappy",
    warehouse: Literal["XS", "MED", "XL"] | None = None,
    parallel: int = 4,
    quote_identifiers: bool = False,
    auto_create_table: bool = False,
    overwrite: bool = False,
    use_logical_type: bool = True,
    use_utc: bool = True,
    use_s3_stage: bool = False,
    table_definition: list[tuple[str, str]] | None = None,
) -> None
```

## What it does

- Validates DataFrame input.
- Writes directly via `write_pandas` or via S3 stage flow for large data.
- Adds a Snowflake table URL to Metaflow card output.

## Parameters

| Parameter | Type | Required | Description |
| ------------------- | ------------------------------------ | -------: | -------------------------------------------------------------------------------------- |
| `table_name` | `str` | Yes | Destination Snowflake table name. |
| `df` | `pd.DataFrame` | Yes | DataFrame to publish. |
| `add_created_date` | `bool` | No | If `True`, adds a `created_date` UTC timestamp column before publish. |
| `chunk_size` | `int \| None` | No | Number of rows per uploaded chunk. If not provided, it is calculated from the DataFrame size. |
| `compression` | `Literal["snappy", "gzip"]` | No | Compression codec used for staged parquet files. |
| `warehouse` | `Literal["XS", "MED", "XL"] \| None` | No | Snowflake warehouse override for this operation. |
| `parallel` | `int` | No | Number of upload threads used by `write_pandas` path. |
| `quote_identifiers` | `bool` | No | If `False`, passes identifiers unquoted so Snowflake applies uppercase coercion. |
| `auto_create_table` | `bool` | No | If `True`, creates destination table when missing. |
| `overwrite` | `bool` | No | If `True`, replaces existing table contents. |
| `use_logical_type` | `bool` | No | Controls parquet logical type handling when loading data. |
| `use_utc` | `bool` | No | If `True`, uses UTC timezone for Snowflake session. |
| `use_s3_stage` | `bool` | No | If `True`, publishes via S3 stage flow; otherwise uses direct `write_pandas`. |
| `table_definition` | `list[tuple[str, str]] \| None` | No | Optional Snowflake table schema; used by S3 stage flow when table creation is needed. |

**Returns:** `None`
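
The automatic `chunk_size` selection might look roughly like this, a hypothetical sketch (the heuristic actually used by `ds_platform_utils` may differ): estimate bytes per row from the DataFrame's memory footprint, then pick a row count that lands near a target chunk size.

```python
import pandas as pd


def estimate_chunk_size(df: pd.DataFrame, target_chunk_mb: int = 100) -> int:
    """Sketch: pick rows-per-chunk so each uploaded chunk is ~target_chunk_mb."""
    bytes_per_row = max(1, int(df.memory_usage(deep=True).sum() / max(1, len(df))))
    rows = (target_chunk_mb * 1024 * 1024) // bytes_per_row
    # At least one row per chunk, at most the whole frame.
    return max(1, min(rows, len(df)))


df = pd.DataFrame({"id": range(1000), "score": [0.5] * 1000})
chunk = estimate_chunk_size(df, target_chunk_mb=1)
```

Passing an explicit `chunk_size` skips any such estimation, which is useful when row sizes vary widely across the frame.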