From ddb1106e092f9b772279bbf67c57fd9790af872a Mon Sep 17 00:00:00 2001 From: manu-sj Date: Sat, 30 May 2026 18:58:32 +0200 Subject: [PATCH 1/2] [FSTORE-2030] Add support for specifying lookback windows for PIT queries https://hopsworks.atlassian.net/browse/FSTORE-2030 PIT joins for Hopsworks training datasets and batch feature retrieval emit `feature_fg.event_time <= root_fg.event_time` to pick the latest matching record. The range predicate defeats partition pruning so every historical partition of every joined feature group is scanned on every read, and the scan grows unboundedly with daily ingestion. Add an optional `lookback` parameter to the PIT join API (on `feature_view.get_batch_data`, the three in-memory training-data methods `training_data` / `train_test_split` / `train_validation_test_split`, and the three materialised create variants `create_training_data` / `create_train_test_split` / `create_train_validation_test_split`). When set, the backend emits an additional constant-bound predicate on a single DATE partition column of the root and each joined feature group, so Spark Catalyst's PartitionFilters, Hudi's HoodieFileIndex, and flyingduck's directory walker can each prune partitions before opening files. Adds a "Lookback window for PIT joins" section to the batch-data and training-data user guides covering uniform and per-feature-group shapes, with concrete instance and dict examples and the partition- column eligibility caveat (single DATE partition column). Adds a "Combining `lookback` with other filters" section explaining the sub-query vs outer-filter pruning behaviour, including the mixed-FG outer-filter case where Catalyst inlines the dim wrap and the root loses FileScan-level pruning. Reviewed-by: OpenAI Codex (GPT-5 via codex-plugin-cc 1.0.4) Signed-off-by: Manu Sathyarajan Joseph Co-Authored-By: Claude Opus 4.7 (1M context) --- .../user_guides/fs/feature_view/batch-data.md | 154 ++++++++++++++++++ .../fs/feature_view/training-data.md | 42 +++++ 2 files changed, 196 insertions(+) diff --git a/docs/user_guides/fs/feature_view/batch-data.md b/docs/user_guides/fs/feature_view/batch-data.md index b3c21b4dc1..6adbfe826a 100644 --- a/docs/user_guides/fs/feature_view/batch-data.md +++ b/docs/user_guides/fs/feature_view/batch-data.md @@ -103,6 +103,160 @@ df = feature_view.get_batch_data( If a bare name is ambiguous and no prefix is supplied, `get_feature` raises a `FeatureStoreException` listing the matching Feature Groups. +## Lookback window for PIT joins {#batch-data-lookback} + +Point-in-Time (PIT) joins use the condition `feature_fg.event_time <= root_fg.event_time` to pick the latest matching record from each joined Feature Group. +That predicate is a range comparison, not an equality, so partition pruning is defeated and every historical partition of every joined Feature Group is scanned on every read. +As Feature Groups grow with daily ingestion, this scan grows unboundedly. + +The `lookback` argument lets you cap how far back the join is allowed to consider rows from each joined Feature Group. +The backend turns the window into a constant-bound predicate on the joined Feature Group so flyingduck's directory walker and Spark's catalyst pushdown can prune partitions before opening any files. + +### Uniform lookback + +Apply the same window to every joined Feature Group with a `Lookback` instance from `hsfs.constructor.lookback`, or the equivalent dict. +Both forms accept `date` and `datetime` values. + +```python +import datetime +from hsfs.constructor.lookback import Lookback + +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + lookback=Lookback( + key="partition_key", + start=datetime.date(2026, 5, 10), + end=datetime.date(2026, 5, 17), + ), +) +``` + +Equivalent dict form, no import required: + +```python +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + lookback={ + "key": "partition_key", + "start": datetime.date(2026, 5, 10), + "end": datetime.date(2026, 5, 17), + }, +) +``` + +`key` selects which column the predicate is emitted against. +`"partition_key"` targets the Feature Group's partition column so the engine can prune partitions before reading files; the Feature Group must have a single DATE partition column. +`"event_time"` targets the Feature Group's `event_time` column and guarantees row-level correctness but offers only engine-dependent file pruning (Hudi or Delta column-stats indexing). + +`start` is required and emits a `>=` predicate. +`end` is optional and emits a `<=` predicate when present. +When `end` is omitted, only the lower bound is emitted, making the short form below valid: the root Feature Group and every joined Feature Group get `partition_col >= '2026-05-10'` and nothing else. + +```python +df = feature_view.get_batch_data( + lookback={ + "key": "partition_key", + "start": datetime.date(2026, 5, 10), + }, +) +``` + +### Per-feature-group lookback + +When different Feature Groups need different windows, use `Lookbacks` to bind a `Lookback` to specific joined Feature Groups. +An optional `default` applies to every Feature Group not listed in `feature_groups`. + +```python +import datetime +from hsfs.constructor.lookback import Lookback, Lookbacks + +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 11), + end_time=datetime.date(2026, 5, 17), + lookback=Lookbacks( + default=Lookback( + key="partition_key", + start=datetime.date(2026, 5, 5), + end=datetime.date(2026, 5, 17), + ), + feature_groups={ + "transactions": Lookback( + key="event_time", + start=datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc), + ), + }, + ), +) +``` + +Skip the `default` to apply lookbacks only to the listed Feature Groups; unlisted Feature Groups receive no lookback for that call. + +```python +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 11), + end_time=datetime.date(2026, 5, 17), + lookback=Lookbacks( + feature_groups={ + "transactions": Lookback( + key="partition_key", start=datetime.date(2026, 5, 5) + ), + } + ), +) +``` + +`feature_groups` keys identify a Feature Group in one of two ways: by name (a bare string matches every version of the named Feature Group at any join site in the Feature View) or by passing the Feature Group instance itself (matches the exact `(name, version)` so a specific version can be targeted when multiple versions of the same Feature Group are joined). +When both forms are supplied for the same name, the instance entry wins at its specific join site and the bare-string entry still applies elsewhere. + +Equivalent dict form: + +```python +df = feature_view.get_batch_data( + start_time=datetime.date(2026, 5, 11), + end_time=datetime.date(2026, 5, 17), + lookback={ + "default": { + "key": "partition_key", + "start": datetime.date(2026, 5, 5), + "end": datetime.date(2026, 5, 17), + }, + "feature_groups": { + "transactions": { + "key": "event_time", + "start": datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc), + }, + }, + }, +) +``` + +### Combining `lookback` with other filters + +The `lookback` predicate combines with filters declared on the Query, but where the filter is attached changes whether the engine can prune partitions on the root Feature Group. + +Filters attached to a sub-query (`fg.select(...).filter(...)`) always prune on that Feature Group regardless of which Feature Group they reference. +Filters attached to the outer query (`query.filter(...)` after the join, or `extra_filter` on `get_batch_data`) prune the root only when every referenced feature belongs to the root Feature Group. +A mixed-Feature-Group outer filter still produces correct results — the predicates apply at the outer level — but the root's partitions are no longer pruned at file-listing time. + +```python +# Root sub-query filter — lookback prunes both root and joined Feature Groups. +query = root.select_all().filter(root.amount > 100).join(dim.select_all()) + +# Joined sub-query filter — lookback still prunes both sides. +query = root.select_all().join(dim.select_all().filter(dim.category == "X")) + +# Outer filter referencing a joined Feature Group — root pruning is lost; +# joined Feature Groups still prune via their own predicates. +query = root.select_all().join(dim.select_all()).filter(dim.category == "X") +``` + +For best pruning, keep call-site filters at the sub-query level when their predicate references only one Feature Group. + +The same `lookback` argument is supported on `create_training_data` (see [the training-data section][training-data-lookback]). +Both `extra_filter` and `lookback` can be combined. + ## Creation with transformation If you have specified transformation functions when creating a feature view, you will get back transformed batch data as well. diff --git a/docs/user_guides/fs/feature_view/training-data.md b/docs/user_guides/fs/feature_view/training-data.md index f9319b9e76..23b7ab5ca3 100644 --- a/docs/user_guides/fs/feature_view/training-data.md +++ b/docs/user_guides/fs/feature_view/training-data.md @@ -55,6 +55,48 @@ df_restaurant_travel = feature_view.training_data( ) ``` +### Lookback window for PIT joins {#training-data-lookback} + +When training data is materialised from a Feature View that joins multiple Feature Groups, the PIT join scans every historical partition of the root and every joined Feature Group. +The `lookback` argument caps how far back the join is allowed to consider rows from the root and each joined Feature Group, so the engine can prune partitions before reading any files. +Apply the same window uniformly with `Lookback`, or use `Lookbacks` for per-Feature-Group control; the argument shape mirrors the one accepted by `get_batch_data` (see [the batch-data lookback section][batch-data-lookback]). + +```python +import datetime +from hsfs.constructor.lookback import Lookback + +version, job = feature_view.create_training_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + description="fraud batch training data, weekly partition pruning", + lookback=Lookback( + key="partition_key", + start=datetime.date(2026, 5, 10), + end=datetime.date(2026, 5, 17), + ), +) +``` + +Equivalent dict form: + +```python +version, job = feature_view.create_training_data( + start_time=datetime.date(2026, 5, 10), + end_time=datetime.date(2026, 5, 17), + description="fraud batch training data, weekly partition pruning", + lookback={ + "key": "partition_key", + "start": datetime.date(2026, 5, 10), + "end": datetime.date(2026, 5, 17), + }, +) +``` + +For different lookbacks per joined Feature Group, pass a `Lookbacks` — see the [per-feature-group lookback section][batch-data-lookback] of the batch-data guide for the full shape. + +The resolved window is persisted with the training dataset, so re-reading the same training dataset version reconstructs the same per-join predicate. +The same parameter is accepted by `create_train_test_split` and `create_train_validation_test_split`. + ### Train/Validation/Test Splits In most cases, ML practitioners want to slice a dataset into multiple splits, most commonly train-test splits or train-validation-test splits, so that they can train and test their models. From 3533de12d94fc04291037bd96a2393a0b1635496 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 1 Jun 2026 08:41:55 +0200 Subject: [PATCH 2/2] [FSTORE-2030] Add support for specifying lookback windows for PIT queries https://hopsworks.atlassian.net/browse/FSTORE-2030 Address Copilot review findings on the new lookback section. Use the lower-case "point-in-time" spelling to match the rest of the Feature Store docs. Rename the runtime reference from "flyingduck" to "ArrowFlight Server with DuckDB" and capitalise "Spark Catalyst" per existing convention. Make all dict-form code blocks self-contained by adding `import datetime` to each fenced `python` block that constructs `datetime.date(...)` / `datetime.datetime(...)` values. The "Equivalent dict form" snippets previously read as though no import was needed; corrected the prose to say no `Lookback` import is needed (datetime is still required for the bound values). Reword the lower-bound-only short form to refer to "" (each Feature Group's own DATE partition column) instead of the ambiguous bare token `partition_col`. Signed-off-by: Manu Sathyarajan Joseph Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/user_guides/fs/feature_view/batch-data.md | 14 ++++++++++---- docs/user_guides/fs/feature_view/training-data.md | 4 +++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/docs/user_guides/fs/feature_view/batch-data.md b/docs/user_guides/fs/feature_view/batch-data.md index 6adbfe826a..609696ce2b 100644 --- a/docs/user_guides/fs/feature_view/batch-data.md +++ b/docs/user_guides/fs/feature_view/batch-data.md @@ -105,12 +105,12 @@ If a bare name is ambiguous and no prefix is supplied, `get_feature` raises a `F ## Lookback window for PIT joins {#batch-data-lookback} -Point-in-Time (PIT) joins use the condition `feature_fg.event_time <= root_fg.event_time` to pick the latest matching record from each joined Feature Group. +Point-in-time (PIT) joins use the condition `feature_fg.event_time <= root_fg.event_time` to pick the latest matching record from each joined Feature Group. That predicate is a range comparison, not an equality, so partition pruning is defeated and every historical partition of every joined Feature Group is scanned on every read. As Feature Groups grow with daily ingestion, this scan grows unboundedly. The `lookback` argument lets you cap how far back the join is allowed to consider rows from each joined Feature Group. -The backend turns the window into a constant-bound predicate on the joined Feature Group so flyingduck's directory walker and Spark's catalyst pushdown can prune partitions before opening any files. +The backend turns the window into a constant-bound predicate on the joined Feature Group so the ArrowFlight Server with DuckDB and Spark Catalyst pushdown can prune partitions before opening any files. ### Uniform lookback @@ -132,9 +132,11 @@ df = feature_view.get_batch_data( ) ``` -Equivalent dict form, no import required: +Equivalent dict form, no `Lookback` import required (you still need `datetime` for the bound values): ```python +import datetime + df = feature_view.get_batch_data( start_time=datetime.date(2026, 5, 10), end_time=datetime.date(2026, 5, 17), @@ -152,9 +154,11 @@ df = feature_view.get_batch_data( `start` is required and emits a `>=` predicate. `end` is optional and emits a `<=` predicate when present. -When `end` is omitted, only the lower bound is emitted, making the short form below valid: the root Feature Group and every joined Feature Group get `partition_col >= '2026-05-10'` and nothing else. +When `end` is omitted, only the lower bound is emitted, making the short form below valid: the root Feature Group and every joined Feature Group get ` >= '2026-05-10'` (where `` is each Feature Group's own DATE partition column) and nothing else. ```python +import datetime + df = feature_view.get_batch_data( lookback={ "key": "partition_key", @@ -213,6 +217,8 @@ When both forms are supplied for the same name, the instance entry wins at its s Equivalent dict form: ```python +import datetime + df = feature_view.get_batch_data( start_time=datetime.date(2026, 5, 11), end_time=datetime.date(2026, 5, 17), diff --git a/docs/user_guides/fs/feature_view/training-data.md b/docs/user_guides/fs/feature_view/training-data.md index 23b7ab5ca3..a3b6543ea0 100644 --- a/docs/user_guides/fs/feature_view/training-data.md +++ b/docs/user_guides/fs/feature_view/training-data.md @@ -77,9 +77,11 @@ version, job = feature_view.create_training_data( ) ``` -Equivalent dict form: +Equivalent dict form (no `Lookback` import, but `datetime` is still required for the bound values): ```python +import datetime + version, job = feature_view.create_training_data( start_time=datetime.date(2026, 5, 10), end_time=datetime.date(2026, 5, 17),