diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 5c839a558107..1d1c6214cec6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -224,30 +224,6 @@ protected void validateSchema() { mainRowType, otherBranch, otherRowType); - - List mainPrimaryKeys = wrapped.schema().primaryKeys(); - List otherPrimaryKeys = other.schema().primaryKeys(); - if (!mainPrimaryKeys.isEmpty()) { - if (otherPrimaryKeys.isEmpty()) { - throw new IllegalArgumentException( - "Branch " - + mainBranch - + " has primary keys while branch " - + otherBranch - + " does not. This is not allowed."); - } - Preconditions.checkArgument( - mainPrimaryKeys.equals(otherPrimaryKeys), - "Branch %s and %s both have primary keys but are not the same.\n" - + "Primary keys of %s are %s.\n" - + "Primary keys of %s are %s.", - mainBranch, - otherBranch, - mainBranch, - mainPrimaryKeys, - otherBranch, - otherPrimaryKeys); - } } private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType otherRowType) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 641efb733595..7ecfbf77b2ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -923,6 +923,48 @@ public void testRenameBranch() throws Exception { .hasMessageContaining("Branch"); } + @Test + public void testPrimaryBranchBatchRead() throws Exception { + // Create non-PK table, then create branch, then ALTER main to add PKs. + // This results in main = PK table, branch = non-PK table. + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) " + + "PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + + sql("CALL sys.create_branch('default.t', 'nb')"); + sql("ALTER TABLE t SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )"); + sql("ALTER TABLE t SET ( 'scan.primary-branch' = 'nb' )"); + + // Insert into non-PK branch (primary, has priority) + sql("INSERT INTO `t$branch_nb` VALUES (1, 20, 'cat'), (1, 30, 'dog')"); + // Insert overlapping partition into PK main (fallback) + sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')"); + + // pt=1 exists in primary branch → read from branch + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_nb`")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); + + // Insert pt=2 into primary branch, pt=3 only into main + sql("INSERT INTO `t$branch_nb` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')"); + sql("INSERT INTO t VALUES (3, 10, 'horse')"); + + // pt=1,2 from primary branch; pt=3 from main (fallback) + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", + "+I[dog, 30]", + "+I[tiger, 10]", + "+I[wolf, 20]", + "+I[horse, 10]"); + + // Unset scan.primary-branch, main table should show its own data + sql("ALTER TABLE t RESET ( 'scan.primary-branch' )"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]"); + } + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) {