From bd6fb72c1e1ce351305e05fe74e1e0fac9e35f63 Mon Sep 17 00:00:00 2001
From: "Mingyu Chen (Rayner)"
Date: Wed, 25 Mar 2026 18:56:30 -0700
Subject: [PATCH] [fix](fe) Fix INSERT INTO local TVF ignoring backend_id during scheduling (#61732)

### What problem does this PR solve?

Follow-up to #60719

Problem Summary:

When using `INSERT INTO local("backend_id" = "X" ...)`, the data should only be written to the BE node specified by `backend_id`. However, the Coordinator schedules the sink fragment on an arbitrary backend because the fragment uses the `UNPARTITIONED` data partition, which causes `SimpleScheduler.getHost()` to pick any available BE. This results in file creation failures when the target directory only exists on the intended BE.

**Root Cause:**

- The read path (`SELECT FROM local(...)`) correctly handles this via `TVFScanNode.initBackendPolicy()`, restricting the scan to the specified backend.
- The write path (`INSERT INTO local(...)`) had no equivalent logic. `PhysicalPlanTranslator.visitPhysicalTVFTableSink()` creates the fragment as `UNPARTITIONED`, and `Coordinator.computeFragmentHosts()` assigns it to a random BE.

**Fix:**

Added backend_id-aware scheduling in `Coordinator.computeFragmentHosts()` for the local `TVFTableSink`, forcing the sink fragment to execute on the designated backend. This is consistent with the existing `DictionarySink` pattern, which also overrides fragment scheduling for specific sink types.

**Changes:**

1. `TVFTableSink.java` - Added `getTvfName()` and `getBackendId()` accessor methods
2.
`Coordinator.java` - Added check before UNPARTITIONED scheduling: if the sink is a local TVFTableSink with a specific backend_id, force the fragment onto that backend
---
 .../apache/doris/planner/TVFTableSink.java    | 16 ++++++++++++++++
 .../java/org/apache/doris/qe/Coordinator.java | 19 +++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
index c511336767c3df..b8c184ca401819 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
@@ -60,6 +60,22 @@ public TVFTableSink(PlanNodeId exchNodeId, String tvfName, Map p
         this.cols = cols;
     }
 
+    /** Returns the name of the table-valued function this sink was created for, e.g. {@code "local"}. */
+    public String getTvfName() {
+        return tvfName;
+    }
+
+    /**
+     * Returns the {@code backend_id} entry of the sink properties as a long, or -1 if it is absent.
+     * For the local TVF, this identifies the specific BE node where the data should be written.
+     *
+     * @throws NumberFormatException if the property is present but is not a parsable long
+     */
+    public long getBackendId() {
+        String configuredId = properties.get("backend_id");
+        return configuredId == null ? -1 : Long.parseLong(configuredId);
+    }
+
     public void bindDataSink() throws AnalysisException {
         TTVFTableSink tSink = new TTVFTableSink();
         tSink.setTvfName(tvfName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index fb6ca658c30b8d..e56c388cdd7aef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -72,6 +72,7 @@
 import org.apache.doris.planner.SchemaScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.SortNode;
+import org.apache.doris.planner.TVFTableSink;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -1767,6 +1768,24 @@ protected void computeFragmentHosts() throws Exception {
                 // TODO: rethink the whole function logic. could All BE sink naturally merged into other judgements?
                 return;
             }
+            // For local TVF sink with a specific backend_id, we must execute the sink fragment
+            // on the designated backend. Otherwise, data would be written to the wrong node's local disk.
+            if (fragment.getSink() instanceof TVFTableSink) {
+                TVFTableSink localTvfSink = (TVFTableSink) fragment.getSink();
+                // Resolve the id once; only the "local" TVF is pinned, and -1 means no backend_id was set.
+                long pinnedBeId = "local".equals(localTvfSink.getTvfName()) ? localTvfSink.getBackendId() : -1;
+                if (pinnedBeId != -1) {
+                    Backend pinnedBe = Env.getCurrentSystemInfo().getBackend(pinnedBeId);
+                    if (pinnedBe == null || !pinnedBe.isAlive()) {
+                        throw new UserException("Backend " + pinnedBeId + " is not available for local TVF sink");
+                    }
+                    // Single instance on the requested backend; skip the generic host selection below.
+                    TNetworkAddress pinnedAddr = new TNetworkAddress(pinnedBe.getHost(), pinnedBe.getBePort());
+                    this.addressToBackendID.put(pinnedAddr, pinnedBe.getId());
+                    params.instanceExecParams.add(new FInstanceExecParam(null, pinnedAddr, params));
+                    continue;
+                }
+            }
             if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
                 Reference backendIdRef = new Reference();