diff --git a/lang/java/avro/src/main/java/org/apache/avro/Conversions.java b/lang/java/avro/src/main/java/org/apache/avro/Conversions.java index 1c28c9adb81..776b7eac2d5 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Conversions.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Conversions.java @@ -27,10 +27,13 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.UUID; +import java.time.Duration; public class Conversions { @@ -146,6 +149,111 @@ private static BigDecimal validate(final LogicalTypes.Decimal decimal, BigDecima } } + public static class DurationConversion extends Conversion { + + private static final int MONTH_DAYS = 30; + private static final int GRANULARITY_BYTES = 4; + private static final String DOC_URL = "http://avro.apache.org/docs/current/spec.html#Duration"; + private static final String DOC_STR = "For more information on the duration logical type, please refer to " + DOC_URL; + + private static final byte[] EMPTY_BYTE_ARRAY = new byte[12]; + + private static byte[] toBytes(int value){ + return ByteBuffer.allocate(GRANULARITY_BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array(); + } + + private static int toInt(byte[] bytes) { + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt(); + } + + private static byte[] buildByteArray(int months, int days, int milliSeconds) { + byte[] monthBytes = toBytes(months); + byte[] dayBytes = toBytes(days); + byte[] milliSecondBytes = toBytes(milliSeconds); + + return ByteBuffer + .allocate(monthBytes.length + dayBytes.length + milliSecondBytes.length) + .put(monthBytes) + .put(dayBytes) + .put(milliSecondBytes) + .array(); + } + + @Override + public Class getConvertedType() { + return Duration.class; + } + + @Override + public String getLogicalTypeName() { + return "duration"; + } + + @Override + public Duration fromFixed(GenericFixed value, Schema schema, LogicalType type) { + byte[] payload = value.bytes(); + int payloadLength = payload.length; + + if (payloadLength != GRANULARITY_BYTES * 3) { + throw new IllegalArgumentException("Duration must be stored on a 12-byte long value, but " + payloadLength + " bytes given. " + DOC_STR); + } + + if (Arrays.equals(payload, EMPTY_BYTE_ARRAY)) { + return Duration.ZERO; + } + + byte[] monthBytes = new byte[]{ payload[0], payload[1], payload[2], payload[3] }; + byte[] dayBytes = new byte[]{ payload[4], payload[5], payload[6], payload[7] }; + byte[] milliSecondBytes = new byte[]{ payload[8], payload[9], payload[10], payload[11] }; + + int months = toInt(monthBytes); + int days = toInt(dayBytes); + int milliSeconds = toInt(milliSecondBytes); + + return Duration.ofDays((long) months * MONTH_DAYS + days).plusMillis(milliSeconds); + } + + @Override + public GenericFixed toFixed(Duration value, Schema schema, LogicalType type) { + + if (value == null || value.isZero()) { + return new GenericData.Fixed(schema, EMPTY_BYTE_ARRAY); + } + + long totalDays = value.toDays(); + + int months; + int days; + int milliSeconds; + + try { + months = (int) (totalDays / MONTH_DAYS); + } catch (Throwable e) { + throw new IllegalArgumentException("The months part of a duration must fit a 4-byte int, longer duration given. " + DOC_STR, e); + } + + try { + days = (int) (totalDays % MONTH_DAYS); + } catch (Throwable e) { + throw new IllegalArgumentException("The days part of a duration must fit a 4-byte int, longer duration given. " + DOC_STR, e); + } + + try { + milliSeconds = (int) value.minus(totalDays, ChronoUnit.DAYS).toMillis(); + } catch (Throwable e) { + throw new IllegalArgumentException("The milliseconds part of a duration must fit a 4-byte int, longer duration given. " + DOC_STR, e); + } + + if (value.getNano() - milliSeconds * 1_000_000 > 0) { + milliSeconds += 1; + } + + byte[] result = buildByteArray(months, days, milliSeconds); + + return new GenericData.Fixed(schema, result); + } + } + /** * Convert a underlying representation of a logical type (such as a ByteBuffer) * to a higher level object (such as a BigDecimal). diff --git a/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java b/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java index ef9c69dc511..35fddf7ede2 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java +++ b/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java @@ -102,6 +102,9 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) { case LOCAL_TIMESTAMP_MILLIS: logicalType = LOCAL_TIMESTAMP_MILLIS_TYPE; break; + case DURATION: + logicalType = DURATION_TYPE; + break; default: final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName); logicalType = (typeFactory == null) ? null : typeFactory.fromSchema(schema); @@ -134,6 +137,7 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) { private static final String TIMESTAMP_MICROS = "timestamp-micros"; private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis"; private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros"; + private static final String DURATION = "duration"; /** Create a Decimal LogicalType with the given precision and scale 0 */ public static Decimal decimal(int precision) { @@ -193,6 +197,13 @@ public static LocalTimestampMicros localTimestampMicros() { return LOCAL_TIMESTAMP_MICROS_TYPE; } + private static final Duration DURATION_TYPE = new Duration(); + + public static Duration duration() { + return DURATION_TYPE; + } + + /** Decimal represents arbitrary-precision fixed-scale decimal numbers */ public static class Decimal extends LogicalType { private static final String PRECISION_PROP = "precision"; @@ -410,4 +421,19 @@ public void validate(Schema schema) { } } + /** Duration represents an amount of time defined by a number of months, days and milliseconds */ + public static class Duration extends LogicalType { + private Duration() { + super(DURATION); + } + + @Override + public void validate(Schema schema) { + super.validate(schema); + if (schema.getType() != Schema.Type.FIXED || schema.getFixedSize() != 12) { + throw new IllegalArgumentException("Duration can only be used with an underlying fixed type of size 12"); + } + } + } + } diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDurationConversion.java b/lang/java/avro/src/test/java/org/apache/avro/TestDurationConversion.java new file mode 100644 index 00000000000..1cd46d0118d --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/TestDurationConversion.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; + +import static java.math.RoundingMode.HALF_EVEN; +import static org.junit.Assert.*; + +/* + * TODO test for negative duration + * TODO test for empty byte array (0 duration) + */ +public class TestDurationConversion { + + private static final Conversion CONVERSION = new Conversions.DurationConversion(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private Schema schema; + private LogicalType logicalType; + + @Before + public void setup() { + schema = Schema.createFixed("aFixed", null, null, 12); + schema.addProp("logicalType", "duration"); + logicalType = LogicalTypes.fromSchema(schema); + } + + @Test + public void testToFromFixed() { + final Duration value = Duration.ofDays(10).plusMillis(100); + final GenericFixed fixed = CONVERSION.toFixed(value, schema, logicalType); + final Duration result = CONVERSION.fromFixed(fixed, schema, logicalType); + assertEquals(value, result); + } + + @Test + public void testConvertingMillisecondsFromBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofMillis(200); + + final byte[] bytes = new byte[]{ + 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, + -0x38, 0x0, 0x0, 0x0 + }; + + final GenericFixed dd = new GenericData.Fixed(schema, bytes); + + final Duration fromBytes = CONVERSION.fromFixed(dd, schema, duration); + + assertEquals(durationValue, fromBytes); + } + + @Test + public void testConvertingDaysFromBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofDays(29); + + final byte[] bytes = new byte[]{ + 0x0, 0x0, 0x0, 0x0, + 0x1D, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0 + }; + + final GenericFixed dd = new GenericData.Fixed(schema, bytes); + + final Duration fromBytes = CONVERSION.fromFixed(dd, schema, duration); + + assertEquals(durationValue, fromBytes); + } + + @Test + public void testConvertingMonthsFromBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofDays(60); + + final byte[] bytes = new byte[]{ + 0x02, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0 + }; + + final GenericFixed dd = new GenericData.Fixed(schema, bytes); + + final Duration fromBytes = CONVERSION.fromFixed(dd, schema, duration); + + assertEquals(durationValue, fromBytes); + } + + @Test + public void testConvertingMillisecondsToBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofMillis(200); + + final byte[] bytes = new byte[]{ + 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, + -0x38, 0x0, 0x0, 0x0 + }; + + final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration); + + assertArrayEquals(bytes, fixed.bytes()); + } + + @Test + public void testConvertingMillisecondsWithNanosecondAdjustmentToBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofMillis(200).plusNanos(10); + + final byte[] bytes = new byte[]{ + 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, + -0x37, 0x0, 0x0, 0x0 + }; + + final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration); + + assertArrayEquals(bytes, fixed.bytes()); + } + + @Test + public void testConvertingDaysToBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofDays(29); + + final byte[] bytes = new byte[]{ + 0x0, 0x0, 0x0, 0x0, + 0x1D, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0 + }; + + final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration); + + assertArrayEquals(bytes, fixed.bytes()); + } + + @Test + public void testConvertingWeeksToBytes() { + LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType; + + final Duration durationValue = Duration.ofDays(60); + + final byte[] bytes = new byte[]{ + 0x02, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0 + }; + + final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration); + + assertArrayEquals(bytes, fixed.bytes()); + } +} diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java b/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java index d830db68775..22ff2045dde 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java @@ -226,6 +226,34 @@ public void testLogicalTypeInSchemaEquals() { assertEqualsFalse("Different logical type", schema1, schema3); } + @Test + public void testDurationFromSchema() { + Schema schema = Schema.createFixed("aFixed", null, null, 12); + schema.addProp("logicalType", "duration"); + LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema); + + Assert.assertTrue("Should be a Duration", logicalType instanceof LogicalTypes.Duration); + } + + @Test + public void testDurationValidation() { + assertThrows("Should reject too short type", IllegalArgumentException.class, + "Duration can only be used with an underlying fixed type of size 12", () -> { + LogicalTypes.duration().addToSchema( + Schema.createFixed("aFixed", null, null, 11) + ); + return null; + }); + + assertThrows("Should reject too long type", IllegalArgumentException.class, + "Duration can only be used with an underlying fixed type of size 12", () -> { + LogicalTypes.duration().addToSchema( + Schema.createFixed("anotherFixed", null, null, 13) + ); + return null; + }); + } + public static void assertEqualsTrue(String message, Object o1, Object o2) { Assert.assertTrue("Should be equal (forward): " + message, o1.equals(o2)); Assert.assertTrue("Should be equal (reverse): " + message, o2.equals(o1));