Flink: Pass FileIO on Flink's read path #15663
Conversation
```java
if (version >= 4) {
  if (fileIO != null) {
    out.writeBoolean(true);
    byte[] fileIOBytes = InstantiationUtil.serializeObject(fileIO);
```
What is the size of the serialized fileIO?
This is a bit concerning for me as we basically send this with every task
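To get a rough feel for the size in question, one can serialize a stand-in object with plain Java serialization and inspect the byte count (Flink's `InstantiationUtil.serializeObject` uses Java serialization under the hood, as far as I know). `FakeFileIO` below is a hypothetical stand-in for illustration, not the real Iceberg class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class SerializedSizeCheck {

  // Hypothetical stand-in for a FileIO configured with credentials;
  // not the real Iceberg class.
  static class FakeFileIO implements Serializable {
    private final Map<String, String> properties = new HashMap<>();

    FakeFileIO() {
      properties.put("s3.access-key-id", "AKIA-example");
      properties.put("s3.secret-access-key", "secret-example");
    }
  }

  // Serialize with ObjectOutputStream and return the resulting byte count.
  static int serializedSize(Serializable obj) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(obj);
    }
    return buffer.size();
  }
}
```

A real `FileIO` carrying Hadoop configuration or credential maps could be considerably larger than this toy object, which is why sending it with every task is worth measuring.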
```java
if (version >= 4) {
  if (fileIO != null) {
    out.writeBoolean(true);
```
Why not just a specific length instead of an extra boolean?
Maybe -1 length for null?
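The -1-length convention suggested here could look like the following sketch; the class and method names are illustrative, not code from this PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class NullableBytesSerde {

  // Write a single int: -1 marks a null payload, otherwise the byte count
  // followed by the bytes. This replaces the separate boolean flag.
  static void writeNullableBytes(DataOutputStream out, byte[] bytes) throws IOException {
    if (bytes == null) {
      out.writeInt(-1);
    } else {
      out.writeInt(bytes.length);
      out.write(bytes);
    }
  }

  static byte[] readNullableBytes(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      return null; // -1 length means the payload was null
    }
    byte[] bytes = new byte[length];
    in.readFully(bytes);
    return bytes;
  }

  // Helper for exercising both directions in one call.
  static byte[] roundTrip(byte[] payload) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      writeNullableBytes(out, payload);
    }
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      return readNullableBytes(in);
    }
  }
}
```

This saves the extra boolean byte and keeps the null case unambiguous, since a valid payload can never have a negative length.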
```java
case 2:
  return in.readUTF();
case 3:
case 4:
```
We will need to add some unit tests for the serialization
```diff
-    CloseableIterable.transform(tasksIterable, IcebergSourceSplit::fromCombinedScanTask));
+    CloseableIterable.transform(
+        planResult.tasks(),
+        task -> IcebergSourceSplit.fromCombinedScanTask(task, planResult.fileIO().get())));
```
Do we have an inkling how the supplier will be implemented? Maybe calling get() for every split would be overkill?
Yes, I agree, this would definitely be overkill, and that's also why we didn't add FileIO to the ScanTaskGroup for Spark. Right now I'm just experimenting here a bit, but we definitely need to improve this.
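One hedged sketch of how the supplier could stay cheap per split: memoize the first get() so every later split reuses the same FileIO instead of rebuilding it. `MemoizedSupplier` is an illustrative name, not code from this PR; Guava's `Suppliers.memoize` provides the same behavior off the shelf:

```java
import java.util.function.Supplier;

// Thread-safe memoizing Supplier using double-checked locking.
// Assumes the delegate never returns null (a null result would be recomputed).
class MemoizedSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private volatile T value;

  MemoizedSupplier(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    T result = value;
    if (result == null) {
      synchronized (this) {
        result = value;
        if (result == null) {
          // First caller pays the construction cost; later calls reuse it.
          result = delegate.get();
          value = result;
        }
      }
    }
    return result;
  }
}
```

With this, `planResult.fileIO().get()` per split would amortize to a single FileIO construction for the whole plan result.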
```java
static CloseableIterable<CombinedScanTask> planTasks(
    Table table, ScanContext context, ExecutorService workerPool) {

/** Result of planning that includes the scan's FileIO for use when reading. */
private static class PlanResult implements Closeable {
```
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
When accessing/reading data files, the codebase uses the Table's `FileIO` instance through `table.io()` on Flink's read path. With remote scan planning, the `FileIO` instance is configured with a PlanID + custom storage credentials inside `RESTTableScan`, but that instance is never propagated to the place(s) that actually perform the read, thus leading to errors.

This PR passes the `FileIO` obtained during remote/distributed scan planning next to the `Table` instance on Flink's read path.

This is similar to #15448, where we applied the same approach on Spark's read path.