Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,30 +224,6 @@ protected void validateSchema() {
mainRowType,
otherBranch,
otherRowType);

List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
List<String> otherPrimaryKeys = other.schema().primaryKeys();
if (!mainPrimaryKeys.isEmpty()) {
if (otherPrimaryKeys.isEmpty()) {
throw new IllegalArgumentException(
"Branch "
+ mainBranch
+ " has primary keys while branch "
+ otherBranch
+ " does not. This is not allowed.");
}
Preconditions.checkArgument(
mainPrimaryKeys.equals(otherPrimaryKeys),
"Branch %s and %s both have primary keys but are not the same.\n"
+ "Primary keys of %s are %s.\n"
+ "Primary keys of %s are %s.",
mainBranch,
otherBranch,
mainBranch,
mainPrimaryKeys,
otherBranch,
otherPrimaryKeys);
}
}

private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType otherRowType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,48 @@ public void testRenameBranch() throws Exception {
.hasMessageContaining("Branch");
}

@Test
public void testPrimaryBranchBatchRead() throws Exception {
// Create non-PK table, then create branch, then ALTER main to add PKs.
// This results in main = PK table, branch = non-PK table.
sql(
"CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) "
+ "PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");

sql("CALL sys.create_branch('default.t', 'nb')");
sql("ALTER TABLE t SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
sql("ALTER TABLE t SET ( 'scan.primary-branch' = 'nb' )");

// Insert into non-PK branch (primary, has priority)
sql("INSERT INTO `t$branch_nb` VALUES (1, 20, 'cat'), (1, 30, 'dog')");
// Insert overlapping partition into PK main (fallback)
sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");

// pt=1 exists in primary branch → read from branch
assertThat(collectResult("SELECT v, k FROM t"))
.containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
assertThat(collectResult("SELECT v, k FROM `t$branch_nb`"))
.containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");

// Insert pt=2 into primary branch, pt=3 only into main
sql("INSERT INTO `t$branch_nb` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')");
sql("INSERT INTO t VALUES (3, 10, 'horse')");

// pt=1,2 from primary branch; pt=3 from main (fallback)
assertThat(collectResult("SELECT v, k FROM t"))
.containsExactlyInAnyOrder(
"+I[cat, 20]",
"+I[dog, 30]",
"+I[tiger, 10]",
"+I[wolf, 20]",
"+I[horse, 10]");

// Unset scan.primary-branch, main table should show its own data
sql("ALTER TABLE t RESET ( 'scan.primary-branch' )");
assertThat(collectResult("SELECT v, k FROM t"))
.containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]");
}

private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
Expand Down