
feat(flink): lookup join caches only specified partitions of dimension table #18229

Closed
wombatu-kun wants to merge 2 commits into apache:master from wombatu-kun:lookup_join_fixed_pt

Conversation

@wombatu-kun
Contributor

Describe the issue this Pull Request addresses

When using Hudi as a lookup join dimension table in Flink, the entire table is loaded into the cache on each reload. For large dimension tables that are partitioned by a slowly-changing attribute (e.g., date, region), this means reading and caching data from all partitions — including historical ones that are irrelevant to the running job. This can cause excessive memory pressure and slow cache warm-up times.
This PR introduces a new option lookup.partitions that allows users to declare which partitions should be loaded into the lookup join cache, so that only the relevant slice of the dimension table is materialized in memory.

Summary and Changelog

Users can now limit the lookup join cache to a subset of partitions by specifying the lookup.partitions option via a query hint. This is analogous to the existing lookup.async option that controls the lookup execution mode.

Each partition is expressed as comma-separated key=value pairs, and multiple partitions are separated by a semicolon:
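
For illustration, a hypothetical usage sketch (table, column, and partition names are made up), assuming the option is supplied through a dynamic table options hint on the dimension table, the same way other lookup.* options are passed:

// tableEnv is an org.apache.flink.table.api.TableEnvironment;
// cache only two partitions of dim_users (partitioned by region and dt).
tableEnv.executeSql(
    "SELECT o.order_id, u.name FROM orders AS o "
        + "JOIN dim_users /*+ OPTIONS('lookup.partitions'='region=EU,dt=2026-02-20;region=US,dt=2026-02-20') */ "
        + "FOR SYSTEM_TIME AS OF o.proc_time AS u "
        + "ON o.user_id = u.id");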

hudi-flink-datasource/hudi-flink — FlinkOptions.java
Added LOOKUP_PARTITIONS (lookup.partitions) config option of type String with no default value.

hudi-flink-datasource/hudi-flink — HoodieTableSource.java
Modified getLookupRuntimeProvider() to build an optional partition pruner from lookup.partitions and pass it to the HoodieLookupTableReader supplier when the option is set.
Added buildLookupPartitionPruner(): reads lookup.partitions, delegates path parsing to PartitionPathParser, and wraps the result in a PartitionPruners.StaticPartitionPruner via the existing PartitionPruners.Builder API.
Added getBatchInputFormatWithPruner(PartitionPruners.PartitionPruner): temporarily substitutes this.partitionPruner with the lookup-specific pruner before calling getBatchInputFormat(), then restores the original value. This allows the lookup path to reuse all existing batch input format logic (COW, MOR, read-optimized) without duplication.
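
A minimal sketch of the substitute-and-restore pattern described above, assuming the signatures shown here (the return type and parameter name are guesses; only the save/try/finally structure comes from the PR description):

// Sketch only: swap in the lookup-specific pruner, reuse the existing batch input
// format construction, then restore the field for any other scan path.
private InputFormat<RowData, ?> getBatchInputFormatWithPruner(PartitionPruners.PartitionPruner lookupPruner) {
  PartitionPruners.PartitionPruner saved = this.partitionPruner;
  this.partitionPruner = lookupPruner;
  try {
    return getBatchInputFormat();  // existing COW/MOR/read-optimized logic, unchanged
  } finally {
    this.partitionPruner = saved;  // restore so other scan sources are unaffected
  }
}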

hudi-common — PartitionPathParser.java
Added parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle): parses the lookup.partitions spec into a list of Hudi partition paths.
Validates that every key in the spec belongs to the table's declared partition key set; throws IllegalArgumentException with an informative message (including the list of valid keys) on unknown keys.
Validates that every token follows key=value format; throws IllegalArgumentException on bare values without a key.
Constructs paths in partitionKeys order regardless of the order keys appear in the spec, ensuring correct path generation for both Hive-style (key=value/key=value) and plain-value (value/value) layouts.
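
A hedged sketch of the parsing rules listed above; the actual implementation in PartitionPathParser may differ, and this only restates the documented behavior as code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public static List<String> parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle) {
  List<String> paths = new ArrayList<>();
  for (String partitionSpec : spec.split(";")) {
    Map<String, String> values = new HashMap<>();
    for (String token : partitionSpec.split(",")) {
      String[] kv = token.trim().split("=", 2);
      if (kv.length != 2) {
        throw new IllegalArgumentException("Expected key=value but got: " + token);
      }
      if (!partitionKeys.contains(kv[0])) {
        throw new IllegalArgumentException(
            "Unknown partition key '" + kv[0] + "', valid keys are " + partitionKeys);
      }
      values.put(kv[0], kv[1]);
    }
    // Build the path in declared partition-key order, independent of the order used in the spec.
    // Note: a key missing from the spec is not handled here; the review below asks to fail fast in that case.
    StringBuilder path = new StringBuilder();
    for (String key : partitionKeys) {
      if (path.length() > 0) {
        path.append('/');
      }
      path.append(hiveStyle ? key + "=" + values.get(key) : values.get(key));
    }
    paths.add(path.toString());
  }
  return paths;
}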

Impact

When lookup.partitions is not configured, behavior is unchanged; the feature is fully opt-in and backward-compatible. When configured, only the specified partitions are read during cache population; rows whose lookup keys resolve to an excluded partition will not match any cache entry and will produce null values on the dimension side of a LEFT lookup join.
Cache warm-up time and memory footprint shrink roughly in proportion to the amount of data excluded.

Risk Level

none

Documentation Update

Documentation update is needed: document the new FlinkOption lookup.partitions.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions Bot added the size:M PR with lines of changes in (100, 300] label Feb 20, 2026
// excerpt from getBatchInputFormatWithPruner: restore the original pruner after the call
try {
  return getBatchInputFormat();
} finally {
  this.partitionPruner = saved;
}
Contributor

this is error-prone, just pass around the PartitionPruner as a param into getBatchInputFormat?

Contributor Author
@wombatu-kun Feb 23, 2026

@danny0405 OK, I've refactored this code (in a separate commit). I agree with you that it's error-prone, but it was not enough to just pass the PartitionPruner into getBatchInputFormat, because it is used deeper down (in FileIndexReader). I think the result is now no less error-prone, just more complicated.
Please review it; if it's OK for you, approve it, and then I'll update the PR description and merge.
If during review you decide that the first version was better, let me know and I'll drop the second commit and re-request review.

Contributor
@danny0405 Feb 25, 2026

You are right, the original code looks simpler; we can revert to the old impl as long as we can ensure the file index is not reused by other scan sources. Maybe we just nullify the file index manually here to make it safe.

And if we can use SQL filter push-down to specify the partitions of the dim table, there is no need to make any changes to HoodieTableSource.

@github-actions github-actions Bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Feb 23, 2026
* @throws IllegalArgumentException if any key in the spec is not a valid partition key,
* or if a key-value pair does not follow {@code key=value} format
*/
public static List<String> parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle) {
Collaborator

lookup.partitions currently allows partial key sets for multi-key partitioned tables (e.g. only year=2024 when partition keys are year,month).
For lookup pruning, this is risky because StaticPartitionPruner does exact partition-path matching. A partial path can silently fail to match real partition directories (e.g. year=2024/month=01), resulting in an empty lookup cache for expected rows.
Could we enforce that each partition spec includes all declared partition keys (fail fast with IllegalArgumentException if any key is missing)?

Contributor Author

OK, I'll do it.

@wombatu-kun wombatu-kun requested a review from cshuo February 24, 2026 04:24
+ "The directory is cleaned up when the lookup function is closed.");

public static final ConfigOption<String> LOOKUP_PARTITIONS =
key("lookup.partitions")
Contributor

it looks like the dim table already supports filter push-down through SQL: https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs; we can utilize the SQL filters instead of the explicit options.

Contributor Author

Sorry, I don't understand your point. Could you please explain your idea?
Why is using SQL filters better than pruning partitions directly?
Or do you mean that users would prefer to write SQL with "... where dim.partitionField='par1' or dim.partitionField='par2'"? Then this whole PR is not needed at all.

Contributor

"... where dim.partitionField='par1' or dim.partitionField='par2'"? then this whole PR is not needed at all.

Yeah, if the SQL already works, it's better that we guide the user to use SQL for dim-table partition pruning.
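
For context, a hypothetical illustration of the SQL-filter alternative being discussed (table and column names are made up; whether the Hudi source actually prunes dim-table partitions from such a predicate in the lookup path is the open question here):

// tableEnv is an org.apache.flink.table.api.TableEnvironment
tableEnv.executeSql(
    "SELECT o.order_id, u.name FROM orders AS o "
        + "JOIN dim_users FOR SYSTEM_TIME AS OF o.proc_time AS u "
        + "ON o.user_id = u.id "
        + "WHERE u.partition_field = 'par1' OR u.partition_field = 'par2'");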

Contributor
@danny0405 left a comment

Blocked until this is switched to SQL filter push-down.

@github-actions github-actions Bot added size:M PR with lines of changes in (100, 300] and removed size:L PR with lines of changes in (300, 1000] labels Feb 25, 2026

@codecov-commenter

Codecov Report

❌ Patch coverage is 0% with 27 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.28%. Comparing base (8c5f832) to head (cbe77a8).
⚠️ Report is 62 commits behind head on master.

Files with missing lines Patch % Lines
.../apache/hudi/common/table/PartitionPathParser.java 0.00% 27 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##             master   #18229   +/-   ##
=========================================
  Coverage     57.28%   57.28%           
- Complexity    18532    18555   +23     
=========================================
  Files          1944     1945    +1     
  Lines        106132   106226   +94     
  Branches      13118    13135   +17     
=========================================
+ Hits          60797    60854   +57     
- Misses        39612    39648   +36     
- Partials       5723     5724    +1     
Flag Coverage Δ
hadoop-mr-java-client 45.40% <0.00%> (-0.01%) ⬇️
spark-java-tests 47.41% <0.00%> (-0.02%) ⬇️
spark-scala-tests 45.51% <0.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
.../apache/hudi/common/table/PartitionPathParser.java 18.94% <0.00%> (-7.53%) ⬇️

... and 16 files with indirect coverage changes



Labels

size:M PR with lines of changes in (100, 300]

5 participants