[spark] Add startup mode for batch read#2532
Conversation
|
@YannByron @wuchong please help take a look, thank you! |
wuchong
left a comment
There was a problem hiding this comment.
Thanks, @Yohahaha! It looks like no new tests have been added, should we include some tests for the new configuration option?
Also, as a best practice, we recommend first creating a dedicated issue to describe the feature and proposed APIs before submitting a pull request. The PR can then be linked to that issue. This helps us better track progress and maintain visibility across all subtasks of the umbrella initiative.
| ConfigBuilder | ||
| .key("scan.startup.mode") | ||
| .stringType() | ||
| .defaultValue(StartUpMode.LATEST.toString) |
There was a problem hiding this comment.
Should we use the default FULL mode to stay aligned with the Flink connector?
Using LATEST by default may result in empty results if the user doesn’t explicitly specify a startup mode for the query.
...uss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
Show resolved
Hide resolved
...uss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
Outdated
Show resolved
Hide resolved
...uss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
Show resolved
Hide resolved
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
Outdated
Show resolved
Hide resolved
7c323dd to
9f47100
Compare
| val FULL, EARLIEST, LATEST, TIMESTAMP = Value | ||
| } | ||
|
|
||
| val SCAN_START_UP_MODE: ConfigOption[String] = |
There was a problem hiding this comment.
I suggest to place these common options used both spark and flink in ConfigOptions, like https://github.com/apache/paimon/blob/a10a44892cd5e9dbac705762ed6774674357692f/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java#L947 in paimon. We can do this in separate pr maybe.
There was a problem hiding this comment.
good advice, let's refactor these common config into fluss-common module in another PR, flink and spark can share it. cc @wuchong
There was a problem hiding this comment.
Good point. We can introduce ConnectorOptions in fluss-common package org.apache.fluss.config to share the common options for different connectors.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
Outdated
Show resolved
Hide resolved
| new Configuration() | ||
| } | ||
|
|
||
| override protected def beforeEach(): Unit = { |
There was a problem hiding this comment.
All the configs are default values, this's meaningless, so I suggest to remove this.
There was a problem hiding this comment.
@YannByron WithFlussAdmin use singleton fluss config that will affect integration test, if we dont reset config here, the fluss config working scope need to rethink.
There was a problem hiding this comment.
UT launched by mvn will fail without it.
2d08347 to
67105fa
Compare
|
@YannByron any more comments? |
|
+1. cc @wuchong |
Purpose
Add a new option
start.up.modeto read different offset from fluss.This PR only changes batch read related class.
Linked issue: close #2549
Brief change log
Tests
API and Format
Documentation