feat(flink): lookup join caches only specified partitions of dimension table #18229
wombatu-kun wants to merge 2 commits into apache:master
Conversation
Force-pushed 8d6d988 to 0297914
    try {
      return getBatchInputFormat();
    } finally {
      this.partitionPruner = saved;
This is error-prone; just pass the PartitionPruner as a param into getBatchInputFormat?
@danny0405 Ok, I've refactored this code (in a separate commit). I agree it was error-prone, but it was not enough to just pass the PartitionPruner into getBatchInputFormat, because it is used deeper (in FileIndexReader). I think it has now become not less error-prone, but more complicated.
Please review it; if it's OK with you, approve it and I'll update the PR description and merge.
If during review you decide that the first version was better, let me know and I'll drop the second commit and re-request review.
You are right, the original code looks simpler; we can revert to the old impl as long as we can ensure the file index is not reused by other scan sources. Maybe we just nullify the file index manually here to make it safe.
And if we can use SQL filter push-down to specify the partitions of the dim table, there is no need to make any changes to HoodieTableSource.
Force-pushed 0297914 to 1f1bb42
Force-pushed 1f1bb42 to 5b7100c
     * @throws IllegalArgumentException if any key in the spec is not a valid partition key,
     *         or if a key-value pair does not follow {@code key=value} format
     */
    public static List<String> parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle)
lookup.partitions currently allows partial key sets for multi-key partitioned tables (e.g. only year=2024 when the partition keys are year,month).
For lookup pruning this is risky, because StaticPartitionPruner does exact partition-path matching: a partial path can silently fail to match real partition directories (e.g. year=2024/month=01), resulting in an empty lookup cache for the expected rows.
Could we enforce that each partition spec includes all declared partition keys (fail fast with IllegalArgumentException if any key is missing)?
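A minimal sketch of the kind of fail-fast check being requested here (class, method, and message wording are illustrative, not the PR's actual code):

```java
import java.util.List;
import java.util.Set;

final class LookupPartitionSpecCheck {

  // Fails fast when a partition spec entry does not cover every declared partition key.
  // 'specKeys' are the keys found in one lookup.partitions entry,
  // 'partitionKeys' are the table's declared partition keys (e.g. [year, month]).
  static void requireAllPartitionKeys(Set<String> specKeys, List<String> partitionKeys) {
    for (String key : partitionKeys) {
      if (!specKeys.contains(key)) {
        throw new IllegalArgumentException(
            "Partition spec must include all partition keys " + partitionKeys
                + ", but '" + key + "' is missing");
      }
    }
  }

  public static void main(String[] args) {
    // Throws by design: the spec only provides 'year', so 'month' is reported as missing.
    requireAllPartitionKeys(Set.of("year"), List.of("year", "month"));
  }
}
```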
Ok, I'll do it.
Force-pushed 0dc55ee to f1ea3e2
| + "The directory is cleaned up when the lookup function is closed."); | ||
|
|
||
| public static final ConfigOption<String> LOOKUP_PARTITIONS = | ||
| key("lookup.partitions") |
It looks like the dim table already supports filter push-down through SQL: https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs. We can utilize the SQL filters instead of the explicit options.
Sorry, I don't understand your point. Could you please explain your idea?
Why is using SQL filters better than pruning partitions directly?
Or do you mean that users would prefer to write SQL with "... where dim.partitionField='par1' or dim.partitionField='par2'"? Then this whole PR is not needed at all.
"... where dim.partitionField='par1' or dim.partitionField='par2'"? then this whole PR is not needed at all.
Yeah, if the SQL already works, it's better we guide the user to use SQL for dim table partition pruning.
danny0405 left a comment:
Blocked: switch to SQL filter pushdown.
Force-pushed f1ea3e2 to cbe77a8
Codecov Report: ❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18229 +/- ##
=========================================
Coverage 57.28% 57.28%
- Complexity 18532 18555 +23
=========================================
Files 1944 1945 +1
Lines 106132 106226 +94
Branches 13118 13135 +17
=========================================
+ Hits 60797 60854 +57
- Misses 39612 39648 +36
- Partials 5723 5724 +1
Flags with carried forward coverage won't be shown.
Describe the issue this Pull Request addresses
When using Hudi as a lookup join dimension table in Flink, the entire table is loaded into the cache on each reload. For large dimension tables that are partitioned by a slowly-changing attribute (e.g., date, region), this means reading and caching data from all partitions — including historical ones that are irrelevant to the running job. This can cause excessive memory pressure and slow cache warm-up times.
This PR introduces a new option lookup.partitions that allows users to declare which partitions should be loaded into the lookup join cache, so that only the relevant slice of the dimension table is materialized in memory.
Summary and Changelog
Users can now limit the lookup join cache to a subset of partitions by specifying the lookup.partitions option via a query hint. This is analogous to the existing lookup.async option that controls the lookup execution mode. Partitions are expressed as comma-separated key=value pairs (one partition per entry), with multiple partitions separated by ;.
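For illustration only (table names, schema, and the exact hint placement are assumptions, not taken from this PR; the option value format follows the description above), a lookup join restricted to two partitions might look roughly like this:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LookupPartitionsHintSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Assumes 'orders' (with a proc_time attribute) and the Hudi table 'dim_regions'
    // are already registered. Only the two listed partitions of dim_regions would be
    // loaded into the lookup cache; entries are key=value pairs, multiple partitions
    // separated by ';' (format per the PR description).
    tEnv.executeSql(
        "SELECT o.order_id, d.region_name "
            + "FROM orders AS o "
            + "JOIN dim_regions /*+ OPTIONS('lookup.partitions'='dt=2024-01-01;dt=2024-01-02') */ "
            + "FOR SYSTEM_TIME AS OF o.proc_time AS d "
            + "ON o.region_id = d.region_id");
  }
}
```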
hudi-flink-datasource/hudi-flink — FlinkOptions.java
Added LOOKUP_PARTITIONS (lookup.partitions) config option of type String with no default value.
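As a rough sketch of what such a declaration typically looks like with Flink's ConfigOptions builder (the description string below is paraphrased, not the PR's exact wording):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class LookupOptionsSketch {

  // String option with no default value, as described above.
  // The real field lives in FlinkOptions.java; this is only an illustration.
  public static final ConfigOption<String> LOOKUP_PARTITIONS = ConfigOptions
      .key("lookup.partitions")
      .stringType()
      .noDefaultValue()
      .withDescription(
          "Partitions of the dimension table to load into the lookup join cache, "
              + "given as ';'-separated entries of comma-separated key=value pairs. "
              + "When not set, all partitions are cached.");
}
```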
hudi-flink-datasource/hudi-flink — HoodieTableSource.java
Modified getLookupRuntimeProvider() to build an optional partition pruner from lookup.partitions and pass it to the HoodieLookupTableReader supplier when the option is set.
Added buildLookupPartitionPruner(): reads lookup.partitions, delegates path parsing to PartitionPathParser, and wraps the result in a PartitionPruners.StaticPartitionPruner via the existing PartitionPruners.Builder API.
Added getBatchInputFormatWithPruner(PartitionPruners.PartitionPruner): temporarily substitutes this.partitionPruner with the lookup-specific pruner before calling getBatchInputFormat(), then restores the original value. This allows the lookup path to reuse all existing batch input format logic (COW, MOR, read-optimized) without duplication.
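The substitute-and-restore approach can be illustrated with a self-contained toy class (placeholder types instead of Hudi's internals; not the PR's verbatim code):

```java
import java.util.List;

// Toy illustration of the pattern: temporarily swap a field, reuse the existing
// build path, and restore the field in a finally block so other callers are unaffected.
final class SwapAndRestoreSketch {

  private List<String> partitions; // stands in for HoodieTableSource#partitionPruner

  private String buildScan() {
    // stands in for getBatchInputFormat(): reads whatever the field currently holds
    return "scan over " + partitions;
  }

  String buildScanWith(List<String> lookupPartitions) {
    final List<String> saved = this.partitions;
    this.partitions = lookupPartitions;
    try {
      return buildScan();          // all existing build logic is reused unchanged
    } finally {
      this.partitions = saved;     // restore the original value
    }
  }

  public static void main(String[] args) {
    SwapAndRestoreSketch source = new SwapAndRestoreSketch();
    source.partitions = List.of("<all partitions>");
    System.out.println(source.buildScanWith(List.of("dt=2024-01-01")));
    System.out.println(source.buildScan()); // prints the restored original value
  }
}
```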
hudi-common — PartitionPathParser.java
Added parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle): parses the lookup.partitions spec into a list of Hudi partition paths.
Validates that every key in the spec belongs to the table's declared partition key set; throws IllegalArgumentException with an informative message (including the list of valid keys) on unknown keys.
Validates that every token follows key=value format; throws IllegalArgumentException on bare values without a key.
Constructs paths in partitionKeys order regardless of the order keys appear in the spec, ensuring correct path generation for both Hive-style (key=value/key=value) and plain-value (value/value) layouts.
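An approximate, self-contained sketch of the parsing behavior described above (not the PR's actual implementation; error messages and class name are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class LookupPartitionPathParsing {

  // Parses e.g. "year=2024,month=01;year=2024,month=02" into partition paths,
  // validating keys and key=value format as described above.
  // Note: the fail-fast check that every declared key is present (requested in review) is omitted here.
  static List<String> parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle) {
    List<String> paths = new ArrayList<>();
    for (String partition : spec.split(";")) {
      Map<String, String> kv = new LinkedHashMap<>();
      for (String pair : partition.split(",")) {
        String[] parts = pair.split("=", 2);
        if (parts.length != 2) {
          throw new IllegalArgumentException(
              "Partition spec entry '" + pair + "' does not follow key=value format");
        }
        if (!partitionKeys.contains(parts[0].trim())) {
          throw new IllegalArgumentException(
              "Unknown partition key '" + parts[0].trim() + "', valid keys are " + partitionKeys);
        }
        kv.put(parts[0].trim(), parts[1].trim());
      }
      // Build the path in declared partitionKeys order, regardless of the order in the spec.
      String path = partitionKeys.stream()
          .map(key -> hiveStyle ? key + "=" + kv.get(key) : kv.get(key))
          .collect(Collectors.joining("/"));
      paths.add(path);
    }
    return paths;
  }

  public static void main(String[] args) {
    System.out.println(parseLookupPartitionPaths(
        "month=01,year=2024;year=2024,month=02", List.of("year", "month"), true));
    // prints: [year=2024/month=01, year=2024/month=02]
  }
}
```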
Impact
When lookup.partitions is not configured, behavior is unchanged: the feature is fully opt-in and backward-compatible. When configured, only the specified partitions are read during cache population; rows whose lookup keys resolve to an excluded partition will not match any cache entry and will produce null values on the dimension side of a LEFT lookup join.
Cache warm-up time and memory footprint are reduced proportionally to the fraction of partitions excluded.
Risk Level
none
Documentation Update
Documentation update is needed: add the new FlinkOption lookup.partitions.
Contributor's checklist