From ff19ac658ea9ae8dc75b3009eed1312494f0cbe7 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Thu, 28 May 2026 13:12:25 +0800 Subject: [PATCH] [spark] Align spark.paimon.branch option with explicit branch syntax for chain tables --- .../org/apache/paimon/spark/SparkCatalog.java | 5 +- .../paimon/spark/util/OptionUtils.scala | 37 +++++++++-- .../paimon/spark/SparkChainTableITCase.java | 64 +++++++++++++++++++ 3 files changed, 100 insertions(+), 6 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 913d4f582af5..62c98e391a5b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -99,6 +99,7 @@ import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType; import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations; import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf; +import static org.apache.paimon.spark.util.OptionUtils.withBranchFromOptions; import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace; import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue; import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue; @@ -759,7 +760,9 @@ public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throw protected org.apache.spark.sql.connector.catalog.Table loadSparkTable( Identifier ident, Map extraOptions) throws NoSuchTableException { try { - org.apache.paimon.catalog.Identifier tblIdent = toIdentifier(ident, catalogName); + org.apache.paimon.catalog.Identifier tblIdent = + withBranchFromOptions( + catalogName, toIdentifier(ident, catalogName), extraOptions); org.apache.paimon.table.Table table = copyWithSQLConf( catalog.getTable(tblIdent), catalogName, tblIdent, extraOptions); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala index 7a6fa547c2a9..7716eb114e02 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.util +import org.apache.paimon.CoreOptions import org.apache.paimon.catalog.Identifier import org.apache.paimon.options.ConfigOption import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions} @@ -165,15 +166,41 @@ object OptionUtils extends SQLConfHelper with Logging { catalogName: String = null, ident: Identifier = null, extraOptions: JMap[String, String] = new JHashMap[String, String]()): T = { - val mergedOptions = if (catalogName != null && ident != null) { - mergeSQLConfWithIdentifier(extraOptions, catalogName, ident) - } else { - mergeSQLConf(extraOptions) - } + val mergedOptions = getMergedOptions(catalogName, ident, extraOptions) if (mergedOptions.isEmpty) { table } else { table.copy(mergedOptions).asInstanceOf[T] } } + + private def getMergedOptions( + catalogName: String = null, + ident: Identifier = null, + extraOptions: JMap[String, String] = new JHashMap[String, String]()): JMap[String, String] = { + if (catalogName != null && ident != null) { + mergeSQLConfWithIdentifier(extraOptions, catalogName, ident) + } else { + mergeSQLConf(extraOptions) + } + } + + def withBranchFromOptions( + catalogName: String = null, + identifier: Identifier = null, + extraOptions: JMap[String, String] = new JHashMap[String, String]() + ): Identifier = { + if (identifier != null && !identifier.isSystemTable) { + val branch = + getMergedOptions(catalogName, identifier, extraOptions).get(CoreOptions.BRANCH.key) + if (branch != null && identifier.getBranchName == null) { + logWarning( + s"Using deprecated 'spark.paimon.branch=$branch' to access table '${identifier.getTableName}'. " + + s"Please migrate to '${identifier.getTableName}$$branch_$branch' syntax, as 'spark.paimon.branch' " + + s"will be removed in a future version.") + return new Identifier(identifier.getDatabaseName, identifier.getTableName, branch) + } + } + identifier + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java index 1907c7fcf3cc..c2833c223ea0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java @@ -2298,4 +2298,68 @@ public void testChainTableWithMultiGroupPartition(@TempDir java.nio.file.Path te spark.close(); } + + @Test + public void testChainTableWithBranchOption(@TempDir java.nio.file.Path tempDir) + throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = createSparkSessionBuilder(warehousePath); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + spark.sql( + "CREATE TABLE IF NOT EXISTS `chain_test` (\n" + + " `t1` BIGINT,\n" + + " `t2` BIGINT,\n" + + " `t3` STRING\n" + + ") PARTITIONED BY (`dt` STRING)\n" + + "TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,t1',\n" + + " 'partition.timestamp-pattern' = '$dt',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '1',\n" + + " 'merge-engine' = 'deduplicate',\n" + + " 'sequence.field' = 't2'\n" + + ")"); + setupChainTableBranches(spark, "chain_test"); + // Write main branch + spark.sql( + "INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = '20250810') VALUES (1, 3, '0')"); + // Write delta branch + spark.sql("SET spark.paimon.branch = delta"); + spark.sql( + "INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = '20250810') VALUES (1, 2, '1')"); + spark.sql( + "INSERT OVERWRITE TABLE `my_db1`.`chain_test$branch_delta` PARTITION (dt = '20250811') VALUES (2, 2, '1')"); + assertThat(spark.sql("SELECT * FROM `my_db1`.`chain_test$snapshots`").count()).isEqualTo(2); + spark.sql("RESET spark.paimon.branch"); + assertThat( + spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,2,1,20250811]", "[2,2,1,20250811]"); + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test$branch_snapshot` WHERE dt = '20250811'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .isEmpty(); + + spark.sql("SET spark.paimon.branch = snapshot"); + assertThat( + spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .isEmpty(); + assertThat(spark.sql("SELECT * FROM `my_db1`.`chain_test$snapshots`").count()).isEqualTo(2); + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + spark.close(); + } }