diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 3d02ede61743..e6657c850634 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -78,6 +78,7 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE; import static org.apache.paimon.format.FileFormat.vectorFileFormat; +import static org.apache.paimon.schema.TableSchema.PAIMON_07_VERSION; import static org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory; import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX; import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES; @@ -681,7 +682,9 @@ private static void validateBucket(TableSchema schema, CoreOptions options) { } else if (bucket < 1 && !isPostponeBucketTable(schema, bucket)) { throw new RuntimeException("The number of buckets needs to be greater than 0."); } else { - if (schema.primaryKeys().isEmpty() && schema.bucketKeys().isEmpty()) { + if (schema.primaryKeys().isEmpty() + && schema.bucketKeys().isEmpty() + && (bucket != 1 || schema.version() != PAIMON_07_VERSION)) { throw new RuntimeException( "You should define a 'bucket-key' for bucketed append mode."); } diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index c3a79d91fdf1..14652ec883c2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -40,6 +40,8 @@ import static org.apache.paimon.CoreOptions.VECTOR_FIELD; import static org.apache.paimon.CoreOptions.VECTOR_FILE_FORMAT; import static 
org.apache.paimon.schema.SchemaValidation.validateTableSchema;
+import static org.apache.paimon.schema.TableSchema.CURRENT_VERSION;
+import static org.apache.paimon.schema.TableSchema.PAIMON_07_VERSION;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -545,6 +547,69 @@ public void testMergeOnReadCoexistsWithVisibilityCallbackAndPostponeBucket() {
                 .doesNotThrowAnyException();
     }
 
+    @Test
+    public void testBucketAppendBackwardCompatibility() {
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.INT()),
+                        new DataField(1, "f1", DataTypes.STRING()));
+        // A PAIMON_07_VERSION schema with bucket = 1 and no bucket-key must still validate.
+        Map<String, String> legacyOptions = new HashMap<>();
+        legacyOptions.put(BUCKET.key(), "1");
+
+        TableSchema legacySchema =
+                new TableSchema(
+                        PAIMON_07_VERSION,
+                        0L,
+                        fields,
+                        1,
+                        emptyList(),
+                        emptyList(),
+                        legacyOptions,
+                        "",
+                        0L);
+
+        assertThatCode(() -> validateTableSchema(legacySchema)).doesNotThrowAnyException();
+        // The same options on a CURRENT_VERSION schema must be rejected for lacking a bucket-key.
+        Map<String, String> currentOptions = new HashMap<>();
+        currentOptions.put(BUCKET.key(), "1");
+
+        TableSchema currentSchema =
+                new TableSchema(
+                        CURRENT_VERSION,
+                        0L,
+                        fields,
+                        1,
+                        emptyList(),
+                        emptyList(),
+                        currentOptions,
+                        "",
+                        0L);
+
+        assertThatThrownBy(() -> validateTableSchema(currentSchema))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("bucket-key");
+        // The legacy exemption covers bucket = 1 only: PAIMON_07_VERSION with bucket = 2 fails.
+        Map<String, String> legacyMultiBucketOptions = new HashMap<>();
+        legacyMultiBucketOptions.put(BUCKET.key(), "2");
+
+        TableSchema legacyMultiBucketSchema =
+                new TableSchema(
+                        PAIMON_07_VERSION,
+                        0L,
+                        fields,
+                        1,
+                        emptyList(),
+                        emptyList(),
+                        legacyMultiBucketOptions,
+                        "",
+                        0L);
+
+        assertThatThrownBy(() -> validateTableSchema(legacyMultiBucketSchema))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("bucket-key");
+    }
+
     @Test
     public void testMergeOnReadRequiresDvEnabled() {
         Map options = new HashMap<>();