diff --git a/docs/content/primary-key-table/changelog-producer.md b/docs/content/primary-key-table/changelog-producer.md index 916e89a01000..d472bddf9ee9 100644 --- a/docs/content/primary-key-table/changelog-producer.md +++ b/docs/content/primary-key-table/changelog-producer.md @@ -42,7 +42,7 @@ However, these merged changes cannot form a complete changelog, because we can't Consider a consumer which calculates the sum on some grouping keys (might not be equal to the primary keys). If the consumer only sees a new value `5`, it cannot determine what values should be added to the summing result. For example, if the old value is `4`, it should add `1` to the result. But if the old value is `6`, it should in turn subtract `1` from the result. Old values are important for these types of consumers. -To conclude, `none` changelog producers are best suited for consumers such as a database system. Flink also has a +To conclude, `none` changelog producers are best suited for consumers such as a database system. Flink also has a built-in "normalize" operator which persists the values of each key in states. As one can easily tell, this operator will be very costly and should be avoided. (You can force removing "normalize" operator via `'scan.remove-normalize'`.) @@ -131,6 +131,10 @@ efficient as the input changelog producer and the latency to produce changelog m Full-compaction changelog-producer supports `changelog-producer.row-deduplicate` to avoid generating -U, +U changelog for the same record. +The `full-compaction.delta-commits` property is only supported with full-compaction changelog +producers. If you need both efficient lookup-based changelog generation +and periodic full compaction (e.g., Iceberg), `'full-compaction'` can be selected as the changelog producer. + ## Changelog Merging For `input`, `lookup`, `full-compaction` 'changelog-producer'. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 064630023d95..998ce1644220 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -139,6 +139,14 @@ public static void validateTableSchema(TableSchema schema) { ChangelogProducer.LOOKUP)); } + if (options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null + && changelogProducer == ChangelogProducer.LOOKUP) { + throw new UnsupportedOperationException( + String.format( + "'%s' property is not supported for '%s' changelog producer type.", + FULL_COMPACTION_DELTA_COMMITS.key(), ChangelogProducer.LOOKUP)); + } + checkArgument( options.snapshotNumRetainMin() > 0, SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 501b9f588dbc..262849d4265b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -122,4 +122,27 @@ public void testRecordLevelTimeField() { .hasMessageContaining( "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is STRING."); } + + @Test + void testValidateFullCompactionDeltaCommitsWithLookupChangelogProducer() { + Map options = new HashMap<>(); + + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "'full-compaction.delta-commits' property is not supported for 'lookup' changelog producer type"); + + 
options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + + assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options)); + + options.clear(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + + assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options)); + } }