From b8a274e659560875663850b8593d01857fdd3c2f Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 4 Sep 2025 15:21:22 +0000 Subject: [PATCH 1/3] update --- .../apache/parquet/avro/AvroConverters.java | 73 +++++++++++++++++++ .../parquet/avro/AvroRecordConverter.java | 14 ++++ .../apache/parquet/avro/TestReadWrite.java | 57 +++++++++++++++ 3 files changed, 144 insertions(+) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java index f3e8c21483..0f11c3871a 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java @@ -20,6 +20,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -29,6 +31,7 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveStringifier; import org.apache.parquet.schema.PrimitiveType; @@ -339,4 +342,74 @@ public String convert(Binary binary) { return stringifier.stringify(binary); } } + + static final class FieldDecimalIntConverter extends AvroPrimitiveConverter { + private final int scale; + private int[] dict = null; + + public FieldDecimalIntConverter(ParentValueContainer parent, PrimitiveType type) { + super(parent); + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation(); + this.scale = decimalType.getScale(); + } + + @Override + public void addInt(int value) { + parent.add(new BigDecimal(BigInteger.valueOf(value), scale)); + } + + @Override + public boolean hasDictionarySupport() { + return true; + } + + @Override + public void setDictionary(Dictionary dictionary) { + dict = new int[dictionary.getMaxId() + 1]; + for (int i = 0; i <= dictionary.getMaxId(); i++) { + dict[i] = dictionary.decodeToInt(i); + } + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + addInt(dict[dictionaryId]); + } + } + + static final class FieldDecimalLongConverter extends AvroPrimitiveConverter { + private final int scale; + private long[] dict = null; + + public FieldDecimalLongConverter(ParentValueContainer parent, PrimitiveType type) { + super(parent); + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation(); + this.scale = decimalType.getScale(); + } + + @Override + public void addLong(long value) { + parent.add(new BigDecimal(BigInteger.valueOf(value), scale)); + } + + @Override + public boolean hasDictionarySupport() { + return true; + } + + @Override + public void setDictionary(Dictionary dictionary) { + dict = new long[dictionary.getMaxId() + 1]; + for (int i = 0; i <= dictionary.getMaxId(); i++) { + dict[i] = dictionary.decodeToLong(i); + } + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + addLong(dict[dictionaryId]); + } + } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 340dc77220..66ffe64f6b 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -337,6 +337,14 @@ private static Converter newConverter( return newConverter(schema, type, model, null, setter, validator); } + private static boolean isDecimalType(Type type) { + if (!type.isPrimitive()) { + return false; + } + LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation(); + return annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; + } + private static Converter newConverter( Schema schema, Type type, @@ -359,6 +367,9 @@ private static Converter newConverter( case BOOLEAN: return new AvroConverters.FieldBooleanConverter(parent); case INT: + if (isDecimalType(type)) { + return new AvroConverters.FieldDecimalIntConverter(parent, type.asPrimitiveType()); + } Class intDatumClass = getDatumClass(conversion, knownClass, schema, model); if (intDatumClass == null) { return new AvroConverters.FieldIntegerConverter(parent); @@ -374,6 +385,9 @@ private static Converter newConverter( } return new AvroConverters.FieldIntegerConverter(parent); case LONG: + if (isDecimalType(type)) { + return new AvroConverters.FieldDecimalLongConverter(parent, type.asPrimitiveType()); + } return new AvroConverters.FieldLongConverter(parent); case FLOAT: return new AvroConverters.FieldFloatConverter(parent); diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index a8cb1214ac..bd95b17a5e 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -19,6 +19,8 @@ package org.apache.parquet.avro; import static org.apache.parquet.avro.AvroTestUtil.optional; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -61,9 +63,12 @@ import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.conf.PlainParquetConfiguration; import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.util.HadoopCodecs; import org.apache.parquet.io.InputFile; @@ -71,7 +76,10 @@ import org.apache.parquet.io.LocalOutputFile; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -400,6 +408,55 @@ public void testFixedDecimalValues() throws Exception { Assert.assertEquals("Content should match", expected, records); } + @Test + public void testDecimalInt64Values() throws Exception { + + File file = temp.newFile("test_decimal_int64_values.parquet"); + file.delete(); + Path path = new Path(file.toString()); + + MessageType parquetSchema = new MessageType( + "test_decimal_int64_values", + new PrimitiveType(REQUIRED, INT64, "decimal_salary").withLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(1, 10))); + + try (ParquetWriter writer = + ExampleParquetWriter.builder(path).withType(parquetSchema).build()) { + + GroupFactory factory = new SimpleGroupFactory(parquetSchema); + + Group group1 = factory.newGroup(); + group1.add("decimal_salary", 234L); + writer.write(group1); + + Group group2 = factory.newGroup(); + group2.add("decimal_salary", 1203L); + writer.write(group2); + } + + GenericData decimalSupport = new GenericData(); + decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); + + List records = Lists.newArrayList(); + try (ParquetReader reader = AvroParquetReader.builder(path) + .withDataModel(decimalSupport) + .build()) { + GenericRecord rec; + while ((rec = reader.read()) != null) { + records.add(rec); + } + } + + Assert.assertEquals("Should read 2 records", 2, records.size()); + + Object firstSalary = records.get(0).get("decimal_salary"); + Object secondSalary = records.get(1).get("decimal_salary"); + + Assert.assertTrue( + "Should be BigDecimal, but is " + firstSalary.getClass(), firstSalary instanceof BigDecimal); + Assert.assertEquals("Should be 23.4, but is " + firstSalary, new BigDecimal("23.4"), firstSalary); + Assert.assertEquals("Should be 120.3, but is " + secondSalary, new BigDecimal("120.3"), secondSalary); + } + @Test public void testAll() throws Exception { Schema schema = From 2454f473a37847e4bd421de60264539b6ad8cc94 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 4 Sep 2025 16:15:53 +0000 Subject: [PATCH 2/3] lint --- .../test/java/org/apache/parquet/avro/TestReadWrite.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index bd95b17a5e..034d844957 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -417,7 +417,8 @@ public void testDecimalInt64Values() throws Exception { MessageType parquetSchema = new MessageType( "test_decimal_int64_values", - new PrimitiveType(REQUIRED, INT64, "decimal_salary").withLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(1, 10))); + new PrimitiveType(REQUIRED, INT64, "decimal_salary") + .withLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(1, 10))); try (ParquetWriter writer = ExampleParquetWriter.builder(path).withType(parquetSchema).build()) { @@ -451,8 +452,7 @@ public void testDecimalInt64Values() throws Exception { Object firstSalary = records.get(0).get("decimal_salary"); Object secondSalary = records.get(1).get("decimal_salary"); - Assert.assertTrue( - "Should be BigDecimal, but is " + firstSalary.getClass(), firstSalary instanceof BigDecimal); + Assert.assertTrue("Should be BigDecimal, but is " + firstSalary.getClass(), firstSalary instanceof BigDecimal); Assert.assertEquals("Should be 23.4, but is " + firstSalary, new BigDecimal("23.4"), firstSalary); Assert.assertEquals("Should be 120.3, but is " + secondSalary, new BigDecimal("120.3"), secondSalary); } From e2454a0de8e8cc84170daa216c180024aac1f304 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 5 Sep 2025 13:22:50 +0000 Subject: [PATCH 3/3] update --- .../apache/parquet/avro/AvroConverters.java | 38 ------------------- .../apache/parquet/avro/TestReadWrite.java | 20 ++++++++-- 2 files changed, 17 insertions(+), 41 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java index 0f11c3871a..e34cc9b0b2 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java @@ -345,7 +345,6 @@ public String convert(Binary binary) { static final class FieldDecimalIntConverter extends AvroPrimitiveConverter { private final int scale; - private int[] dict = null; public FieldDecimalIntConverter(ParentValueContainer parent, PrimitiveType type) { super(parent); @@ -358,29 +357,10 @@ public FieldDecimalIntConverter(ParentValueContainer parent, PrimitiveType type) public void addInt(int value) { parent.add(new BigDecimal(BigInteger.valueOf(value), scale)); } - - @Override - public boolean hasDictionarySupport() { - return true; - } - - @Override - public void setDictionary(Dictionary dictionary) { - dict = new int[dictionary.getMaxId() + 1]; - for (int i = 0; i <= dictionary.getMaxId(); i++) { - dict[i] = dictionary.decodeToInt(i); - } - } - - @Override - public void addValueFromDictionary(int dictionaryId) { - addInt(dict[dictionaryId]); - } } static final class FieldDecimalLongConverter extends AvroPrimitiveConverter { private final int scale; - private long[] dict = null; public FieldDecimalLongConverter(ParentValueContainer parent, PrimitiveType type) { super(parent); @@ -393,23 +373,5 @@ public FieldDecimalLongConverter(ParentValueContainer parent, PrimitiveType type public void addLong(long value) { parent.add(new BigDecimal(BigInteger.valueOf(value), scale)); } - - @Override - public boolean hasDictionarySupport() { - return true; - } - - @Override - public void setDictionary(Dictionary dictionary) { - dict = new long[dictionary.getMaxId() + 1]; - for (int i = 0; i <= dictionary.getMaxId(); i++) { - dict[i] = dictionary.decodeToLong(i); - } - } - - @Override - public void addValueFromDictionary(int dictionaryId) { - addLong(dict[dictionaryId]); - } } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 034d844957..4fb5b72b44 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -19,6 +19,7 @@ package org.apache.parquet.avro; import static org.apache.parquet.avro.AvroTestUtil.optional; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; @@ -409,14 +410,16 @@ public void testFixedDecimalValues() throws Exception { } @Test - public void testDecimalInt64Values() throws Exception { + public void testDecimalIntegerValues() throws Exception { - File file = temp.newFile("test_decimal_int64_values.parquet"); + File file = temp.newFile("test_decimal_integer_values.parquet"); file.delete(); Path path = new Path(file.toString()); MessageType parquetSchema = new MessageType( - "test_decimal_int64_values", + "test_decimal_integer_values", + new PrimitiveType(REQUIRED, INT32, "decimal_age") + .withLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(2, 5)), new PrimitiveType(REQUIRED, INT64, "decimal_salary") .withLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(1, 10))); @@ -426,10 +429,12 @@ public void testDecimalInt64Values() throws Exception { GroupFactory factory = new SimpleGroupFactory(parquetSchema); Group group1 = factory.newGroup(); + group1.add("decimal_age", 2534); group1.add("decimal_salary", 234L); writer.write(group1); Group group2 = factory.newGroup(); + group2.add("decimal_age", 4267); group2.add("decimal_salary", 1203L); writer.write(group2); } @@ -449,6 +454,15 @@ public void testDecimalInt64Values() throws Exception { Assert.assertEquals("Should read 2 records", 2, records.size()); + // INT32 values + Object firstAge = records.get(0).get("decimal_age"); + Object secondAge = records.get(1).get("decimal_age"); + + Assert.assertTrue("Should be BigDecimal, but is " + firstAge.getClass(), firstAge instanceof BigDecimal); + Assert.assertEquals("Should be 25.34, but is " + firstAge, new BigDecimal("25.34"), firstAge); + Assert.assertEquals("Should be 42.67, but is " + secondAge, new BigDecimal("42.67"), secondAge); + + // INT64 values Object firstSalary = records.get(0).get("decimal_salary"); Object secondSalary = records.get(1).get("decimal_salary");