feat(spark): SparkSource query+path and pre-computed offline read for BatchFeatureView#6440
Conversation
a98e23b to
57b2489
Compare
57b2489 to
e30e146
Compare
…orical_features The function and its call were removed in this PR but the replacement (_apply_bfv_transformations_for_historical) lives in a separate PR (feast-dev#6440). Removing it here would silently return raw untransformed features for any BatchFeatureView with a Python UDF via the standard get_historical_features() API path (FeatureStore → passthrough_provider → SparkOfflineStore). Restoring the function and its call until feast-dev#6440 lands. Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…orical_features The function and its call were removed in this PR but the replacement (_apply_bfv_transformations_for_historical) lives in a separate PR (feast-dev#6440). Removing it here would silently return raw untransformed features for any BatchFeatureView with a Python UDF via the standard get_historical_features() API path (FeatureStore → passthrough_provider → SparkOfflineStore). Restoring the function and its call until feast-dev#6440 lands. Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
e30e146 to
4316349
Compare
|
@ntkathole May I request your review here too ? |
jyejare
left a comment
There was a problem hiding this comment.
This PR adds support for SparkSource with combined query+path configuration and pre-computed offline reads for BatchFeatureView. The changes enable reading from materialized offline stores to avoid expensive UDF re-execution. While the feature is useful, there are several security vulnerabilities and error handling gaps that need attention.
| file_format = fv.batch_source.file_format or "parquet" | ||
| try: | ||
| df = spark_session.read.format(file_format).load(fv.batch_source.path) | ||
| df.createOrReplaceTempView(tmp_view) | ||
| ctx = replace(ctx, table_subquery=tmp_view) | ||
| new_contexts.append(ctx) |
There was a problem hiding this comment.
[Critical] Path traversal vulnerability in file loading
The code directly loads files from fv.batch_source.path without any validation. This creates a path traversal security vulnerability where malicious paths could access unauthorized files on the system.
Suggested:
| file_format = fv.batch_source.file_format or "parquet" | |
| try: | |
| df = spark_session.read.format(file_format).load(fv.batch_source.path) | |
| df.createOrReplaceTempView(tmp_view) | |
| ctx = replace(ctx, table_subquery=tmp_view) | |
| new_contexts.append(ctx) | |
| + # Validate and sanitize the path | |
| + import os | |
| + normalized_path = os.path.normpath(fv.batch_source.path) | |
| + if '..' in normalized_path or normalized_path.startswith('/'): | |
| + warnings.warn(f"Invalid path '{fv.batch_source.path}' for '{ctx.name}'", RuntimeWarning) | |
| + new_contexts.append(ctx) | |
| + continue | |
| + try: | |
| + df = spark_session.read.format(file_format).load(normalized_path) | |
| + df.createOrReplaceTempView(tmp_view) | |
| + ctx = replace(ctx, table_subquery=tmp_view) | |
| + new_contexts.append(ctx) | |
| + continue |
There was a problem hiding this comment.
Thanks for flagging this. fv.batch_source.path is a trusted configuration value set by the feature store admin at definition time — it's not runtime user input flowing from an API boundary, so path traversal in the traditional web security sense doesn't apply here.
There was a problem hiding this comment.
Also, the suggested fix using os.path.normpath() would silently corrupt S3/GCS URIs (s3://bucket/path - s3:/bucket/path) since normpath collapses double slashes. Happy to add a lightweight guard for local paths only if you'd like, but I'd keep it separate from object storage paths.
| continue | ||
| except Exception: | ||
| warnings.warn( | ||
| f"Offline path '{fv.batch_source.path}' not readable for " | ||
| f"'{ctx.name}'; falling back to source query.", | ||
| RuntimeWarning, |
There was a problem hiding this comment.
[Warning] Overly broad exception handling masks specific errors
Catching all exceptions with 'except Exception:' is too broad and could mask important errors like permission issues, file corruption, or configuration problems. This makes debugging difficult and could hide security issues.
Suggested:
| continue | |
| except Exception: | |
| warnings.warn( | |
| f"Offline path '{fv.batch_source.path}' not readable for " | |
| f"'{ctx.name}'; falling back to source query.", | |
| RuntimeWarning, | |
| + except (FileNotFoundError, PermissionError) as e: | |
| + warnings.warn( | |
| + f"Offline path '{fv.batch_source.path}' not accessible for " | |
| + f"'{ctx.name}': {str(e)}; falling back to source query.", | |
| + RuntimeWarning, | |
| + stacklevel=2, | |
| + ) | |
| + except Exception as e: | |
| + # Log unexpected errors but continue with fallback | |
| + import logging | |
| + logging.warning(f"Unexpected error loading '{fv.batch_source.path}': {e}") | |
| + warnings.warn( | |
| + f"Failed to load offline path for '{ctx.name}'; falling back to source query.", | |
| + RuntimeWarning, | |
| + stacklevel=2, | |
| + ) |
There was a problem hiding this comment.
Good catch — updated to catch FileNotFoundError and PermissionError explicitly for the expected fallback cases, with a separate except Exception block emitting a distinct warning so unexpected errors aren't silently swallowed.
| if udf is not None: | ||
| temp_view_name = f"__feast_bfv_{ctx.name}_{uuid.uuid4().hex[:8]}" | ||
| spark_session.conf.set("spark.sql.runSQLOnFiles", "true") | ||
| raw_df = spark_session.sql(f"SELECT * FROM {ctx.table_subquery}") |
There was a problem hiding this comment.
[Warning] UDF execution without sandboxing poses security risks
Executing user-defined functions without proper sandboxing or validation could allow arbitrary code execution. This is a significant security risk in production environments.
Suggested:
| if udf is not None: | |
| temp_view_name = f"__feast_bfv_{ctx.name}_{uuid.uuid4().hex[:8]}" | |
| spark_session.conf.set("spark.sql.runSQLOnFiles", "true") | |
| raw_df = spark_session.sql(f"SELECT * FROM {ctx.table_subquery}") | |
| + # Add UDF validation and execution controls | |
| + if not hasattr(udf, '__call__'): | |
| + raise ValueError(f"Invalid UDF for {ctx.name}") | |
| + raw_df = spark_session.sql(f"SELECT * FROM {ctx.table_subquery}") | |
| + # Consider adding timeout and resource limits here | |
| + transformed_df = udf(raw_df) | |
| + transformed_df.createOrReplaceTempView(temp_view_name) |
There was a problem hiding this comment.
The UDF here is feature_transformation.udf registered in the Feast registry by the ML engineer at feature view definition time — same function that runs during materialize(). It's not arbitrary user input. The hasattr(udf, '__call__') check doesn't add a meaningful boundary since any Python object can implement __call__. Executor sandboxing is a cluster-level concern (Spark resource limits, executor isolation) — outside Feast's scope.
4316349 to
5312075
Compare
SparkSource previously required exactly one of table/query/path. This relaxes the constraint to allow query + path together: - query: used for reading raw data during materialization - path: used for offline write-back (offline=True) and as pre-computed read source in get_historical_features Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
… get_historical_features Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
2f6910e to
9eb3d29
Compare
|
|
||
| query_context = _apply_bfv_transformations( | ||
| spark_session, feature_views, query_context | ||
| query_context = _apply_bfv_transformations_for_historical( |
There was a problem hiding this comment.
so previous _apply_bfv_transformations helper not removed from code?
| ) | ||
|
|
||
| if ( | ||
| hasattr(fv, "feature_transformation") |
There was a problem hiding this comment.
Use has_transformation() and get_transformation_function() instead
| max_date_partition: str | ||
|
|
||
|
|
||
| def _apply_bfv_transformations_for_historical( |
There was a problem hiding this comment.
I think instead of writing new helper, better approach is to extend the existing _apply_bfv_transformations with a pre-computed-path shortcut (adding the "read from parquet if offline=True and path exists" branch at the top), rather than creating a parallel function that reimplements UDF execution.
What this PR does / why we need it
get_historical_features()on aBatchFeatureViewre-runs the full UDF on raw data every call. For embedding pipelines, that's 20–40 min of compute per training run even though features already exist from the lastmaterialize.Fix: Route
get_historical_features()to read pre-computed parquet frombatch_source.pathinstead of re-executing the UDF.To support this,
SparkSourcenow acceptsquery + pathtogether:query— raw data read duringmaterialize()path— write-back target and pre-computed read source forget_historical_features()Also allows
BatchFeatureViewwithonline=False, offline=True(offline-only) to skip the online validation check inget_historical_features(), so it can be used purely for training data without configuring an online store.Falls back to live query if
pathdoesn't exist yet (first run before any materialization).Which issue(s) this PR fixes
N/A. Enables efficient training data retrieval for
BatchFeatureViewembedding pipelines without re-running UDFs.Checks
git commit -s)Testing Strategy
get_historical_features()reads from parquet, not UDF, after materialization