diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java index c511336767c3df..b8c184ca401819 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java @@ -60,6 +60,22 @@ public TVFTableSink(PlanNodeId exchNodeId, String tvfName, Map p this.cols = cols; } + public String getTvfName() { + return tvfName; + } + + /** + * Returns the backend_id specified in properties, or -1 if not set. + * For local TVF, this indicates the specific BE node where data should be written. + */ + public long getBackendId() { + String backendIdStr = properties.get("backend_id"); + if (backendIdStr != null) { + return Long.parseLong(backendIdStr); + } + return -1; + } + public void bindDataSink() throws AnalysisException { TTVFTableSink tSink = new TTVFTableSink(); tSink.setTvfName(tvfName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index fb6ca658c30b8d..e56c388cdd7aef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -72,6 +72,7 @@ import org.apache.doris.planner.SchemaScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.SortNode; +import org.apache.doris.planner.TVFTableSink; import org.apache.doris.planner.UnionNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; @@ -1767,6 +1768,24 @@ protected void computeFragmentHosts() throws Exception { // TODO: rethink the whole function logic. could All BE sink naturally merged into other judgements? return; } + // For local TVF sink with a specific backend_id, we must execute the sink fragment + // on the designated backend. Otherwise, data would be written to the wrong node's local disk. + if (fragment.getSink() instanceof TVFTableSink) { + TVFTableSink tvfSink = (TVFTableSink) fragment.getSink(); + if ("local".equals(tvfSink.getTvfName()) && tvfSink.getBackendId() != -1) { + Backend targetBackend = Env.getCurrentSystemInfo().getBackend(tvfSink.getBackendId()); + if (targetBackend == null || !targetBackend.isAlive()) { + throw new UserException("Backend " + tvfSink.getBackendId() + + " is not available for local TVF sink"); + } + TNetworkAddress execHostport = new TNetworkAddress( + targetBackend.getHost(), targetBackend.getBePort()); + this.addressToBackendID.put(execHostport, targetBackend.getId()); + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params); + params.instanceExecParams.add(instanceParam); + continue; + } + } if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) { Reference backendIdRef = new Reference();