@@ -104,6 +104,10 @@ protected Schema createFieldSchema(Field field, Map<String, Schema> names) {

private static final ReflectData INSTANCE = new ReflectData();

static {
addLogicalTypeConversions(INSTANCE);
}

/** For subclasses. Applications normally use {@link ReflectData#get()}. */
public ReflectData() {
}
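
With this static initializer in place, ReflectData.get() comes with the common logical type conversions pre-registered, so applications no longer need to add them by hand. A minimal sketch of the observable effect (assumed usage, not part of the diff):

import org.apache.avro.Conversion;
import org.apache.avro.LogicalTypes;
import org.apache.avro.reflect.ReflectData;

public class ReflectConversionsSketch {
  public static void main(String[] args) {
    // Both lookups now resolve without any manual addLogicalTypeConversion(...) call.
    Conversion<?> uuid = ReflectData.get().getConversionFor(LogicalTypes.uuid());
    Conversion<?> tsMillis = ReflectData.get().getConversionFor(LogicalTypes.timestampMillis());
    System.out.println(uuid.getConvertedType());     // class java.util.UUID
    System.out.println(tsMillis.getConvertedType()); // class java.time.Instant
  }
}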
@@ -19,9 +19,11 @@

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversions;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
@@ -58,6 +60,28 @@ public class SpecificData extends GenericData {

private static final SpecificData INSTANCE = new SpecificData();

static {
addLogicalTypeConversions(INSTANCE);
}

protected static void addLogicalTypeConversions(SpecificData instance) {
instance.addLogicalTypeConversion(new Conversions.UUIDConversion());
// DecimalConversion is intentionally not registered here, since it is gated behind
// `compiler.setEnableDecimalLogicalType`
// instance.addLogicalTypeConversion(new Conversions.DecimalConversion());
instance.addLogicalTypeConversion(new Conversions.BigDecimalConversion());
instance.addLogicalTypeConversion(new Conversions.DurationConversion());
instance.addLogicalTypeConversion(new TimeConversions.DateConversion());
instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
instance.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
instance.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
instance.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
instance.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
instance.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
}

private static final Class<?>[] NO_ARG = new Class[] {};
private static final Class<?>[] SCHEMA_ARG = new Class[] { Schema.class };

@@ -423,36 +423,37 @@ void readUUIDMissingLogicalTypeReflect() throws IOException {
r1.uuid = u1.toString();

File test = write(ReflectData.get().getSchema(RecordWithStringUUID.class), r1);
- assertThrows(IllegalArgumentException.class,
- () -> read(ReflectData.get().createDatumReader(uuidSchema), test).get(0));
+ RecordWithUUID result = (RecordWithUUID) read(ReflectData.get().createDatumReader(uuidSchema), test).get(0);
+ assertEquals(u1, result.uuid);
}

@Test
void writeUUIDMissingLogicalType() throws IOException {
- assertThrows(DataFileWriter.AppendWriteException.class, () -> {
- Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()).fields().requiredString("uuid")
- .endRecord();
- LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+ Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()).fields().requiredString("uuid")
+ .endRecord();
+ LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());

- UUID u1 = UUID.randomUUID();
- UUID u2 = UUID.randomUUID();
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();

- RecordWithUUID r1 = new RecordWithUUID();
- r1.uuid = u1;
- RecordWithUUID r2 = new RecordWithUUID();
- r2.uuid = u2;
+ RecordWithUUID r1 = new RecordWithUUID();
+ r1.uuid = u1;
+ RecordWithUUID r2 = new RecordWithUUID();
+ r2.uuid = u2;

- // write without using REFLECT, which has the logical type
- File test = write(uuidSchema, r1, r2);
+ // write without using REFLECT, which has the logical type
+ File test = write(uuidSchema, r1, r2);

- // verify that the field's type overrides the logical type
- Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()).fields()
- .requiredString("uuid").endRecord();
+ // verify that the field's type overrides the logical type
+ Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()).fields().requiredString("uuid")
+ .endRecord();

- // this fails with an AppendWriteException wrapping ClassCastException
- // because the UUID isn't converted to a CharSequence expected internally
- read(ReflectData.get().createDatumReader(uuidStringSchema), test);
- });
+ // with the UUID conversion registered by default, the UUID value is written as a
+ // string and can be read back into RecordWithStringUUID
+ List<RecordWithStringUUID> items = (List<RecordWithStringUUID>) read(
+ ReflectData.get().createDatumReader(uuidStringSchema), test);
+ assertEquals(r1.uuid.toString(), items.get(0).uuid);
+ assertEquals(r2.uuid.toString(), items.get(1).uuid);
}

@Test
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.io.IOException;
import java.time.Instant;

import example.avro.Bar;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestDataFileReflect {

@TempDir
public File DIR;

@Test
public void reflectDatumReaderUnionWithLogicalType() throws IOException {
File file = new File(DIR.getPath(), "testReflectDatumReaderUnionWithLogicalType");
Schema schema = Bar.SCHEMA$;
// Create test data
Instant value = Instant.now();
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(
new GenericDatumWriter<GenericData.Record>(schema)).create(schema, file)) {
for (int i = 0; i < 10; i++) {
GenericData.Record r = new GenericData.Record(schema);
r.put("title", "title" + i);
r.put("created_at", value.toEpochMilli() + i * 1000);
writer.append(r);
}
}

// read using a 'new ReflectDatumReader<T>()' to force inference of
// reader's schema from runtime
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new ReflectDatumReader<>())) {
int i = 0;
for (Bar instance : reader) {
assertEquals("title" + i, instance.getTitle());
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
i++;
}
assertEquals(10, i);
}
}

@Test
public void reflectDatumWriterUnionWithLogicalType() throws IOException {
File file = new File(DIR.getPath(), "testReflectDatumWriterUnionWithLogicalType");

// Create test data
Instant value = Instant.now();
try (DataFileWriter<Bar> writer = new DataFileWriter<>(new ReflectDatumWriter<Bar>()).create(Bar.SCHEMA$, file)) {
for (int i = 0; i < 10; i++) {
Bar r = Bar.newBuilder().setTitle("title" + i).setCreatedAt(value.plusSeconds(i)).build();
writer.append(r);
}
}

// read using a 'new SpecificDatumReader<T>()' to force inference of
// reader's schema from runtime
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new SpecificDatumReader<>())) {
int i = 0;
for (Bar instance : reader) {
assertEquals("title" + i, instance.getTitle());
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
i++;
}
assertEquals(10, i);
}
}
}
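
One detail these new tests rely on: a timestamp-millis field stores only epoch milliseconds, so any sub-millisecond part of the original Instant is lost on write. That is why the expected value is rebuilt with Instant.ofEpochMilli(...) rather than compared to value.plusSeconds(i) directly. A tiny illustration of the truncation (not part of the diff):

import java.time.Instant;

public class MillisTruncationSketch {
  public static void main(String[] args) {
    Instant original = Instant.now();                     // may carry nanosecond precision
    long stored = original.toEpochMilli();                // what a timestamp-millis field keeps
    Instant roundTripped = Instant.ofEpochMilli(stored);  // what a conversion-aware reader returns

    // true only when 'original' happened to have no sub-millisecond component
    System.out.println(original.equals(roundTripped));
  }
}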
@@ -21,13 +21,16 @@

import java.io.File;
import java.io.IOException;
import java.time.Instant;

import example.avro.Bar;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;

import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

@@ -70,4 +73,60 @@ void specificDatumReaderDefaultCtor() throws IOException {
}
}

@Test
public void specificDatumReaderUnionWithLogicalType() throws IOException {
File file = new File(DIR.getPath(), "testSpecificDatumReaderUnionWithLogicalType");
Schema schema = Bar.SCHEMA$;

// Create test data
Instant value = Instant.now();
try (DataFileWriter<Record> writer = new DataFileWriter<>(new GenericDatumWriter<Record>(schema)).create(schema,
file)) {
for (int i = 0; i < 10; i++) {
Record r = new Record(schema);
r.put("title", "title" + i);
r.put("created_at", value.toEpochMilli() + i * 1000);
writer.append(r);
}
}

// read using a 'new SpecificDatumReader<T>()' to force inference of
// reader's schema from runtime
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new SpecificDatumReader<>())) {
int i = 0;
for (Bar instance : reader) {
assertEquals("title" + i, instance.getTitle());
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
i++;
}
assertEquals(10, i);
}
}

@Test
public void specificDatumWriterUnionWithLogicalType() throws IOException {
File file = new File(DIR.getPath(), "testSpecificDatumWriterUnionWithLogicalType");
Schema schema = Bar.SCHEMA$;

// Create test data
Instant value = Instant.now();
try (DataFileWriter<Bar> writer = new DataFileWriter<>(new SpecificDatumWriter<Bar>()).create(schema, file)) {
for (int i = 0; i < 10; i++) {
Bar r = Bar.newBuilder().setTitle("title" + i).setCreatedAt(value.plusSeconds(i)).build();
writer.append(r);
}
}

// read using a 'new SpecificDatumReader<T>()' to force inference of
// reader's schema from runtime
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new SpecificDatumReader<>())) {
int i = 0;
for (Bar instance : reader) {
assertEquals("title" + i, instance.getTitle());
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
i++;
}
assertEquals(10, i);
}
}
}
share/test/schemas/fooBar.avsc (new file, 21 additions, 0 deletions)
@@ -0,0 +1,21 @@
{
"fields" : [
{
"name" : "title",
"type" : "string"
},
{
"name" : "created_at",
"type" : [
"null",
{
"logicalType" : "timestamp-millis",
"type" : "long"
}
]
}
],
"name" : "Bar",
"namespace" : "example.avro",
"type" : "record"
}
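
Because created_at is a union of null and a timestamp-millis long, a conversion-aware reader surfaces it as a nullable Instant on the generated Bar class. A short sketch of inspecting that structure (the path and printed value are assumptions, not part of the diff):

import java.io.File;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class FooBarSchemaSketch {
  public static void main(String[] args) throws Exception {
    Schema bar = new Schema.Parser().parse(new File("share/test/schemas/fooBar.avsc"));
    Schema createdAt = bar.getField("created_at").schema(); // UNION of ["null", long]
    Schema longBranch = createdAt.getTypes().get(1);
    LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(longBranch);
    System.out.println(logicalType.getName()); // prints "timestamp-millis"
  }
}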