Skip to content

Flink: Pass FileIO on Flink's read path#15663

Closed
nastra wants to merge 4 commits intoapache:mainfrom
nastra:remote-planning-with-flink
Closed

Flink: Pass FileIO on Flink's read path#15663
nastra wants to merge 4 commits intoapache:mainfrom
nastra:remote-planning-with-flink

Conversation

@nastra
Copy link
Copy Markdown
Contributor

@nastra nastra commented Mar 17, 2026

When accessing/reading data files, the codebase is using the Table's FileIO instance through table.io() on Flink's read path. With remote scan planning the FileIO instance is configured with a PlanID + custom storage credentials inside RESTTableScan, but that instance is never propagated to the place(s) that actually perform the read., thus leading to errors.

This PR passes the FileIO obtained during remote/distributed scan planning next to the Table instance on Flink's read path.

This is similar to #15448, where we applied the same approach on Spark's read path

if (version >= 4) {
if (fileIO != null) {
out.writeBoolean(true);
byte[] fileIOBytes = InstantiationUtil.serializeObject(fileIO);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the size of the serialized fileIO?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit concerning for me as we basically send this with every task


if (version >= 4) {
if (fileIO != null) {
out.writeBoolean(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just a specific length instead of an extra boolean?
Maybe -1 length for null?

case 2:
return in.readUTF();
case 3:
case 4:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add some unit tests for the serialization

CloseableIterable.transform(tasksIterable, IcebergSourceSplit::fromCombinedScanTask));
CloseableIterable.transform(
planResult.tasks(),
task -> IcebergSourceSplit.fromCombinedScanTask(task, planResult.fileIO().get())));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an inkling how the supplier will be implemented? Maybe calling get for every split would be an overkill?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I agree, this will definitely be an overkill and that's also why we didn't add FileIO to the ScanTaskGroup for Spark. Right now I'm just experimenting here a bit, but we definitely need to improve this here

static CloseableIterable<CombinedScanTask> planTasks(
Table table, ScanContext context, ExecutorService workerPool) {
/** Result of planning that includes the scan's FileIO for use when reading. */
private static class PlanResult implements Closeable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use scan?

@github-actions
Copy link
Copy Markdown

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label Apr 17, 2026
@github-actions
Copy link
Copy Markdown

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Apr 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants