diff --git a/docs/user_guides/fs/feature_view/batch-data.md b/docs/user_guides/fs/feature_view/batch-data.md index b3c21b4dc1..609696ce2b 100644 --- a/docs/user_guides/fs/feature_view/batch-data.md +++ b/docs/user_guides/fs/feature_view/batch-data.md @@ -103,6 +103,166 @@ df = feature_view.get_batch_data( If a bare name is ambiguous and no prefix is supplied, `get_feature` raises a `FeatureStoreException` listing the matching Feature Groups. +## Lookback window for PIT joins {#batch-data-lookback} + +Point-in-time (PIT) joins use the condition `feature_fg.event_time <= root_fg.event_time` to pick the latest matching record from each joined Feature Group. +That predicate is a range comparison, not an equality, so partition pruning is defeated and every historical partition of every joined Feature Group is scanned on every read. +As Feature Groups grow with daily ingestion, this scan grows unboundedly. + +The `lookback` argument lets you cap how far back the join is allowed to consider rows from each joined Feature Group. +The backend turns the window into a constant-bound predicate on the joined Feature Group so the ArrowFlight Server with DuckDB and Spark Catalyst pushdown can prune partitions before opening any files. + +### Uniform lookback + +Apply the same window to every joined Feature Group with a `Lookback` instance from `hsfs.constructor.lookback`, or the equivalent dict. +Both forms accept `date` and `datetime` values. + +```python +import datetime +from hsfs.constructor.lookback import Lookback + +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + lookback=Lookback( + key="partition_key", + start=datetime.date(2026, 5, 10), + end=datetime.date(2026, 5, 17), + ), +) +``` + +Equivalent dict form, no `Lookback` import required (you still need `datetime` for the bound values): + +```python +import datetime + +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + lookback={ + "key": "partition_key", + "start": datetime.date(2026, 5, 10), + "end": datetime.date(2026, 5, 17), + }, +) +``` + +`key` selects which column the predicate is emitted against. +`"partition_key"` targets the Feature Group's partition column so the engine can prune partitions before reading files; the Feature Group must have a single DATE partition column. +`"event_time"` targets the Feature Group's `event_time` column and guarantees row-level correctness but offers only engine-dependent file pruning (Hudi or Delta column-stats indexing). + +`start` is required and emits a `>=` predicate. +`end` is optional and emits a `<=` predicate when present. +When `end` is omitted, only the lower bound is emitted, making the short form below valid: the root Feature Group and every joined Feature Group get ` >= '2026-05-10'` (where `` is each Feature Group's own DATE partition column) and nothing else. + +```python +import datetime + +df = feature_view.get_batch_data( + lookback={ + "key": "partition_key", + "start": datetime.date(2026, 5, 10), + }, +) +``` + +### Per-feature-group lookback + +When different Feature Groups need different windows, use `Lookbacks` to bind a `Lookback` to specific joined Feature Groups. +An optional `default` applies to every Feature Group not listed in `feature_groups`. + +```python +import datetime +from hsfs.constructor.lookback import Lookback, Lookbacks + +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 11), + end_time=datetime.date(2026, 5, 17), + lookback=Lookbacks( + default=Lookback( + key="partition_key", + start=datetime.date(2026, 5, 5), + end=datetime.date(2026, 5, 17), + ), + feature_groups={ + "transactions": Lookback( + key="event_time", + start=datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc), + ), + }, + ), +) +``` + +Skip the `default` to apply lookbacks only to the listed Feature Groups; unlisted Feature Groups receive no lookback for that call. + +```python +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 11), + end_time=datetime.date(2026, 5, 17), + lookback=Lookbacks( + feature_groups={ + "transactions": Lookback( + key="partition_key", start=datetime.date(2026, 5, 5) + ), + } + ), +) +``` + +`feature_groups` keys identify a Feature Group in one of two ways: by name (a bare string matches every version of the named Feature Group at any join site in the Feature View) or by passing the Feature Group instance itself (matches the exact `(name, version)` so a specific version can be targeted when multiple versions of the same Feature Group are joined). +When both forms are supplied for the same name, the instance entry wins at its specific join site and the bare-string entry still applies elsewhere. + +Equivalent dict form: + +```python +import datetime + +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 11), + end_time=datetime.date(2026, 5, 17), + lookback={ + "default": { + "key": "partition_key", + "start": datetime.date(2026, 5, 5), + "end": datetime.date(2026, 5, 17), + }, + "feature_groups": { + "transactions": { + "key": "event_time", + "start": datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc), + }, + }, + }, +) +``` + +### Combining `lookback` with other filters + +The `lookback` predicate combines with filters declared on the Query, but where the filter is attached changes whether the engine can prune partitions on the root Feature Group. + +Filters attached to a sub-query (`fg.select(...).filter(...)`) always prune on that Feature Group regardless of which Feature Group they reference. +Filters attached to the outer query (`query.filter(...)` after the join, or `extra_filter` on `get_batch_data`) prune the root only when every referenced feature belongs to the root Feature Group. +A mixed-Feature-Group outer filter still produces correct results — the predicates apply at the outer level — but the root's partitions are no longer pruned at file-listing time. + +```python +# Root sub-query filter — lookback prunes both root and joined Feature Groups. +query = root.select_all().filter(root.amount > 100).join(dim.select_all()) + +# Joined sub-query filter — lookback still prunes both sides. +query = root.select_all().join(dim.select_all().filter(dim.category == "X")) + +# Outer filter referencing a joined Feature Group — root pruning is lost; +# joined Feature Groups still prune via their own predicates. +query = root.select_all().join(dim.select_all()).filter(dim.category == "X") +``` + +For best pruning, keep call-site filters at the sub-query level when their predicate references only one Feature Group. + +The same `lookback` argument is supported on `create_training_data` (see [the training-data section][training-data-lookback]). +Both `extra_filter` and `lookback` can be combined. + ## Creation with transformation If you have specified transformation functions when creating a feature view, you will get back transformed batch data as well. diff --git a/docs/user_guides/fs/feature_view/training-data.md b/docs/user_guides/fs/feature_view/training-data.md index f9319b9e76..a3b6543ea0 100644 --- a/docs/user_guides/fs/feature_view/training-data.md +++ b/docs/user_guides/fs/feature_view/training-data.md @@ -55,6 +55,50 @@ df_restaurant_travel = feature_view.training_data( ) ``` +### Lookback window for PIT joins {#training-data-lookback} + +When training data is materialised from a Feature View that joins multiple Feature Groups, the PIT join scans every historical partition of the root and every joined Feature Group. +The `lookback` argument caps how far back the join is allowed to consider rows from the root and each joined Feature Group, so the engine can prune partitions before reading any files. +Apply the same window uniformly with `Lookback`, or use `Lookbacks` for per-Feature-Group control; the argument shape mirrors the one accepted by `get_batch_data` (see [the batch-data lookback section][batch-data-lookback]). + +```python +import datetime +from hsfs.constructor.lookback import Lookback + +version, job = feature_view.create_training_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + description="fraud batch training data, weekly partition pruning", + lookback=Lookback( + key="partition_key", + start=datetime.date(2026, 5, 10), + end=datetime.date(2026, 5, 17), + ), +) +``` + +Equivalent dict form (no `Lookback` import, but `datetime` is still required for the bound values): + +```python +import datetime + +version, job = feature_view.create_training_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + description="fraud batch training data, weekly partition pruning", + lookback={ + "key": "partition_key", + "start": datetime.date(2026, 5, 10), + "end": datetime.date(2026, 5, 17), + }, +) +``` + +For different lookbacks per joined Feature Group, pass a `Lookbacks` — see the [per-feature-group lookback section][batch-data-lookback] of the batch-data guide for the full shape. + +The resolved window is persisted with the training dataset, so re-reading the same training dataset version reconstructs the same per-join predicate. +The same parameter is accepted by `create_train_test_split` and `create_train_validation_test_split`. + ### Train/Validation/Test Splits In most cases, ML practitioners want to slice a dataset into multiple splits, most commonly train-test splits or train-validation-test splits, so that they can train and test their models.