From 618c1389f676161bafaffd5764c49e2eeae2fc57 Mon Sep 17 00:00:00 2001 From: arnavb Date: Mon, 1 Sep 2025 14:14:31 +0000 Subject: [PATCH 1/2] update --- .../primary-key-table/changelog-producer.md | 6 ++++- .../paimon/schema/SchemaValidation.java | 8 +++++++ .../paimon/schema/SchemaValidationTest.java | 23 +++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/docs/content/primary-key-table/changelog-producer.md b/docs/content/primary-key-table/changelog-producer.md index 916e89a01000..80d4e68a14a5 100644 --- a/docs/content/primary-key-table/changelog-producer.md +++ b/docs/content/primary-key-table/changelog-producer.md @@ -42,7 +42,7 @@ However, these merged changes cannot form a complete changelog, because we can't Consider a consumer which calculates the sum on some grouping keys (might not be equal to the primary keys). If the consumer only sees a new value `5`, it cannot determine what values should be added to the summing result. For example, if the old value is `4`, it should add `1` to the result. But if the old value is `6`, it should in turn subtract `1` from the result. Old values are important for these types of consumers. -To conclude, `none` changelog producers are best suited for consumers such as a database system. Flink also has a +To conclude, `none` changelog producers are best suited for consumers such as a database system. Flink also has a built-in "normalize" operator which persists the values of each key in states. As one can easily tell, this operator will be very costly and should be avoided. (You can force removing "normalize" operator via `'scan.remove-normalize'`.) @@ -131,6 +131,10 @@ efficient as the input changelog producer and the latency to produce changelog m Full-compaction changelog-producer supports `changelog-producer.row-deduplicate` to avoid generating -U, +U changelog for the same record. +The `full-compaction.delta-commits` property is only supported with lookup changlog +producers. 
If you need both efficient lookup based changelog generation +and periodic full compaction (e.g., Iceberg), `'full-compaction'` can be selected as the changelog producer. + ## Changelog Merging For `input`, `lookup`, `full-compaction` 'changelog-producer'. diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 064630023d95..998ce1644220 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -139,6 +139,14 @@ public static void validateTableSchema(TableSchema schema) { ChangelogProducer.LOOKUP)); } + if (options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null + && changelogProducer == ChangelogProducer.LOOKUP) { + throw new UnsupportedOperationException( + String.format( + "'%s' property is not supported for '%s' changelog producer type.", + FULL_COMPACTION_DELTA_COMMITS.key(), ChangelogProducer.LOOKUP)); + } + checkArgument( options.snapshotNumRetainMin() > 0, SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 501b9f588dbc..262849d4265b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -122,4 +122,27 @@ public void testRecordLevelTimeField() { .hasMessageContaining( "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is STRING."); } + + @Test + void testValidateFullCompactionDeltaCommitsWithLookupChangelogProducer() { + Map options = new HashMap<>(); + + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + + 
assertThatThrownBy(() -> validateTableSchemaExec(options)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "'full-compaction.delta-commits' property is not supported for 'lookup' changelog producer type"); + + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + + assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options)); + + options.clear(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + + assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options)); + } } From 85edfdf87c72317859b4ff8037ee9f293ec33b38 Mon Sep 17 00:00:00 2001 From: arnavb Date: Mon, 1 Sep 2025 14:27:47 +0000 Subject: [PATCH 2/2] update --- docs/content/primary-key-table/changelog-producer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/primary-key-table/changelog-producer.md b/docs/content/primary-key-table/changelog-producer.md index 80d4e68a14a5..d472bddf9ee9 100644 --- a/docs/content/primary-key-table/changelog-producer.md +++ b/docs/content/primary-key-table/changelog-producer.md @@ -131,7 +131,7 @@ efficient as the input changelog producer and the latency to produce changelog m Full-compaction changelog-producer supports `changelog-producer.row-deduplicate` to avoid generating -U, +U changelog for the same record. -The `full-compaction.delta-commits` property is only supported with lookup changlog +The `full-compaction.delta-commits` property is only supported with full-compaction changelog producers. If you need both efficient lookup based changelog generation and periodic full compaction (e.g., Iceberg), `'full-compaction'` can be selected as the changelog producer.