Skip to content

feat(spark): SparkSource query+path and pre-computed offline read for BatchFeatureView#6440

Open
abhijeet-dhumal wants to merge 5 commits into
feast-dev:masterfrom
abhijeet-dhumal:feat/spark-bfv-offline-historical-features
Open

feat(spark): SparkSource query+path and pre-computed offline read for BatchFeatureView#6440
abhijeet-dhumal wants to merge 5 commits into
feast-dev:masterfrom
abhijeet-dhumal:feat/spark-bfv-offline-historical-features

Conversation

@abhijeet-dhumal
Copy link
Copy Markdown
Contributor

@abhijeet-dhumal abhijeet-dhumal commented May 27, 2026

What this PR does / why we need it

get_historical_features() on a BatchFeatureView re-runs the full UDF on raw data every call. For embedding pipelines, that's 20–40 min of compute per training run even though features already exist from the last materialize.

Fix: Route get_historical_features() to read pre-computed parquet from batch_source.path instead of re-executing the UDF.

To support this, SparkSource now accepts query + path together:

  • query — raw data read during materialize()
  • path — write-back target and pre-computed read source for get_historical_features()
SparkSource(
    query="SELECT id, text, event_timestamp FROM bronze.documents",
    path="s3://my-bucket/feast/features/document_embeddings/",
)

Also allows BatchFeatureView with online=False, offline=True (offline-only) to skip the online validation check in get_historical_features(), so it can be used purely for training data without configuring an online store.

Falls back to live query if path doesn't exist yet (first run before any materialization).

Which issue(s) this PR fixes

N/A. Enables efficient training data retrieval for BatchFeatureView embedding pipelines without re-running UDFs.

Checks

  • I've made sure the tests are passing.
  • My commits are signed off (git commit -s)
  • My PR title follows conventional commits format

Testing Strategy

  • Unit tests — offline path routing, SparkSource constraint, graceful fallback
  • Manual tests — get_historical_features() reads from parquet, not UDF, after materialization

@abhijeet-dhumal abhijeet-dhumal changed the title Feat/spark bfv offline historical features feat(spark): SparkSource query+path and pre-computed offline read for BatchFeatureView May 27, 2026
@abhijeet-dhumal abhijeet-dhumal force-pushed the feat/spark-bfv-offline-historical-features branch from a98e23b to 57b2489 Compare May 27, 2026 14:50
@abhijeet-dhumal abhijeet-dhumal marked this pull request as ready for review May 28, 2026 06:23
@abhijeet-dhumal abhijeet-dhumal requested a review from a team as a code owner May 28, 2026 06:23
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 5 additional findings in Devin Review.

Open in Devin Review

Comment thread sdk/python/feast/feature_store.py
@abhijeet-dhumal abhijeet-dhumal force-pushed the feat/spark-bfv-offline-historical-features branch from 57b2489 to e30e146 Compare May 29, 2026 08:57
abhijeet-dhumal added a commit to abhijeet-dhumal/feast that referenced this pull request May 29, 2026
…orical_features

