diff --git a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java index ab702a60e87..c8c1d92d1d3 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java +++ b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java @@ -104,6 +104,10 @@ protected Schema createFieldSchema(Field field, Map names) { private static final ReflectData INSTANCE = new ReflectData(); + static { + addLogicalTypeConversions(INSTANCE); + } + /** For subclasses. Applications normally use {@link ReflectData#get()}. */ public ReflectData() { } diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java index c7b5eaed8a7..83a9f76ffaf 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java +++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java @@ -19,9 +19,11 @@ import org.apache.avro.AvroRuntimeException; import org.apache.avro.AvroTypeException; +import org.apache.avro.Conversions; import org.apache.avro.Protocol; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; +import org.apache.avro.data.TimeConversions; import org.apache.avro.generic.GenericData; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; @@ -58,6 +60,28 @@ public class SpecificData extends GenericData { private static final SpecificData INSTANCE = new SpecificData(); + static { + addLogicalTypeConversions(INSTANCE); + } + + protected static void addLogicalTypeConversions(SpecificData instance) { + instance.addLogicalTypeConversion(new Conversions.UUIDConversion()); + // Disable DecimalConversion since it's gated behind + // `compiler.setEnableDecimalLogicalType` + // INSTANCE.addLogicalTypeConversion(new Conversions.DecimalConversion()); + instance.addLogicalTypeConversion(new Conversions.BigDecimalConversion()); + instance.addLogicalTypeConversion(new Conversions.DurationConversion()); + instance.addLogicalTypeConversion(new TimeConversions.DateConversion()); + instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); + instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); + instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion()); + instance.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); + instance.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); + instance.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + instance.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + instance.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion()); + } + private static final Class[] NO_ARG = new Class[] {}; private static final Class[] SCHEMA_ARG = new Class[] { Schema.class }; diff --git a/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java b/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java index 851ab95e3ea..d95f83f7abb 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java +++ b/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java @@ -423,36 +423,37 @@ void readUUIDMissingLogicalTypeReflect() throws IOException { r1.uuid = u1.toString(); File test = write(ReflectData.get().getSchema(RecordWithStringUUID.class), r1); - assertThrows(IllegalArgumentException.class, - () -> read(ReflectData.get().createDatumReader(uuidSchema), test).get(0)); + RecordWithUUID result = (RecordWithUUID) read(ReflectData.get().createDatumReader(uuidSchema), test).get(0); + assertEquals(u1, result.uuid); } @Test void writeUUIDMissingLogicalType() throws IOException { - assertThrows(DataFileWriter.AppendWriteException.class, () -> { - Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()).fields().requiredString("uuid") - .endRecord(); - LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); + Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()).fields().requiredString("uuid") + .endRecord(); + LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema()); - UUID u1 = UUID.randomUUID(); - UUID u2 = UUID.randomUUID(); + UUID u1 = UUID.randomUUID(); + UUID u2 = UUID.randomUUID(); - RecordWithUUID r1 = new RecordWithUUID(); - r1.uuid = u1; - RecordWithUUID r2 = new RecordWithUUID(); - r2.uuid = u2; + RecordWithUUID r1 = new RecordWithUUID(); + r1.uuid = u1; + RecordWithUUID r2 = new RecordWithUUID(); + r2.uuid = u2; - // write without using REFLECT, which has the logical type - File test = write(uuidSchema, r1, r2); + // write without using REFLECT, which has the logical type + File test = write(uuidSchema, r1, r2); - // verify that the field's type overrides the logical type - Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()).fields() - .requiredString("uuid").endRecord(); + // verify that the field's type overrides the logical type + Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()).fields().requiredString("uuid") + .endRecord(); - // this fails with an AppendWriteException wrapping ClassCastException - // because the UUID isn't converted to a CharSequence expected internally - read(ReflectData.get().createDatumReader(uuidStringSchema), test); - }); + // this fails with an AppendWriteException wrapping ClassCastException + // because the UUID isn't converted to a CharSequence expected internally + List items = (List) read( + ReflectData.get().createDatumReader(uuidStringSchema), test); + assertEquals(r1.uuid.toString(), items.get(0).uuid); + assertEquals(r2.uuid.toString(), items.get(1).uuid); } @Test diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileReflect.java b/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileReflect.java new file mode 100644 index 00000000000..4585d33fb1a --- /dev/null +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileReflect.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.time.Instant; + +import example.avro.Bar; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificDatumReader; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestDataFileReflect { + + @TempDir + public File DIR; + + @Test + public void reflectDatumReaderUnionWithLogicalType() throws IOException { + File file = new File(DIR.getPath(), "testReflectDatumReaderUnionWithLogicalType"); + Schema schema = Bar.SCHEMA$; + // Create test data + Instant value = Instant.now(); + try (DataFileWriter writer = new DataFileWriter<>( + new GenericDatumWriter(schema)).create(schema, file)) { + for (int i = 0; i < 10; i++) { + GenericData.Record r = new GenericData.Record(schema); + r.put("title", "title" + i); + r.put("created_at", value.toEpochMilli() + i * 1000); + writer.append(r); + } + } + + // read using a 'new ReflectDatumReader()' to force inference of + // reader's schema from runtime + try (DataFileReader reader = new DataFileReader<>(file, new ReflectDatumReader<>())) { + int i = 0; + for (Bar instance : reader) { + assertEquals("title" + i, instance.getTitle()); + assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt()); + i++; + } + assertEquals(10, i); + } + } + + @Test + public void reflectDatumWriterUnionWithLogicalType() throws IOException { + File file = new File(DIR.getPath(), "testReflectDatumWriterUnionWithLogicalType"); + + // Create test data + Instant value = Instant.now(); + try (DataFileWriter writer = new DataFileWriter<>(new ReflectDatumWriter()).create(Bar.SCHEMA$, file)) { + for (int i = 0; i < 10; i++) { + Bar r = Bar.newBuilder().setTitle("title" + i).setCreatedAt(value.plusSeconds(i)).build(); + writer.append(r); + } + } + + // read using a 'new SpecificDatumReader()' to force inference of + // reader's schema from runtime + try (DataFileReader reader = new DataFileReader<>(file, new SpecificDatumReader<>())) { + int i = 0; + for (Bar instance : reader) { + assertEquals("title" + i, instance.getTitle()); + assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt()); + i++; + } + assertEquals(10, i); + } + } +} diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java b/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java index f591e9e16c5..fe5ec65a7ab 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java @@ -21,13 +21,16 @@ import java.io.File; import java.io.IOException; +import java.time.Instant; +import example.avro.Bar; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -70,4 +73,60 @@ void specificDatumReaderDefaultCtor() throws IOException { } } + @Test + public void specificDatumReaderUnionWithLogicalType() throws IOException { + File file = new File(DIR.getPath(), "testSpecificDatumReaderUnionWithLogicalType"); + Schema schema = Bar.SCHEMA$; + + // Create test data + Instant value = Instant.now(); + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter(schema)).create(schema, + file)) { + for (int i = 0; i < 10; i++) { + Record r = new Record(schema); + r.put("title", "title" + i); + r.put("created_at", value.toEpochMilli() + i * 1000); + writer.append(r); + } + } + + // read using a 'new SpecificDatumReader()' to force inference of + // reader's schema from runtime + try (DataFileReader reader = new DataFileReader<>(file, new SpecificDatumReader<>())) { + int i = 0; + for (Bar instance : reader) { + assertEquals("title" + i, instance.getTitle()); + assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt()); + i++; + } + assertEquals(10, i); + } + } + + @Test + public void specificDatumWriterUnionWithLogicalType() throws IOException { + File file = new File(DIR.getPath(), "testSpecificDatumWriterUnionWithLogicalType"); + Schema schema = Bar.SCHEMA$; + + // Create test data + Instant value = Instant.now(); + try (DataFileWriter writer = new DataFileWriter<>(new SpecificDatumWriter()).create(schema, file)) { + for (int i = 0; i < 10; i++) { + Bar r = Bar.newBuilder().setTitle("title" + i).setCreatedAt(value.plusSeconds(i)).build(); + writer.append(r); + } + } + + // read using a 'new SpecificDatumReader()' to force inference of + // reader's schema from runtime + try (DataFileReader reader = new DataFileReader<>(file, new SpecificDatumReader<>())) { + int i = 0; + for (Bar instance : reader) { + assertEquals("title" + i, instance.getTitle()); + assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt()); + i++; + } + assertEquals(10, i); + } + } } diff --git a/share/test/schemas/fooBar.avsc b/share/test/schemas/fooBar.avsc new file mode 100644 index 00000000000..d0d1b383730 --- /dev/null +++ b/share/test/schemas/fooBar.avsc @@ -0,0 +1,21 @@ +{ + "fields" : [ + { + "name" : "title", + "type" : "string" + }, + { + "name" : "created_at", + "type" : [ + "null", + { + "logicalType" : "timestamp-millis", + "type" : "long" + } + ] + } + ], + "name" : "Bar", + "namespace" : "example.avro", + "type" : "record" +}