Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
import static org.apache.paimon.spark.util.OptionUtils.withBranchFromOptions;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue;
import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue;
Expand Down Expand Up @@ -759,7 +760,9 @@ public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throw
protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
try {
org.apache.paimon.catalog.Identifier tblIdent = toIdentifier(ident, catalogName);
org.apache.paimon.catalog.Identifier tblIdent =
withBranchFromOptions(
catalogName, toIdentifier(ident, catalogName), extraOptions);
org.apache.paimon.table.Table table =
copyWithSQLConf(
catalog.getTable(tblIdent), catalogName, tblIdent, extraOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark.util

import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.options.ConfigOption
import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
Expand Down Expand Up @@ -165,15 +166,41 @@ object OptionUtils extends SQLConfHelper with Logging {
catalogName: String = null,
ident: Identifier = null,
extraOptions: JMap[String, String] = new JHashMap[String, String]()): T = {
val mergedOptions = if (catalogName != null && ident != null) {
mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
} else {
mergeSQLConf(extraOptions)
}
val mergedOptions = getMergedOptions(catalogName, ident, extraOptions)
if (mergedOptions.isEmpty) {
table
} else {
table.copy(mergedOptions).asInstanceOf[T]
}
}

private def getMergedOptions(
catalogName: String = null,
ident: Identifier = null,
extraOptions: JMap[String, String] = new JHashMap[String, String]()): JMap[String, String] = {
if (catalogName != null && ident != null) {
mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
} else {
mergeSQLConf(extraOptions)
}
}

def withBranchFromOptions(
catalogName: String = null,
identifier: Identifier = null,
extraOptions: JMap[String, String] = new JHashMap[String, String]()
): Identifier = {
if (identifier != null && !identifier.isSystemTable) {
val branch =
getMergedOptions(catalogName, identifier, extraOptions).get(CoreOptions.BRANCH.key)
if (branch != null && identifier.getBranchName == null) {
logWarning(
s"Using deprecated 'spark.paimon.branch=$branch' to access table '${identifier.getTableName}'. " +
s"Please migrate to '${identifier.getTableName}$$branch_$branch' syntax, as 'spark.paimon.branch' " +
s"will be removed in a future version.")
return new Identifier(identifier.getDatabaseName, identifier.getTableName, branch)
}
}
identifier
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2298,4 +2298,68 @@ public void testChainTableWithMultiGroupPartition(@TempDir java.nio.file.Path te

spark.close();
}

@Test
public void testChainTableWithBranchOption(@TempDir java.nio.file.Path tempDir)
throws IOException {
Path warehousePath = new Path("file:" + tempDir.toString());
SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
SparkSession spark = builder.getOrCreate();
spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
spark.sql("USE spark_catalog.my_db1");
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
spark.sql(
"CREATE TABLE IF NOT EXISTS `chain_test` (\n"
+ " `t1` BIGINT,\n"
+ " `t2` BIGINT,\n"
+ " `t3` STRING\n"
+ ") PARTITIONED BY (`dt` STRING)\n"
+ "TBLPROPERTIES (\n"
+ " 'bucket-key' = 't1',\n"
+ " 'primary-key' = 'dt,t1',\n"
+ " 'partition.timestamp-pattern' = '$dt',\n"
+ " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+ " 'chain-table.enabled' = 'true',\n"
+ " 'bucket' = '1',\n"
+ " 'merge-engine' = 'deduplicate',\n"
+ " 'sequence.field' = 't2'\n"
+ ")");
setupChainTableBranches(spark, "chain_test");
// Write main branch
spark.sql(
"INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = '20250810') VALUES (1, 3, '0')");
// Write delta branch
spark.sql("SET spark.paimon.branch = delta");
spark.sql(
"INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = '20250810') VALUES (1, 2, '1')");
spark.sql(
"INSERT OVERWRITE TABLE `my_db1`.`chain_test$branch_delta` PARTITION (dt = '20250811') VALUES (2, 2, '1')");
assertThat(spark.sql("SELECT * FROM `my_db1`.`chain_test$snapshots`").count()).isEqualTo(2);
spark.sql("RESET spark.paimon.branch");
assertThat(
spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
.collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("[1,2,1,20250811]", "[2,2,1,20250811]");
assertThat(
spark
.sql(
"SELECT * FROM `my_db1`.`chain_test$branch_snapshot` WHERE dt = '20250811'")
.collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.isEmpty();

spark.sql("SET spark.paimon.branch = snapshot");
assertThat(
spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
.collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.isEmpty();
assertThat(spark.sql("SELECT * FROM `my_db1`.`chain_test$snapshots`").count()).isEqualTo(2);
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
spark.close();
}
}
Loading