From 209829207fb0fb3c2ba92517121b1943419992ab Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 9 Apr 2026 14:45:06 +0800 Subject: [PATCH 1/3] [core] When scan.primary-branch is set, allow primary branch to be an append only table --- .../table/FallbackReadFileStoreTable.java | 53 ++++++++++--------- .../apache/paimon/flink/BranchSqlITCase.java | 43 +++++++++++++++ 2 files changed, 71 insertions(+), 25 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 5c839a558107..0a0ad0a6caef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -209,44 +209,47 @@ public DataTableScan newScan(Function scanCreator } protected void validateSchema() { - String mainBranch = wrapped.coreOptions().branch(); - String otherBranch = other.coreOptions().branch(); - RowType mainRowType = wrapped.schema().logicalRowType(); - RowType otherRowType = other.schema().logicalRowType(); + FileStoreTable first = wrappedFirst ? wrapped : other; + FileStoreTable second = wrappedFirst ? other : wrapped; + + String firstBranch = first.coreOptions().branch(); + String secondBranch = second.coreOptions().branch(); + RowType firstRowType = first.schema().logicalRowType(); + RowType secondRowType = second.schema().logicalRowType(); Preconditions.checkArgument( - sameRowTypeIgnoreNullable(mainRowType, otherRowType), + sameRowTypeIgnoreNullable(firstRowType, secondRowType), "Branch %s and %s does not have the same row type.\n" + "Row type of branch %s is %s.\n" + "Row type of branch %s is %s.", - mainBranch, - otherBranch, - mainBranch, - mainRowType, - otherBranch, - otherRowType); - - List mainPrimaryKeys = wrapped.schema().primaryKeys(); - List otherPrimaryKeys = other.schema().primaryKeys(); - if (!mainPrimaryKeys.isEmpty()) { - if (otherPrimaryKeys.isEmpty()) { + firstBranch, + secondBranch, + firstBranch, + firstRowType, + secondBranch, + secondRowType); + + List firstPrimaryKeys = first.schema().primaryKeys(); + List secondPrimaryKeys = second.schema().primaryKeys(); + if (!firstPrimaryKeys.isEmpty()) { + if (secondPrimaryKeys.isEmpty()) { throw new IllegalArgumentException( "Branch " - + mainBranch + + firstBranch + " has primary keys while branch " - + otherBranch + + secondBranch + " does not. This is not allowed."); } Preconditions.checkArgument( - mainPrimaryKeys.equals(otherPrimaryKeys), + firstPrimaryKeys.equals(secondPrimaryKeys), "Branch %s and %s both have primary keys but are not the same.\n" + "Primary keys of %s are %s.\n" + "Primary keys of %s are %s.", - mainBranch, - otherBranch, - mainBranch, - mainPrimaryKeys, - otherBranch, - otherPrimaryKeys); + firstBranch, + secondBranch, + firstBranch, + firstPrimaryKeys, + secondBranch, + secondPrimaryKeys); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 641efb733595..279e245d4877 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -923,6 +923,49 @@ public void testRenameBranch() throws Exception { .hasMessageContaining("Branch"); } + @Test + public void testPrimaryBranchBatchRead() throws Exception { + // Create non-PK table, then create branch, then ALTER main to add PKs. + // This results in main = PK table, branch = non-PK table. + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) " + + "PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + + sql("CALL sys.create_branch('default.t', 'nb')"); + sql("ALTER TABLE t SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )"); + sql("ALTER TABLE t SET ( 'scan.primary-branch' = 'nb' )"); + + // Insert into non-PK branch (primary, has priority) + sql("INSERT INTO `t$branch_nb` VALUES (1, 20, 'cat'), (1, 30, 'dog')"); + // Insert overlapping partition into PK main (fallback) + sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')"); + + // pt=1 exists in primary branch → read from branch + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_nb`")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); + + // Insert pt=2 into primary branch, pt=3 only into main + sql("INSERT INTO `t$branch_nb` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')"); + sql("INSERT INTO t VALUES (3, 10, 'horse')"); + + // pt=1,2 from primary branch; pt=3 from main (fallback) + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", + "+I[dog, 30]", + "+I[tiger, 10]", + "+I[wolf, 20]", + "+I[horse, 10]"); + + // Unset scan.primary-branch, main table should show its own data + sql("ALTER TABLE t RESET ( 'scan.primary-branch' )"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]"); + } + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) { From 0439761a85685750e6b1563e1fda3aad89c9dca7 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 9 Apr 2026 17:25:25 +0800 Subject: [PATCH 2/3] [fix] Fix checkstyle --- .../src/test/java/org/apache/paimon/flink/BranchSqlITCase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 279e245d4877..7ecfbf77b2ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -962,8 +962,7 @@ public void testPrimaryBranchBatchRead() throws Exception { // Unset scan.primary-branch, main table should show its own data sql("ALTER TABLE t RESET ( 'scan.primary-branch' )"); assertThat(collectResult("SELECT v, k FROM t")) - .containsExactlyInAnyOrder( - "+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]"); + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]"); } private List collectResult(String sql) throws Exception { From 5a4ca1963646740975e9608526e2c7b0f59d214b Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 9 Apr 2026 21:48:33 +0800 Subject: [PATCH 3/3] [fix] Fix comments --- .../table/FallbackReadFileStoreTable.java | 49 +++++-------------- 1 file changed, 11 insertions(+), 38 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 0a0ad0a6caef..1d1c6214cec6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -209,48 +209,21 @@ public DataTableScan newScan(Function scanCreator } protected void validateSchema() { - FileStoreTable first = wrappedFirst ? wrapped : other; - FileStoreTable second = wrappedFirst ? other : wrapped; - - String firstBranch = first.coreOptions().branch(); - String secondBranch = second.coreOptions().branch(); - RowType firstRowType = first.schema().logicalRowType(); - RowType secondRowType = second.schema().logicalRowType(); + String mainBranch = wrapped.coreOptions().branch(); + String otherBranch = other.coreOptions().branch(); + RowType mainRowType = wrapped.schema().logicalRowType(); + RowType otherRowType = other.schema().logicalRowType(); Preconditions.checkArgument( - sameRowTypeIgnoreNullable(firstRowType, secondRowType), + sameRowTypeIgnoreNullable(mainRowType, otherRowType), "Branch %s and %s does not have the same row type.\n" + "Row type of branch %s is %s.\n" + "Row type of branch %s is %s.", - firstBranch, - secondBranch, - firstBranch, - firstRowType, - secondBranch, - secondRowType); - - List firstPrimaryKeys = first.schema().primaryKeys(); - List secondPrimaryKeys = second.schema().primaryKeys(); - if (!firstPrimaryKeys.isEmpty()) { - if (secondPrimaryKeys.isEmpty()) { - throw new IllegalArgumentException( - "Branch " - + firstBranch - + " has primary keys while branch " - + secondBranch - + " does not. This is not allowed."); - } - Preconditions.checkArgument( - firstPrimaryKeys.equals(secondPrimaryKeys), - "Branch %s and %s both have primary keys but are not the same.\n" - + "Primary keys of %s are %s.\n" - + "Primary keys of %s are %s.", - firstBranch, - secondBranch, - firstBranch, - firstPrimaryKeys, - secondBranch, - secondPrimaryKeys); - } + mainBranch, + otherBranch, + mainBranch, + mainRowType, + otherBranch, + otherRowType); } private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType otherRowType) {