diff --git a/docs/user_guides/fs/feature_group/on_demand_transformations.md b/docs/user_guides/fs/feature_group/on_demand_transformations.md index 9eadc8c45c..9e883e86af 100644 --- a/docs/user_guides/fs/feature_group/on_demand_transformations.md +++ b/docs/user_guides/fs/feature_group/on_demand_transformations.md @@ -270,3 +270,39 @@ On-demand transformation functions can also be accessed and executed as normal f "on_demand_feature1" ](feature_vector["transaction_time"], datetime.now()) ``` + +## Chaining On-Demand Transformations + +On-demand transformations attached to the same feature group can be chained — one ODT's output column can serve as another ODT's input. +The execution order is resolved automatically; the DAG is visible from the feature group overview page in the Hopsworks UI. + +An ODT's output column becomes a regular feature in the feature group, which a downstream feature view can consume and pass into a model-dependent transformation. +This is the implicit cross-DAG path between ODT and MDT chains: nothing extra to configure on either side. + +!!! example "ODT that consumes an upstream ODT's output" + === "Python" + + ```python + from hopsworks import udf + + + @udf(int) + def add_one(col): + return col + 1 + + + @udf(int) + def double(col): + return col * 2 + + + fg = fs.create_feature_group( + name="chained_odt_fg", + version=1, + primary_key=["id"], + transformation_functions=[ + add_one("raw").alias("raw_plus_one"), + double("raw_plus_one").alias("raw_plus_one_doubled"), + ], + ) + ``` diff --git a/docs/user_guides/fs/feature_view/model-dependent-transformations.md b/docs/user_guides/fs/feature_view/model-dependent-transformations.md index bed6b1137a..1e0211507c 100644 --- a/docs/user_guides/fs/feature_view/model-dependent-transformations.md +++ b/docs/user_guides/fs/feature_view/model-dependent-transformations.md @@ -175,3 +175,39 @@ To achieve this, set the `transform` parameter to False. # Fetching untransformed batch data. untransformed_batch_data = feature_view.get_batch_data(transform=False) ``` + +## Chaining Model-Dependent Transformations + +A model-dependent transformation can consume another MDT's output as its input. +The DAG is resolved automatically at execution time, so producers always run before consumers. + +!!! example "Chaining two normalizers and a sum" + === "Python" + + ```python + from hopsworks import udf + + + @udf(int) + def add_one(col): + return col + 1 + + + @udf(int) + def add(a, b): + return a + b + + + fv = fs.create_feature_view( + name="chained_mdt_fv", + query=fg.select_all(), + transformation_functions=[ + add_one("data1").alias("data1_plus_one"), + add_one("data2").alias("data2_plus_one"), + add("data1_plus_one", "data2_plus_one").alias("sum_plus_two"), + ], + version=1, + ) + ``` + +See [Transformation Functions Performance Tuning][transformation-functions-performance-tuning] for `n_processes` semantics on chained DAGs. diff --git a/docs/user_guides/fs/transformation_functions.md b/docs/user_guides/fs/transformation_functions.md index 4e2487f3dd..25332f48d4 100644 --- a/docs/user_guides/fs/transformation_functions.md +++ b/docs/user_guides/fs/transformation_functions.md @@ -345,3 +345,46 @@ If only the `name` is provided, then the version will default to 1. ## Using transformation functions Transformation functions can be used by attaching it to a feature view to [create model-dependent transformations](./feature_view/model-dependent-transformations.md) or attached to feature groups to [create on-demand transformations](./feature_group/on_demand_transformations.md) + +## Chained Transformation Functions + +Transformation functions can be chained — the output column of one transformation function can serve as the input to another. +Hopsworks resolves the execution order automatically using a topological sort of the resulting DAG, so dependencies always run before their consumers. +Chaining works for both on-demand transformations attached to a feature group and model-dependent transformations attached to a feature view. + +!!! example "Chained MDTs on a feature view" + === "Python" + + ```python + from hopsworks import udf + + + @udf(int) + def add_one(col): + return col + 1 + + + @udf(int) + def add(a, b): + return a + b + + + fv = fs.create_feature_view( + name="chained_mdts_fv", + query=fg.select_all(), + transformation_functions=[ + add_one("data1").alias("data1_plus_one"), + add_one("data2").alias("data2_plus_one"), + add("data1_plus_one", "data2_plus_one").alias("sum_plus_two"), + ], + version=1, + ) + ``` + +The DAG is visible from the Hopsworks UI on both the feature view and feature group overview pages under "Transformation execution DAG." +The same DAG drives offline training data generation and online feature vector retrieval, so chains apply uniformly across both inference paths. + +Cross-DAG chaining is implicit: an on-demand transformation's output column becomes a feature in its feature group, which a feature view can consume and feed into a model-dependent transformation. +No additional setup is required. + +For tuning parallelism, see [Transformation Functions Performance Tuning][transformation-functions-performance-tuning]. diff --git a/docs/user_guides/fs/transformation_functions_performance.md b/docs/user_guides/fs/transformation_functions_performance.md new file mode 100644 index 0000000000..3f2c4d5e27 --- /dev/null +++ b/docs/user_guides/fs/transformation_functions_performance.md @@ -0,0 +1,55 @@ +# Transformation Functions Performance Tuning + +This page covers how to tune transformation function execution for offline and online workloads, when to set `n_processes`, and the latency trade-offs of the different paths. + +## When parallelism helps + +Transformation function execution is sequential by default. +A worker pool is spawned only when the workload justifies the overhead — small DAGs and small inputs run faster sequentially because pool spawn and shared-memory setup cost more than the work itself. + +Hopsworks applies these defaults when `n_processes` is not provided: + +- DataFrame input with fewer than 10000 rows or fewer than two TFs in the chain: sequential. +- DataFrame input large enough or chain wide enough: parallel, capped at the DAG's maximum width. +- `dict` or `list[dict]` input (online vector and batch paths): sequential by default. + Per-row UDF work is usually cheaper than process-pool overhead, so the engine keeps the safe default and lets the caller opt in. +- Spark DataFrames: `n_processes` is ignored; the DAG is pushed down to Spark. + +Callers can always force a specific value by passing `n_processes` explicitly. + +## Where `n_processes` takes effect + +`n_processes` is honored on every Python-engine entry point — DataFrames, single dicts, and lists of dicts — but the parallelism axis differs by input shape: + +- **DataFrame input.** + Independent TFs in the DAG run concurrently in the process pool; dependent TFs are submitted as soon as their predecessors complete. + This is the offline path used by `training_data`, `get_batch_data`, and `execute_mdts(dataframe)`. +- **Single dict (`get_feature_vector(entry)`).** + When the chain has independent branches (`max_parallelism >= 2`), the engine submits those branches concurrently for the single row. + A strictly linear chain has nothing to parallelize and the call falls through to the sequential path. +- **List of dicts (`get_feature_vectors(entries)`).** + Rows are chunked across workers; each worker runs the chain sequentially on its slice. + This is the most useful parallelism axis for batched online calls with CPU-heavy chained UDFs. + +## Latency reference + +The chained-TF online latency benchmark in the `loadtest` repository records p50, p95, and p99 across UDF styles and call shapes. +Absolute latency depends strongly on host CPU and UDF cost, so this guide does not publish fixed numbers — running the benchmark against your own deployment gives a meaningful baseline. + +Run the benchmark with: + +```bash +pytest -m e2e_performance \ + tests/performance/feature_store/python_driver/test_online_batch_chaining_benchmark.py +``` + +The benchmark sweeps four UDF styles (vectorized Pandas, fake Pandas with an internal loop, scalar Python, and CPU-heavy Python), two batch sizes (1 and 100), and two `n_processes` values (1 and 2). +It writes `online_batch_chaining_benchmark.csv` with p50, p95, and p99 for each cell. +Consult that CSV when tuning `n_processes` for your deployment. + +Two patterns hold across deployments and are worth knowing up front: + +- For vectorized Pandas UDFs at small batch sizes, sequential is at least as fast as parallel. + The framework's auto-vectorization wins; the pool overhead loses. +- For CPU-heavy Python UDFs in a chain with parallelism (diamond DAGs, multi-output, batch sizes above ~10), `n_processes >= 2` typically delivers a meaningful p99 reduction. + For a chained single-vector call with two or more independent branches, setting `n_processes=2` overlaps those branches and reduces tail latency on CPU-heavy UDFs; if the branches dominate the chain runtime, the speedup approaches their concurrency. diff --git a/docs/user_guides/migration/40_migration.md b/docs/user_guides/migration/40_migration.md index 58268d31f6..193bebb8a8 100644 --- a/docs/user_guides/migration/40_migration.md +++ b/docs/user_guides/migration/40_migration.md @@ -110,3 +110,18 @@ The following is how transformation functions were used in previous versions of ``` Note that the number of lines of code required has been significantly reduced using the “@hopsworks.udf” python decorator. + +## FSTORE-1938 — Chained Transformation Functions + +This release adds support for chained transformation functions: a transformation function's output column can serve as the input to another transformation function in the same feature group or feature view. +The DAG is resolved automatically by topological sort at execution time. + +The following behaviors changed and may affect existing pipelines: + +- Stricter validation of transformation function input types on feature view and feature group create or update. + Pre-FSTORE-1938 versions stored an empty string when an input feature's type could not be resolved. + This release validates types strictly on create and update and raises `TRANSFORMATION_FUNCTION_INPUT_TYPE_UNRESOLVABLE` if a typoed or missing feature reference produces an empty type. + Read paths continue to tolerate empty types so existing detail pages do not break on upgrade. + +No minimum backend version is required for non-chained usage — existing feature views without chains continue to work unchanged after upgrade. +Creating a chained feature view requires both backend and SDK on this release. diff --git a/mkdocs.yml b/mkdocs.yml index f8dc7bf80f..408a57de7e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -113,6 +113,7 @@ nav: - Feature Logging: user_guides/fs/feature_view/feature_logging.md - Vector Similarity Search: user_guides/fs/vector_similarity_search.md - Transformation Functions: user_guides/fs/transformation_functions.md + - Transformation Functions Performance Tuning: user_guides/fs/transformation_functions_performance.md - Compute Engines: user_guides/fs/compute_engines.md - Client Integrations: - user_guides/integrations/index.md