From 202acc62762eedd0319bb9e4249728d675a408df Mon Sep 17 00:00:00 2001 From: jkukreja Date: Wed, 27 May 2026 14:46:46 -0400 Subject: [PATCH] [flink] Implement FLIP-314 LineageVertexProvider for non-table APIs --- .../paimon/flink/lineage/LineageUtils.java | 6 +- .../flink/lineage/PaimonLineageDataset.java | 51 +++++++++++ .../sink/FlinkFormatTableDataStreamSink.java | 10 ++- .../apache/paimon/flink/sink/FlinkSink.java | 3 +- .../flink/sink/PaimonDiscardingSink.java | 46 ++++++++++ .../flink/source/FlinkSourceBuilder.java | 5 +- .../flink/source/PaimonDataStreamSource.java | 90 +++++++++++++++++++ .../flink/source/operator/MonitorSource.java | 44 +++++++-- .../flink/lineage/LineageUtilsTest.java | 67 ++++++++++++-- .../FlinkFormatTableDataStreamSinkTest.java | 73 +++++++++++++++ .../sink/FlinkSinkBuilderLineageTest.java | 84 +++++++++++++++++ .../flink/source/FlinkSourceBuilderTest.java | 68 ++++++++++++++ 12 files changed, 528 insertions(+), 19 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java index 110365c76ee4..3d41de6f9d55 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -78,7 +78,8 @@ public static String getNamespace(Table table) { public static SourceLineageVertex sourceLineageVertex( String name, boolean isBounded, Table table) { LineageDataset dataset = - new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + new PaimonLineageDataset( + name, getNamespace(table), buildConfigMap(table), table.rowType()); Boundedness boundedness = isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset)); @@ -92,7 +93,8 @@ public static SourceLineageVertex sourceLineageVertex( */ public static LineageVertex sinkLineageVertex(String name, Table table) { LineageDataset dataset = - new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + new PaimonLineageDataset( + name, getNamespace(table), buildConfigMap(table), table.rowType()); return new PaimonSinkLineageVertex(Collections.singletonList(dataset)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java index 5e99df0b2d4d..c122d14cc83e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java @@ -18,11 +18,19 @@ package org.apache.paimon.flink.lineage; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaField; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import javax.annotation.Nullable; + import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -34,11 +42,21 @@ public class PaimonLineageDataset implements LineageDataset { private final String name; private final String namespace; private final Map tableOptions; + @Nullable private final RowType rowType; public PaimonLineageDataset(String name, String namespace, Map tableOptions) { + this(name, namespace, tableOptions, null); + } + + public PaimonLineageDataset( + String name, + String namespace, + Map tableOptions, + @Nullable RowType rowType) { this.name = name; this.namespace = namespace; this.tableOptions = tableOptions; + this.rowType = rowType; } @Override @@ -67,6 +85,39 @@ public Map config() { return tableOptions; } }); + if (rowType != null) { + facets.put( + "schema", + new DatasetSchemaFacet() { + @Override + public String name() { + return "schema"; + } + + @Override + public Map> fields() { + Map> result = new LinkedHashMap<>(); + for (DataField field : rowType.getFields()) { + String fieldName = field.name(); + String fieldType = field.type().asSQLString(); + result.put( + fieldName, + new DatasetSchemaField() { + @Override + public String name() { + return fieldName; + } + + @Override + public String type() { + return fieldType; + } + }); + } + return result; + } + }); + } return facets; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java index 3d133a9ceace..7305076cba57 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkRowWrapper; +import org.apache.paimon.flink.lineage.LineageUtils; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.format.FormatTableWrite; import org.apache.paimon.table.sink.BatchTableCommit; @@ -32,6 +33,8 @@ import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.data.RowData; import java.util.List; @@ -55,7 +58,7 @@ public DataStreamSink sinkFrom(DataStream dataStream) { return dataStream.sinkTo(new FormatTableSink(table, overwrite, staticPartitions)); } - private static class FormatTableSink implements Sink { + private static class FormatTableSink implements Sink, LineageVertexProvider { private final FormatTable table; private final boolean overwrite; @@ -68,6 +71,11 @@ public FormatTableSink( this.staticPartitions = staticPartitions; } + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(table.fullName(), table); + } + /** * Do not annotate with @override here to maintain compatibility with Flink * 2.0+. diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 959132ad58e0..9948863c547b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -240,7 +239,7 @@ public DataStreamSink doCommit(DataStream written, String commit } configureSlotSharingGroup( committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); - return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); + return committed.sinkTo(new PaimonDiscardingSink<>(table)).name("end").setParallelism(1); } public static void configureSlotSharingGroup( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java new file mode 100644 index 000000000000..c5949dfa07ac --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.flink.lineage.LineageUtils; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; + +/** + * A {@link DiscardingSink} that implements {@link LineageVertexProvider} so Flink's lineage graph + * discovers the Paimon sink table when using the DataStream API. + */ +public class PaimonDiscardingSink extends DiscardingSink implements LineageVertexProvider { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + + public PaimonDiscardingSink(FileStoreTable table) { + this.table = table; + } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(table.fullName(), table); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3e96dec1ea50..37346573a262 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -240,7 +240,7 @@ private DataStream buildAlignedContinuousFileSource() { private DataStream toDataStream(Source source) { DataStreamSource dataStream = env.fromSource( - source, + new PaimonDataStreamSource<>(source, table), watermarkStrategy == null ? WatermarkStrategy.noWatermarks() : watermarkStrategy, @@ -354,7 +354,8 @@ private DataStream buildDedicatedSplitGenSource(boolean isBounded) { unordered, outerProject(), isBounded, - limit); + limit, + table); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java new file mode 100644 index 000000000000..95999ab39e3f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.lineage.LineageUtils; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; + +/** + * A {@link Source} wrapper that preserves the wrapped source behavior and exposes Paimon lineage + * for sources built through {@link FlinkSourceBuilder}. + */ +public class PaimonDataStreamSource + implements Source, LineageVertexProvider { + + private static final long serialVersionUID = 1L; + + private final Source source; + private final Table table; + + public PaimonDataStreamSource(Source source, Table table) { + this.source = source; + this.table = table; + } + + @Override + public Boundedness getBoundedness() { + return source.getBoundedness(); + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + return source.createReader(readerContext); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) throws Exception { + return source.createEnumerator(enumContext); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, CheckpointT checkpoint) throws Exception { + return source.restoreEnumerator(enumContext, checkpoint); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return source.getSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return source.getEnumeratorCheckpointSerializer(); + } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sourceLineageVertex( + table.fullName(), getBoundedness() == Boundedness.BOUNDED, table); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index d9b7d054cfcb..b6de64472b5b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -21,9 +21,12 @@ import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.NoOpEnumState; +import org.apache.paimon.flink.source.PaimonDataStreamSource; import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.source.SplitListState; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; @@ -37,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple2; @@ -242,13 +246,43 @@ public static DataStream buildSource( NestedProjectedRowData nestedProjectedRowData, boolean isBounded, @Nullable Long limit) { + return buildSource( + env, + name, + typeInfo, + readBuilder, + monitorInterval, + emitSnapshotWatermark, + shuffleBucketWithPartition, + unordered, + nestedProjectedRowData, + isBounded, + limit, + null); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + TypeInformation typeInfo, + ReadBuilder readBuilder, + long monitorInterval, + boolean emitSnapshotWatermark, + boolean shuffleBucketWithPartition, + boolean unordered, + NestedProjectedRowData nestedProjectedRowData, + boolean isBounded, + @Nullable Long limit, + @Nullable Table table) { + MonitorSource monitorSource = + new MonitorSource(readBuilder, monitorInterval, emitSnapshotWatermark, isBounded); + Source source = monitorSource; + if (table != null) { + source = new PaimonDataStreamSource<>(monitorSource, table); + } SingleOutputStreamOperator operator = env.fromSource( - new MonitorSource( - readBuilder, - monitorInterval, - emitSnapshotWatermark, - isBounded), + source, WatermarkStrategy.noWatermarks(), name + "-Monitor", new JavaTypeInfo<>(Split.class)) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java index 62d601ec1b23..cea640ab8f10 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -20,7 +20,10 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.PaimonDataStreamScanProvider; -import org.apache.paimon.flink.PaimonDataStreamSinkProvider; +import org.apache.paimon.flink.sink.PaimonDiscardingSink; +import org.apache.paimon.flink.source.ContinuousFileStoreSource; +import org.apache.paimon.flink.source.PaimonDataStreamSource; +import org.apache.paimon.flink.source.operator.MonitorSource; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; @@ -33,6 +36,7 @@ import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageVertex; @@ -177,6 +181,24 @@ void testConfigFacetWithEmptyKeys() throws Exception { assertThat(config).containsEntry("primary-keys", ""); } + @Test + void testSchemaFacetContainsPaimonFields() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", table); + LineageDataset dataset = vertex.datasets().get(0); + + Map facets = dataset.facets(); + assertThat(facets).containsKey("schema"); + + DatasetSchemaFacet schemaFacet = (DatasetSchemaFacet) facets.get("schema"); + assertThat(schemaFacet.fields()).containsOnlyKeys("f0", "f1", "f2"); + assertThat(schemaFacet.fields().get("f0").type()).isEqualTo("INT NOT NULL"); + assertThat(schemaFacet.fields().get("f1").type()).isEqualTo("VARCHAR(100)"); + assertThat(schemaFacet.fields().get("f2").type()).isEqualTo("INT"); + } + @Test void testScanProviderImplementsLineageVertexProvider() throws Exception { FileStoreTable table = @@ -193,16 +215,47 @@ void testScanProviderImplementsLineageVertexProvider() throws Exception { } @Test - void testSinkProviderImplementsLineageVertexProvider() throws Exception { + void testSinkLineageViaPaimonDiscardingSink() throws Exception { FileStoreTable table = createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); - PaimonDataStreamSinkProvider provider = - new PaimonDataStreamSinkProvider(dataStream -> null, "paimon.db.sink", table); + PaimonDiscardingSink sink = new PaimonDiscardingSink<>(table); - assertThat(provider).isInstanceOf(LineageVertexProvider.class); - LineageVertex vertex = provider.getLineageVertex(); + assertThat(sink).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = sink.getLineageVertex(); + assertThat(vertex.datasets()).hasSize(1); + } + + @Test + void testPaimonDataStreamSourceWrapsMonitorSourceLineageVertex() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamSource source = + new PaimonDataStreamSource<>( + new MonitorSource(table.newReadBuilder(), 10, false, true), table); + + assertThat(source).isInstanceOf(LineageVertexProvider.class); + SourceLineageVertex vertex = (SourceLineageVertex) source.getLineageVertex(); + assertThat(vertex.boundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo(table.fullName()); + } + + @Test + void testPaimonDataStreamSourceWrapsFlinkSourceLineageVertex() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamSource source = + new PaimonDataStreamSource<>( + new ContinuousFileStoreSource( + table.newReadBuilder(), table.options(), null), + table); + + SourceLineageVertex vertex = (SourceLineageVertex) source.getLineageVertex(); + assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); assertThat(vertex.datasets()).hasSize(1); - assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink"); + assertThat(vertex.datasets().get(0).name()).isEqualTo(table.fullName()); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java new file mode 100644 index 000000000000..b97792e82425 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FormatTable; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.lang.reflect.Constructor; +import java.util.Collections; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FlinkFormatTableDataStreamSink}. */ +class FlinkFormatTableDataStreamSinkTest { + + @TempDir java.nio.file.Path temp; + + @Test + void testFormatTableSinkLineageVertex() throws Exception { + FormatTable table = + FormatTable.builder() + .fileIO(LocalFileIO.create()) + .identifier(Identifier.create("test_db", "test_table")) + .rowType(RowType.of(new IntType())) + .partitionKeys(Collections.emptyList()) + .location(new Path(temp.toUri().toString()).toString()) + .format(FormatTable.Format.PARQUET) + .options(Collections.singletonMap("path", temp.toUri().toString())) + .catalogContext(CatalogContext.create(new Options())) + .build(); + + Class sinkClass = + Class.forName( + "org.apache.paimon.flink.sink.FlinkFormatTableDataStreamSink$FormatTableSink"); + Constructor constructor = + sinkClass.getDeclaredConstructor(FormatTable.class, boolean.class, Map.class); + constructor.setAccessible(true); + Object sink = constructor.newInstance(table, false, Collections.emptyMap()); + + assertThat(sink).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = ((LineageVertexProvider) sink).getLineageVertex(); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo(table.fullName()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java new file mode 100644 index 000000000000..a7ba7f687985 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for sink lineage in {@link FlinkSinkBuilder}. */ +class FlinkSinkBuilderLineageTest { + + @TempDir java.nio.file.Path temp; + + @Test + void testFlinkSinkBuilderUsesPaimonDiscardingSinkForLineage() throws Exception { + FileStoreTable table = createTable(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = + env.fromCollection( + Collections.singletonList((RowData) GenericRowData.of(1)), + InternalTypeInfo.of(toLogicalType(table.rowType()))); + + DataStreamSink sink = new FlinkSinkBuilder(table).forRowData(input).build(); + + assertThat(sink.getTransformation()).isInstanceOf(SinkTransformation.class); + SinkTransformation transformation = + (SinkTransformation) sink.getTransformation(); + assertThat(transformation.getSink()).isInstanceOf(PaimonDiscardingSink.class); + } + + private FileStoreTable createTable() throws Exception { + Path tablePath = new Path(temp.toUri().toString()); + Map options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "-1"); + new SchemaManager(LocalFileIO.create(), tablePath) + .createTable( + new Schema( + RowType.of(new IntType()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + "")); + return FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java index bc2ccb0fed13..9f6c46c2a793 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java @@ -22,16 +22,25 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.source.operator.MonitorSource; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataTypes; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.nio.file.Path; +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -114,4 +123,63 @@ public void testUnawareBucket() throws Exception { builder = new FlinkSourceBuilder(table); assertTrue(builder.isUnordered()); } + + @Test + public void testBuildWrapsStaticSourceWithPaimonDataStreamSource() throws Exception { + Table table = createTable("static_source", false, -1, true); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + new FlinkSourceBuilder(table).env(env).sourceBounded(true).build(); + + assertThat(dataStream.getTransformation()).isInstanceOf(SourceTransformation.class); + SourceTransformation transformation = + (SourceTransformation) dataStream.getTransformation(); + assertThat(transformation.getSource()).isInstanceOf(PaimonDataStreamSource.class); + } + + @Test + public void testBuildWrapsContinuousSourceWithPaimonDataStreamSource() throws Exception { + Table table = createTable("continuous_source", false, -1, true); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + new FlinkSourceBuilder(table).env(env).sourceBounded(false).build(); + + assertThat(dataStream.getTransformation()).isInstanceOf(SourceTransformation.class); + SourceTransformation transformation = + (SourceTransformation) dataStream.getTransformation(); + assertThat(transformation.getSource()).isInstanceOf(PaimonDataStreamSource.class); + } + + @Test + public void testMonitorSourceBuildSourceWrapsWithPaimonDataStreamSource() throws Exception { + Table table = createTable("monitor_source", false, -1, true); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + MonitorSource.buildSource( + env, + "source", + InternalTypeInfo.of(toLogicalType(table.rowType())), + table.newReadBuilder(), + 10, + false, + false, + false, + null, + true, + null, + table); + + assertThat(dataStream.getTransformation().getTransitivePredecessors()) + .filteredOn(Transformation.class::isInstance) + .filteredOn(transformation -> transformation instanceof SourceTransformation) + .anySatisfy( + transformation -> + assertThat( + ((SourceTransformation) transformation) + .getSource()) + .isInstanceOf(PaimonDataStreamSource.class)); + } }