Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -120,6 +121,21 @@ public Collection<MultiTableCommittable> prepareCommit() {
.getValue()
.prepareCommit(false, lastCheckpointId + 1)
.stream()
// Filter out empty Committable to avoid unnecessary
// commit operations.
.filter(
committable -> {
Object wrapped =
committable
.wrappedCommittable();
if (wrapped
instanceof CommitMessageImpl) {
return !((CommitMessageImpl)
wrapped)
.isEmpty();
}
return true;
})
.map(
committable ->
MultiTableCommittable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,139 @@ public void testSinkWithHiveMetastorePartitionedTable(String metastore) throws E
Row.ofKind(RowKind.INSERT, "20250604", "2", "Bob"));
}

@ParameterizedTest
@CsvSource({"filesystem, true"})
public void testEmptyCommitFilter(String metastore, boolean enableDeleteVector)
throws IOException,
InterruptedException,
Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException,
SchemaEvolveException {
initialize(metastore);
PaimonSink<Event> paimonSink =
new PaimonSink<>(
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// Create table and insert initial data
writeAndCommit(
writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0]));
Assertions.assertThat(fetchResults(table1))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2"));

// Get snapshot count before empty commits
List<Row> snapshotsBefore = new ArrayList<>();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
.execute()
.collect()
.forEachRemaining(snapshotsBefore::add);
int snapshotCountBefore = snapshotsBefore.size();

// Prepare commit with no data written (should filter out empty commits)
Collection<MultiTableCommittable> emptyCommittables = writer.prepareCommit();
// Verify that empty commits are filtered out
Assertions.assertThat(emptyCommittables).isEmpty();

// Write and commit new data
writeAndCommit(
writer,
committer,
generateInsert(
table1, Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "3"))));

// Get snapshot count after data commit
List<Row> snapshotsAfter = new ArrayList<>();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
.execute()
.collect()
.forEachRemaining(snapshotsAfter::add);
int snapshotCountAfter = snapshotsAfter.size();

// When deletion-vectors is enabled, each APPEND triggers a COMPACT, creating 2 snapshots
// When disabled, only 1 snapshot is created per data commit
int expectedSnapshotCount =
enableDeleteVector ? snapshotCountBefore + 2 : snapshotCountBefore + 1;
// Verify that only the data commit created snapshots, empty commits were filtered
Assertions.assertThat(snapshotCountAfter).isEqualTo(expectedSnapshotCount);

// Verify data is correctly written
Assertions.assertThat(fetchResults(table1))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, "1", "1"),
Row.ofKind(RowKind.INSERT, "2", "2"),
Row.ofKind(RowKind.INSERT, "3", "3"));
}

@ParameterizedTest
@CsvSource({"filesystem, true"})
public void testMultipleEmptyCommits(String metastore, boolean enableDeleteVector)
throws IOException,
InterruptedException,
Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException,
SchemaEvolveException {
initialize(metastore);
PaimonSink<Event> paimonSink =
new PaimonSink<>(
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// Create table and insert initial data
writeAndCommit(
writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0]));

// Get snapshot count before multiple empty commits
List<Row> snapshotsBefore = new ArrayList<>();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
.execute()
.collect()
.forEachRemaining(snapshotsBefore::add);
int snapshotCountBefore = snapshotsBefore.size();

// Prepare multiple empty commits (should all be filtered out)
for (int i = 0; i < 3; i++) {
Collection<MultiTableCommittable> emptyCommittables = writer.prepareCommit();
Assertions.assertThat(emptyCommittables).isEmpty();
}

// Verify no snapshots were created by empty commits
List<Row> snapshotsDuring = new ArrayList<>();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
.execute()
.collect()
.forEachRemaining(snapshotsDuring::add);
Assertions.assertThat(snapshotsDuring).hasSameSizeAs(snapshotsBefore);

// Write and commit new data
writeAndCommit(
writer,
committer,
generateInsert(
table1, Arrays.asList(Tuple2.of(STRING(), "4"), Tuple2.of(STRING(), "4"))));

// When deletion-vectors is enabled, each APPEND triggers a COMPACT, creating 2 snapshots
// When disabled, only 1 snapshot is created per data commit
int expectedSnapshotCount =
enableDeleteVector ? snapshotCountBefore + 2 : snapshotCountBefore + 1;
// Verify only expected snapshots were created for the data commit
List<Row> snapshotsAfter = new ArrayList<>();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
.execute()
.collect()
.forEachRemaining(snapshotsAfter::add);
Assertions.assertThat(snapshotsAfter).hasSize(expectedSnapshotCount);

// Verify data is correctly written
Assertions.assertThat(fetchResults(table1))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, "1", "1"),
Row.ofKind(RowKind.INSERT, "2", "2"),
Row.ofKind(RowKind.INSERT, "4", "4"));
}

private void runJobWithEvents(List<Event> events, boolean isBatchMode) throws Exception {
DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));

Expand Down
Loading