The function and its call were removed in this PR but the replacement
(_apply_bfv_transformations_for_historical) lives in a separate PR (feast-dev#6440).
Removing it here would silently return raw untransformed features for any
BatchFeatureView with a Python UDF via the standard get_historical_features()
API path (FeatureStore → passthrough_provider → SparkOfflineStore).

Restoring the function and its call until feast-dev#6440 lands.

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
abhijeet-dhumal added a commit to abhijeet-dhumal/feast that referenced this pull request Jun 1, 2026
…orical_features

The function and its call were removed in this PR but the replacement
(_apply_bfv_transformations_for_historical) lives in a separate PR (feast-dev#6440).
Removing it here would silently return raw untransformed features for any
BatchFeatureView with a Python UDF via the standard get_historical_features()
API path (FeatureStore → passthrough_provider → SparkOfflineStore).

Restoring the function and its call until feast-dev#6440 lands.

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@abhijeet-dhumal abhijeet-dhumal force-pushed the feat/spark-bfv-offline-historical-features branch from e30e146 to 4316349 Compare June 1, 2026 07:48
@abhijeet-dhumal
Copy link
Copy Markdown
Contributor Author

@ntkathole May I request your review here too ?

Copy link
Copy Markdown
Collaborator

@jyejare jyejare left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR adds support for SparkSource with combined query+path configuration and pre-computed offline reads for BatchFeatureView. The changes enable reading from materialized offline stores to avoid expensive UDF re-execution. While the feature is useful, there are several security vulnerabilities and error handling gaps that need attention.

Comment on lines +114 to +119
file_format = fv.batch_source.file_format or "parquet"
try:
df = spark_session.read.format(file_format).load(fv.batch_source.path)
df.createOrReplaceTempView(tmp_view)
ctx = replace(ctx, table_subquery=tmp_view)
new_contexts.append(ctx)
Copy link
Copy Markdown
Collaborator

@jyejare jyejare Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] Path traversal vulnerability in file loading

The code directly loads files from fv.batch_source.path without any validation. This creates a path traversal security vulnerability where malicious paths could access unauthorized files on the system.

Suggested:

Suggested change
file_format = fv.batch_source.file_format or "parquet"
try:
df = spark_session.read.format(file_format).load(fv.batch_source.path)
df.createOrReplaceTempView(tmp_view)
ctx = replace(ctx, table_subquery=tmp_view)
new_contexts.append(ctx)
+ # Validate and sanitize the path
+ import os
+ normalized_path = os.path.normpath(fv.batch_source.path)
+ if '..' in normalized_path or normalized_path.startswith('/'):
+ warnings.warn(f"Invalid path '{fv.batch_source.path}' for '{ctx.name}'", RuntimeWarning)
+ new_contexts.append(ctx)
+ continue
+ try:
+ df = spark_session.read.format(file_format).load(normalized_path)
+ df.createOrReplaceTempView(tmp_view)
+ ctx = replace(ctx, table_subquery=tmp_view)
+ new_contexts.append(ctx)
+ continue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for flagging this. fv.batch_source.path is a trusted configuration value set by the feature store admin at definition time — it's not runtime user input flowing from an API boundary, so path traversal in the traditional web security sense doesn't apply here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the suggested fix using os.path.normpath() would silently corrupt S3/GCS URIs (s3://bucket/path - s3:/bucket/path) since normpath collapses double slashes. Happy to add a lightweight guard for local paths only if you'd like, but I'd keep it separate from object storage paths.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make Sense.

Comment on lines +120 to +125
continue
except Exception:
warnings.warn(
f"Offline path '{fv.batch_source.path}' not readable for "
f"'{ctx.name}'; falling back to source query.",
RuntimeWarning,
Copy link
Copy Markdown
Collaborator

@jyejare jyejare Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Warning] Overly broad exception handling masks specific errors

Catching all exceptions with 'except Exception:' is too broad and could mask important errors like permission issues, file corruption, or configuration problems. This makes debugging difficult and could hide security issues.
Suggested:

Suggested change
continue
except Exception:
warnings.warn(
f"Offline path '{fv.batch_source.path}' not readable for "
f"'{ctx.name}'; falling back to source query.",
RuntimeWarning,
+ except (FileNotFoundError, PermissionError) as e:
+ warnings.warn(
+ f"Offline path '{fv.batch_source.path}' not accessible for "
+ f"'{ctx.name}': {str(e)}; falling back to source query.",
+ RuntimeWarning,
+ stacklevel=2,
+ )
+ except Exception as e:
+ # Log unexpected errors but continue with fallback
+ import logging
+ logging.warning(f"Unexpected error loading '{fv.batch_source.path}': {e}")
+ warnings.warn(
+ f"Failed to load offline path for '{ctx.name}'; falling back to source query.",
+ RuntimeWarning,
+ stacklevel=2,
+ )

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — updated to catch FileNotFoundError and PermissionError explicitly for the expected fallback cases, with a separate except Exception block emitting a distinct warning so unexpected errors aren't silently swallowed.

Comment on lines +143 to +146
if udf is not None:
temp_view_name = f"__feast_bfv_{ctx.name}_{uuid.uuid4().hex[:8]}"
spark_session.conf.set("spark.sql.runSQLOnFiles", "true")
raw_df = spark_session.sql(f"SELECT * FROM {ctx.table_subquery}")
Copy link
Copy Markdown
Collaborator

@jyejare jyejare Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Warning] UDF execution without sandboxing poses security risks

Executing user-defined functions without proper sandboxing or validation could allow arbitrary code execution. This is a significant security risk in production environments.

Suggested:

Suggested change
if udf is not None:
temp_view_name = f"__feast_bfv_{ctx.name}_{uuid.uuid4().hex[:8]}"
spark_session.conf.set("spark.sql.runSQLOnFiles", "true")
raw_df = spark_session.sql(f"SELECT * FROM {ctx.table_subquery}")
+ # Add UDF validation and execution controls
+ if not hasattr(udf, '__call__'):
+ raise ValueError(f"Invalid UDF for {ctx.name}")
+ raw_df = spark_session.sql(f"SELECT * FROM {ctx.table_subquery}")
+ # Consider adding timeout and resource limits here
+ transformed_df = udf(raw_df)
+ transformed_df.createOrReplaceTempView(temp_view_name)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UDF here is feature_transformation.udf registered in the Feast registry by the ML engineer at feature view definition time — same function that runs during materialize(). It's not arbitrary user input. The hasattr(udf, '__call__') check doesn't add a meaningful boundary since any Python object can implement __call__. Executor sandboxing is a cluster-level concern (Spark resource limits, executor isolation) — outside Feast's scope.

@abhijeet-dhumal abhijeet-dhumal force-pushed the feat/spark-bfv-offline-historical-features branch from 4316349 to 5312075 Compare June 2, 2026 12:45
@abhijeet-dhumal abhijeet-dhumal requested a review from jyejare June 2, 2026 13:38
SparkSource previously required exactly one of table/query/path.
This relaxes the constraint to allow query + path together:
- query: used for reading raw data during materialization
- path: used for offline write-back (offline=True) and as
  pre-computed read source in get_historical_features

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
… get_historical_features

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@ntkathole ntkathole force-pushed the feat/spark-bfv-offline-historical-features branch from 2f6910e to 9eb3d29 Compare June 3, 2026 08:03

query_context = _apply_bfv_transformations(
spark_session, feature_views, query_context
query_context = _apply_bfv_transformations_for_historical(
Copy link
Copy Markdown
Member

@ntkathole ntkathole Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so previous _apply_bfv_transformations helper not removed from code?

)

if (
hasattr(fv, "feature_transformation")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use has_transformation() and get_transformation_function() instead

max_date_partition: str


def _apply_bfv_transformations_for_historical(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of writing new helper, better approach is to extend the existing _apply_bfv_transformations with a pre-computed-path shortcut (adding the "read from parquet if offline=True and path exists" branch at the top), rather than creating a parallel function that reimplements UDF execution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants