Skip to content

[flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs#8001

Open
jsingh-yelp wants to merge 1 commit into
apache:masterfrom
jsingh-yelp:add-lineage-for-non-table-apis
Open

[flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs#8001
jsingh-yelp wants to merge 1 commit into
apache:masterfrom
jsingh-yelp:add-lineage-for-non-table-apis

Conversation

@jsingh-yelp
Copy link
Copy Markdown
Contributor

@jsingh-yelp jsingh-yelp commented May 27, 2026

Purpose

What is covered?

Component Approach Covers
DataStream Sources PaimonDataStreamSource wraps any Paimon Source with LineageVertexProvider StaticFileStoreSourceContinuousFileStoreSourceAlignedContinuousFileStoreSourceMonitorSource (via FlinkSourceBuilder)
DataStream Sinks PaimonDiscardingSink extends DiscardingSink with lineage All FlinkSink subclasses (append, upsert, CDC fixed/dynamic bucket)
Format Table Sink FormatTableSink implements LineageVertexProvider directly FlinkFormatTableDataStreamSink (Parquet/ORC direct writes)

What is still not covered?

Not covered Reason
Multi-table CDC sinks (FlinkCdcMultiTableSink) Tables are created dynamically at runtime, while getLineageVertex() is called during graph construction before the output tables are known. This can be revisited in a future improvement.
CompactorSourceBuilder / SystemTableSource Maintenance or metadata-oriented paths rather than regular source-to-sink user data pipelines.

Tests

  • Added tests for the changes done in this PR.
  • Did manual testing for various use cases I had readily available for both Paimon as a source and sink:

Example paimon table looks like this inside the lineage event:

{
      "namespace": "file:///tmp/paimon-warehouse",
      "name": "jaskaran_test.default_value",
      "facets": {
        "schema": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
          "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
          "fields": [
            {
              "name": "id",
              "type": "STRING NOT NULL"
            },
            {
              "name": "age",
              "type": "INT"
            },
            {
              "name": "source",
              "type": "STRING"
            }
          ]
        },
        "symlinks": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
          "identifiers": []
        },
        "config": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
          "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/DatasetFacet",
          "bucket": "-1",
          "catalog.warehouse": "file:///tmp/paimon-warehouse",
          "path": "file:/tmp/paimon-warehouse/jaskaran_test.db/default_value",
          "write-only": "true",
          "changelog-producer": "input",
          "partition-keys": "",
          "type": "paimon",
          "primary-keys": "id"
        }
      }
    }

@jsingh-yelp jsingh-yelp force-pushed the add-lineage-for-non-table-apis branch from feac25e to 202acc6 Compare May 27, 2026 18:47
@jsingh-yelp jsingh-yelp changed the title [flink] Implement FLIP-314 LineageVertexProvider for non-table APIs [flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs May 27, 2026
@jsingh-yelp
Copy link
Copy Markdown
Contributor Author

cc: @JingsongLi and @yunfengzhou-hub can I please a review on this PR, this is extension to work done previously in:
#7311

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant