Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public static String getNamespace(Table table) {
public static SourceLineageVertex sourceLineageVertex(
String name, boolean isBounded, Table table) {
LineageDataset dataset =
new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table));
new PaimonLineageDataset(
name, getNamespace(table), buildConfigMap(table), table.rowType());
Boundedness boundedness =
isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset));
Expand All @@ -92,7 +93,8 @@ public static SourceLineageVertex sourceLineageVertex(
*/
public static LineageVertex sinkLineageVertex(String name, Table table) {
LineageDataset dataset =
new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table));
new PaimonLineageDataset(
name, getNamespace(table), buildConfigMap(table), table.rowType());
return new PaimonSinkLineageVertex(Collections.singletonList(dataset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

package org.apache.paimon.flink.lineage;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet;
import org.apache.flink.streaming.api.lineage.DatasetSchemaField;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
Expand All @@ -34,11 +42,21 @@ public class PaimonLineageDataset implements LineageDataset {
private final String name;
private final String namespace;
private final Map<String, String> tableOptions;
@Nullable private final RowType rowType;

public PaimonLineageDataset(String name, String namespace, Map<String, String> tableOptions) {
this(name, namespace, tableOptions, null);
}

public PaimonLineageDataset(
String name,
String namespace,
Map<String, String> tableOptions,
@Nullable RowType rowType) {
this.name = name;
this.namespace = namespace;
this.tableOptions = tableOptions;
this.rowType = rowType;
}

@Override
Expand Down Expand Up @@ -67,6 +85,39 @@ public Map<String, String> config() {
return tableOptions;
}
});
if (rowType != null) {
facets.put(
"schema",
new DatasetSchemaFacet() {
@Override
public String name() {
return "schema";
}

@Override
public Map<String, DatasetSchemaField<String>> fields() {
Map<String, DatasetSchemaField<String>> result = new LinkedHashMap<>();
for (DataField field : rowType.getFields()) {
String fieldName = field.name();
String fieldType = field.type().asSQLString();
result.put(
fieldName,
new DatasetSchemaField<String>() {
@Override
public String name() {
return fieldName;
}

@Override
public String type() {
return fieldType;
}
});
}
return result;
}
});
}
return facets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.format.FormatTableWrite;
import org.apache.paimon.table.sink.BatchTableCommit;
Expand All @@ -32,6 +33,8 @@
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;

import java.util.List;
Expand All @@ -55,7 +58,7 @@ public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream) {
return dataStream.sinkTo(new FormatTableSink(table, overwrite, staticPartitions));
}

private static class FormatTableSink implements Sink<RowData> {
private static class FormatTableSink implements Sink<RowData>, LineageVertexProvider {

private final FormatTable table;
private final boolean overwrite;
Expand All @@ -68,6 +71,11 @@ public FormatTableSink(
this.staticPartitions = staticPartitions;
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sinkLineageVertex(table.fullName(), table);
}

/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink
* 2.0+.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -240,7 +239,7 @@ public DataStreamSink<?> doCommit(DataStream<Committable> written, String commit
}
configureSlotSharingGroup(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
return committed.sinkTo(new PaimonDiscardingSink<>(table)).name("end").setParallelism(1);
}

public static void configureSlotSharingGroup(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

/**
* A {@link DiscardingSink} that implements {@link LineageVertexProvider} so Flink's lineage graph
* discovers the Paimon sink table when using the DataStream API.
*/
public class PaimonDiscardingSink<T> extends DiscardingSink<T> implements LineageVertexProvider {

private static final long serialVersionUID = 1L;

private final FileStoreTable table;

public PaimonDiscardingSink(FileStoreTable table) {
this.table = table;
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sinkLineageVertex(table.fullName(), table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private DataStream<RowData> buildAlignedContinuousFileSource() {
private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
DataStreamSource<RowData> dataStream =
env.fromSource(
source,
new PaimonDataStreamSource<>(source, table),
watermarkStrategy == null
? WatermarkStrategy.noWatermarks()
: watermarkStrategy,
Expand Down Expand Up @@ -354,7 +354,8 @@ private DataStream<RowData> buildDedicatedSplitGenSource(boolean isBounded) {
unordered,
outerProject(),
isBounded,
limit);
limit,
table);
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source;

import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.Table;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

/**
* A {@link Source} wrapper that preserves the wrapped source behavior and exposes Paimon lineage
* for sources built through {@link FlinkSourceBuilder}.
*/
public class PaimonDataStreamSource<T, SplitT extends SourceSplit, CheckpointT>
implements Source<T, SplitT, CheckpointT>, LineageVertexProvider {

private static final long serialVersionUID = 1L;

private final Source<T, SplitT, CheckpointT> source;
private final Table table;

public PaimonDataStreamSource(Source<T, SplitT, CheckpointT> source, Table table) {
this.source = source;
this.table = table;
}

@Override
public Boundedness getBoundedness() {
return source.getBoundedness();
}

@Override
public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext)
throws Exception {
return source.createReader(readerContext);
}

@Override
public SplitEnumerator<SplitT, CheckpointT> createEnumerator(
SplitEnumeratorContext<SplitT> enumContext) throws Exception {
return source.createEnumerator(enumContext);
}

@Override
public SplitEnumerator<SplitT, CheckpointT> restoreEnumerator(
SplitEnumeratorContext<SplitT> enumContext, CheckpointT checkpoint) throws Exception {
return source.restoreEnumerator(enumContext, checkpoint);
}

@Override
public SimpleVersionedSerializer<SplitT> getSplitSerializer() {
return source.getSplitSerializer();
}

@Override
public SimpleVersionedSerializer<CheckpointT> getEnumeratorCheckpointSerializer() {
return source.getEnumeratorCheckpointSerializer();
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sourceLineageVertex(
table.fullName(), getBoundedness() == Boundedness.BOUNDED, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.NoOpEnumState;
import org.apache.paimon.flink.source.PaimonDataStreamSource;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
Expand All @@ -37,6 +40,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -242,13 +246,43 @@ public static DataStream<RowData> buildSource(
NestedProjectedRowData nestedProjectedRowData,
boolean isBounded,
@Nullable Long limit) {
return buildSource(
env,
name,
typeInfo,
readBuilder,
monitorInterval,
emitSnapshotWatermark,
shuffleBucketWithPartition,
unordered,
nestedProjectedRowData,
isBounded,
limit,
null);
}

public static DataStream<RowData> buildSource(
StreamExecutionEnvironment env,
String name,
TypeInformation<RowData> typeInfo,
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark,
boolean shuffleBucketWithPartition,
boolean unordered,
NestedProjectedRowData nestedProjectedRowData,
boolean isBounded,
@Nullable Long limit,
@Nullable Table table) {
MonitorSource monitorSource =
new MonitorSource(readBuilder, monitorInterval, emitSnapshotWatermark, isBounded);
Source<Split, SimpleSourceSplit, NoOpEnumState> source = monitorSource;
if (table != null) {
source = new PaimonDataStreamSource<>(monitorSource, table);
}
SingleOutputStreamOperator<Split> operator =
env.fromSource(
new MonitorSource(
readBuilder,
monitorInterval,
emitSnapshotWatermark,
isBounded),
source,
WatermarkStrategy.noWatermarks(),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
Expand Down
Loading
Loading