Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/content/primary-key-table/changelog-producer.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ However, these merged changes cannot form a complete changelog, because we can't

Consider a consumer which calculates the sum on some grouping keys (might not be equal to the primary keys). If the consumer only sees a new value `5`, it cannot determine what values should be added to the summing result. For example, if the old value is `4`, it should add `1` to the result. But if the old value is `6`, it should in turn subtract `1` from the result. Old values are important for these types of consumers.

To conclude, `none` changelog producers are best suited for consumers such as a database system. Flink also has a
To conclude, `none` changelog producers are best suited for consumers such as a database system. Flink also has a
built-in "normalize" operator which persists the values of each key in states. As one can easily tell, this operator
will be very costly and should be avoided. (You can force removing "normalize" operator via `'scan.remove-normalize'`.)

Expand Down Expand Up @@ -131,6 +131,10 @@ efficient as the input changelog producer and the latency to produce changelog m
Full-compaction changelog-producer supports `changelog-producer.row-deduplicate` to avoid generating -U, +U
changelog for the same record.

The `full-compaction.delta-commits` property is only supported with full-compaction changlog
producers. If you need both efficient lookup based changelog generation
and periodic full compaction (e.g., Iceberg), `'full-compaction'` can be selected as the changelog producer.

## Changelog Merging

For `input`, `lookup`, `full-compaction` 'changelog-producer'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ public static void validateTableSchema(TableSchema schema) {
ChangelogProducer.LOOKUP));
}

if (options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null
&& changelogProducer == ChangelogProducer.LOOKUP) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the condition should be changelogProducer != ChangelogProducer.FULL_COMPACTION as this property only works with full-compaction changelog producer.

throw new UnsupportedOperationException(
String.format(
"'%s' property is not supported for '%s' changelog producer type.",
FULL_COMPACTION_DELTA_COMMITS.key(), ChangelogProducer.LOOKUP));
}

checkArgument(
options.snapshotNumRetainMin() > 0,
SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,27 @@ public void testRecordLevelTimeField() {
.hasMessageContaining(
"The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is STRING.");
}

@Test
void testValidateFullCompactionDeltaCommitsWithLookupChangelogProducer() {
Map<String, String> options = new HashMap<>();

options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");

assertThatThrownBy(() -> validateTableSchemaExec(options))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining(
"'full-compaction.delta-commits' property is not supported for 'lookup' changelog producer type");

options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");

assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options));

options.clear();
options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");

assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options));
}
}
Loading