diff --git a/arrow/extensions/extensions.go b/arrow/extensions/extensions.go index 04566c75..6f13aa64 100644 --- a/arrow/extensions/extensions.go +++ b/arrow/extensions/extensions.go @@ -21,11 +21,12 @@ import ( ) var canonicalExtensionTypes = []arrow.ExtensionType{ - NewBool8Type(), - NewUUIDType(), - &OpaqueType{}, &JSONType{}, + &OpaqueType{}, + &TimestampWithOffsetType{}, &VariantType{}, + NewBool8Type(), + NewUUIDType(), } func init() { diff --git a/arrow/extensions/timestamp_with_offset.go b/arrow/extensions/timestamp_with_offset.go new file mode 100644 index 00000000..c98885f2 --- /dev/null +++ b/arrow/extensions/timestamp_with_offset.go @@ -0,0 +1,568 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions + +import ( + "errors" + "fmt" + "math" + "reflect" + "strings" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/internal/json" +) + +// TimestampWithOffsetType represents a timestamp column that stores a timezone offset per row instead of +// applying the same timezone offset to the entire column. +type TimestampWithOffsetType struct { + arrow.ExtensionBase +} + +func isOffsetTypeOk(offsetType arrow.DataType) bool { + switch offsetType := offsetType.(type) { + case *arrow.Int16Type: + return true + case *arrow.DictionaryType: + return arrow.IsInteger(offsetType.IndexType.ID()) && arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16) + case *arrow.RunEndEncodedType: + return offsetType.ValidRunEndsType(offsetType.RunEnds()) && + arrow.TypeEqual(offsetType.Encoded(), arrow.PrimitiveTypes.Int16) + // FIXME: Technically this should be non-nullable, but a Arrow IPC does not deserialize + // ValueNullable properly, so enforcing this here would always fail when reading from an IPC + // stream + // !offsetType.ValueNullable + default: + return false + } +} + +// Whether the storageType is compatible with TimestampWithOffset. +// +// Returns (time_unit, offset_type, ok). If ok is false, time_unit and offset_type are garbage. +func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, arrow.DataType, bool) { + timeUnit := arrow.Second + offsetType := arrow.PrimitiveTypes.Int16 + switch t := storageType.(type) { + case *arrow.StructType: + if t.NumFields() != 2 { + return timeUnit, offsetType, false + } + + maybeTimestamp := t.Field(0) + maybeOffset := t.Field(1) + + timestampOk := false + switch timestampType := maybeTimestamp.Type.(type) { + case *arrow.TimestampType: + if timestampType.TimeZone == "UTC" { + timestampOk = true + timeUnit = timestampType.TimeUnit() + } + default: + } + + offsetOk := isOffsetTypeOk(maybeOffset.Type) + + ok := maybeTimestamp.Name == "timestamp" && + timestampOk && + !maybeTimestamp.Nullable && + maybeOffset.Name == "offset_minutes" && + offsetOk && + !maybeOffset.Nullable + + return timeUnit, maybeOffset.Type, ok + default: + return timeUnit, offsetType, false + } +} + +// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any TimeUnit and O is a valid offset type. +// +// The error will be populated if the data type is not a valid encoding of the offsets field. +func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType arrow.DataType) (*TimestampWithOffsetType, error) { + if !isOffsetTypeOk(offsetType) { + return nil, errors.New(fmt.Sprintf("Invalid offset type %s", offsetType)) + } + + return &TimestampWithOffsetType{ + ExtensionBase: arrow.ExtensionBase{ + Storage: arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: unit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: offsetType, + Nullable: false, + }, + ), + }, + }, nil +} + + +// NewTimestampWithOffsetTypePrimitiveEncoded creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any TimeUnit. +func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit) *TimestampWithOffsetType { + v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16) + // SAFETY: This should never error as Int16 is always a valid offset type + + return v +} + +// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), where T is any TimeUnit and I is a +// valid Dictionary index type. +// +// The error will be populated if the index is not a valid dictionary-encoding index type. +func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index arrow.DataType) (*TimestampWithOffsetType, error) { + offsetType := arrow.DictionaryType{ + IndexType: index, + ValueType: arrow.PrimitiveTypes.Int16, + Ordered: false, + } + return NewTimestampWithOffsetType(unit, &offsetType) +} + +// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, Int16)), where T is any TimeUnit and E is a +// valid run-ends type. +// +// The error will be populated if runEnds is not a valid run-end encoding run-ends type. +func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds arrow.DataType) (*TimestampWithOffsetType, error) { + offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16) + if !offsetType.ValidRunEndsType(runEnds) { + return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", runEnds)) + } + + return NewTimestampWithOffsetType(unit, offsetType) +} + + +func (b *TimestampWithOffsetType) ArrayType() reflect.Type { + return reflect.TypeOf(TimestampWithOffsetArray{}) +} + +func (b *TimestampWithOffsetType) ExtensionName() string { return "arrow.timestamp_with_offset" } + +func (b *TimestampWithOffsetType) String() string { + return fmt.Sprintf("extension<%s>", b.ExtensionName()) +} + +func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, e.ExtensionName(), e.Serialize())), nil +} + +func (b *TimestampWithOffsetType) Serialize() string { return "" } + +func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) { + timeUnit, offsetType, ok := isDataTypeCompatible(storageType) + if !ok { + return nil, fmt.Errorf("invalid storage type for TimestampWithOffsetType: %s", storageType.Name()) + } + + v, _ := NewTimestampWithOffsetType(timeUnit, offsetType) + // SAFETY: the offsetType has already been checked by isDataTypeCompatible, so we can ignore the error + + return v, nil +} + +func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) bool { + return b.ExtensionName() == other.ExtensionName() +} + +func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit { + return b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit() +} + +func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) array.Builder { + v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), arrow.PrimitiveTypes.Int16) + // SAFETY: This will never error as Int16 is always a valid type for the offset field + + return v +} + +// TimestampWithOffsetArray is a simple array of struct +type TimestampWithOffsetArray struct { + array.ExtensionArrayBase +} + +func (a *TimestampWithOffsetArray) String() string { + var o strings.Builder + o.WriteString("[") + for i := 0; i < a.Len(); i++ { + if i > 0 { + o.WriteString(" ") + } + switch { + case a.IsNull(i): + o.WriteString(array.NullValueStr) + default: + fmt.Fprintf(&o, "\"%s\"", a.Value(i)) + } + } + o.WriteString("]") + return o.String() +} + +func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, unit arrow.TimeUnit) time.Time { + hours := offsetMinutes / 60 + minutes := offsetMinutes % 60 + if minutes < 0 { + minutes = -minutes + } + + loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), int(offsetMinutes)*60) + return utcTimestamp.ToTime(unit).In(loc) +} + +func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, int16) { + // naive "bitwise" conversion to UTC, keeping the underlying date the same + utc := t.UTC() + naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + offsetMinutes := int16(naiveUtc.Sub(t).Minutes()) + // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit value before + // this function. Otherwise, ignoring this error is not safe. + timestamp, _ := arrow.TimestampFromTime(utc, unit) + return timestamp, offsetMinutes +} + +// Get the raw arrow values at the given index +// +// SAFETY: the value at i must not be nil +func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16, arrow.TimeUnit) { + structs := a.Storage().(*array.Struct) + + timestampField := structs.Field(0) + timestamps := timestampField.(*array.Timestamp) + + timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit + utcTimestamp := timestamps.Value(i) + + var offsetMinutes int16 + + switch offsets := structs.Field(1).(type) { + case *array.Int16: + offsetMinutes = offsets.Value(i) + case *array.Dictionary: + offsetMinutes = offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i)) + case *array.RunEndEncoded: + offsetMinutes = offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i)) + } + + return utcTimestamp, offsetMinutes, timeUnit +} + +func (a *TimestampWithOffsetArray) Value(i int) time.Time { + if a.IsNull(i) { + return time.Unix(0, 0) + } + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) +} + +// Iterates over the array and calls the callback with the timestamp at each position. If it's null, +// the timestamp will be nil. +// +// This will iterate using the fastest method given the underlying storage array +func (a* TimestampWithOffsetArray) iterValues(callback func(i int, utcTimestamp *time.Time)) { + structs := a.Storage().(*array.Struct) + offsets := structs.Field(1) + if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee { + timestampField := structs.Field(0) + timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit + timestamps := timestampField.(*array.Timestamp) + + offsetValues := reeOffsets.Values().(*array.Int16) + offsetPhysicalIdx := 0 + + var getRunEnd (func(int) int) + switch arr := reeOffsets.RunEndsArr().(type) { + case *array.Int16: + getRunEnd = func(idx int) int { return int(arr.Value(idx)) } + case *array.Int32: + getRunEnd = func(idx int) int { return int(arr.Value(idx)) } + case *array.Int64: + getRunEnd = func(idx int) int { return int(arr.Value(idx)) } + } + + for i := 0; i < a.Len(); i++ { + if i >= getRunEnd(offsetPhysicalIdx) { + offsetPhysicalIdx += 1 + } + + timestamp := (*time.Time)(nil) + if a.IsValid(i) { + utcTimestamp := timestamps.Value(i) + offsetMinutes := offsetValues.Value(offsetPhysicalIdx) + v := timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) + timestamp = &v + } + + callback(i, timestamp) + } + } else { + for i := 0; i < a.Len(); i++ { + timestamp := (*time.Time)(nil) + if a.IsValid(i) { + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + v := timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) + timestamp = &v + } + + callback(i, timestamp) + } + } +} + + +func (a *TimestampWithOffsetArray) Values() []time.Time { + values := make([]time.Time, a.Len()) + a.iterValues(func(i int, timestamp *time.Time) { + if timestamp == nil { + values[i] = time.Unix(0, 0) + } else { + values[i] = *timestamp + } + }) + return values +} + +func (a *TimestampWithOffsetArray) ValueStr(i int) string { + switch { + case a.IsNull(i): + return array.NullValueStr + default: + return a.Value(i).String() + } +} + +func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) { + values := make([]interface{}, a.Len()) + a.iterValues(func(i int, timestamp *time.Time) { + values[i] = timestamp + }) + return json.Marshal(values) +} + +func (a *TimestampWithOffsetArray) GetOneForMarshal(i int) interface{} { + if a.IsNull(i) { + return nil + } + return a.Value(i) +} + +// TimestampWithOffsetBuilder is a convenience builder for the TimestampWithOffset extension type, +// allowing arrays to be built with boolean values rather than the underlying storage type. +type TimestampWithOffsetBuilder struct { + *array.ExtensionBuilder + + // The layout used to parse any timestamps from strings. Defaults to time.RFC3339 + Layout string + unit arrow.TimeUnit + offsetType arrow.DataType + // lastOffset is only used to determine when to start new runs with run-end encoded offsets + lastOffset int16 +} + +// NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface +// for writing time.Time values to the underlying storage array. +func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit, offsetType arrow.DataType) (*TimestampWithOffsetBuilder, error) { + dataType, err := NewTimestampWithOffsetType(unit, offsetType) + if err != nil { + return nil, err + } + + return &TimestampWithOffsetBuilder{ + unit: unit, + offsetType: offsetType, + lastOffset: math.MaxInt16, + Layout: time.RFC3339, + ExtensionBuilder: array.NewExtensionBuilder(mem, dataType), + }, nil +} + +func (b *TimestampWithOffsetBuilder) Append(v time.Time) { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + offsetMinutes16 := int16(offsetMinutes) + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + + structBuilder.Append(true) + structBuilder.FieldBuilder(0).(*array.TimestampBuilder).Append(timestamp) + + switch offsets := structBuilder.FieldBuilder(1).(type) { + case *array.Int16Builder: + offsets.Append(offsetMinutes16) + case *array.Int16DictionaryBuilder: + offsets.Append(offsetMinutes16) + case *array.RunEndEncodedBuilder: + if offsetMinutes != b.lastOffset { + offsets.Append(1) + offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16) + } else { + offsets.ContinueRun(1) + } + + b.lastOffset = offsetMinutes16 + } + +} + +func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + offsetMinutes16 := int16(offsetMinutes) + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + + structBuilder.Append(true) + structBuilder.FieldBuilder(0).(*array.TimestampBuilder).UnsafeAppend(timestamp) + + switch offsets := structBuilder.FieldBuilder(1).(type) { + case *array.Int16Builder: + offsets.UnsafeAppend(offsetMinutes16) + case *array.Int16DictionaryBuilder: + offsets.Append(offsetMinutes16) + case *array.RunEndEncodedBuilder: + if offsetMinutes != b.lastOffset { + offsets.Append(1) + offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16) + } else { + offsets.ContinueRun(1) + } + + b.lastOffset = offsetMinutes16 + } +} + +// By default, this will try to parse the string using the RFC3339 layout. +// +// You can change the default layout by using builder.SetLayout() +func (b *TimestampWithOffsetBuilder) AppendValueFromString(s string) error { + if s == array.NullValueStr { + b.AppendNull() + return nil + } + + parsed, err := time.Parse(b.Layout, s) + if err != nil { + return err + } + + b.Append(parsed) + return nil +} + +func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []bool) { + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + timestamps := structBuilder.FieldBuilder(0).(*array.TimestampBuilder) + + structBuilder.AppendValues(valids) + // SAFETY: by this point we know all buffers have available space given the earlier + // call to structBuilder.AppendValues which calls Reserve internally, so it's OK to + // call UnsafeAppend on inner builders + + switch offsets := structBuilder.FieldBuilder(1).(type) { + case *array.Int16Builder: + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsets.UnsafeAppend(offsetMinutes) + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } + case *array.Int16DictionaryBuilder: + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsets.Append(offsetMinutes) + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } + case *array.RunEndEncodedBuilder: + offsetValuesBuilder := offsets.ValueBuilder().(*array.Int16Builder) + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsetMinutes16 := int16(offsetMinutes) + if offsetMinutes != b.lastOffset { + offsets.Append(1) + offsetValuesBuilder.Append(offsetMinutes16) + } else { + offsets.ContinueRun(1) + } + b.lastOffset = offsetMinutes16 + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } + } +} + +func (b *TimestampWithOffsetBuilder) UnmarshalOne(dec *json.Decoder) error { + tok, err := dec.Token() + if err != nil { + return fmt.Errorf("failed to decode json: %w", err) + } + + switch raw := tok.(type) { + case string: + t, err := time.Parse(b.Layout, raw) + if err != nil { + return fmt.Errorf("failed to parse string \"%s\" as time.Time using layout \"%s\"", raw, b.Layout) + } + b.Append(t) + case nil: + b.AppendNull() + default: + return fmt.Errorf("expected date string") + } + + return nil +} + +func (b *TimestampWithOffsetBuilder) Unmarshal(dec *json.Decoder) error { + for dec.More() { + if err := b.UnmarshalOne(dec); err != nil { + return err + } + } + return nil +} + +var ( + _ arrow.ExtensionType = (*TimestampWithOffsetType)(nil) + _ array.CustomExtensionBuilder = (*TimestampWithOffsetType)(nil) + _ array.ExtensionArray = (*TimestampWithOffsetArray)(nil) + _ array.Builder = (*TimestampWithOffsetBuilder)(nil) +) diff --git a/arrow/extensions/timestamp_with_offset_test.go b/arrow/extensions/timestamp_with_offset_test.go new file mode 100644 index 00000000..f7e6c278 --- /dev/null +++ b/arrow/extensions/timestamp_with_offset_test.go @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions_test + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/internal/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testTimeUnit = arrow.Microsecond + +var testDate1 = time.Date(2025, 01, 01, 00, 00, 00, 00, time.FixedZone("UTC+00:00", 0)) + +var testZone1 = time.FixedZone("UTC-08:00", -8*60*60) +var testDate2 = testDate1.In(testZone1) + +var testZone2 = time.FixedZone("UTC+06:00", +6*60*60) +var testDate3 = testDate1.In(testZone2) + +func dict(index arrow.DataType) arrow.DataType { + return &arrow.DictionaryType{ + IndexType: index, + ValueType: arrow.PrimitiveTypes.Int16, + Ordered: false, + } +} + +func ree(runEnds arrow.DataType) arrow.DataType { + v := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16) + v.ValueNullable = false + return v +} + +// All tests use this in a for loop to make sure everything works for every possible +// encoding of offsets (primitive, dictionary, run-end) +var allAllowedOffsetTypes = []arrow.DataType{ + // primitive offsetType + arrow.PrimitiveTypes.Int16, + + // dict-encoded offsetType + dict(arrow.PrimitiveTypes.Uint8), + dict(arrow.PrimitiveTypes.Uint16), + dict(arrow.PrimitiveTypes.Uint32), + dict(arrow.PrimitiveTypes.Uint64), + dict(arrow.PrimitiveTypes.Int8), + dict(arrow.PrimitiveTypes.Int16), + dict(arrow.PrimitiveTypes.Int32), + dict(arrow.PrimitiveTypes.Int64), + + // run-end encoded offsetType + ree(arrow.PrimitiveTypes.Int16), + ree(arrow.PrimitiveTypes.Int32), + ree(arrow.PrimitiveTypes.Int64), +} + +func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) { + typ := extensions.NewTimestampWithOffsetTypePrimitiveEncoded(testTimeUnit) + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: arrow.PrimitiveTypes.Int16, + Nullable: false, + }, + ), + typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) +} + +func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) { + invalidIndexType := arrow.PrimitiveTypes.Float32 + _, err := extensions.NewTimestampWithOffsetTypeDictionaryEncoded(testTimeUnit, invalidIndexType) + assert.True(t, err != nil, "Err should not be nil if index type is invalid dict key") + + indexTypes := []arrow.DataType{ + arrow.PrimitiveTypes.Uint8, + arrow.PrimitiveTypes.Uint16, + arrow.PrimitiveTypes.Uint32, + arrow.PrimitiveTypes.Uint64, + arrow.PrimitiveTypes.Int8, + arrow.PrimitiveTypes.Int16, + arrow.PrimitiveTypes.Int32, + arrow.PrimitiveTypes.Int64, + }; + + for _, indexType := range indexTypes { + typ, err := extensions.NewTimestampWithOffsetTypeDictionaryEncoded(testTimeUnit, indexType) + assert.True(t, err == nil, "Err should be nil") + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: dict(indexType), + Nullable: false, + }, + ), + typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) + } +} + +func TestTimestampWithOffsetTypeRunEndEncodedBasics(t *testing.T) { + invalidRunEndsType := arrow.PrimitiveTypes.Float32 + _, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, invalidRunEndsType) + assert.True(t, err != nil, "Err should not be nil if run ends type is invalid") + + runEndsTypes := []arrow.DataType{ + arrow.PrimitiveTypes.Int16, + arrow.PrimitiveTypes.Int32, + arrow.PrimitiveTypes.Int64, + }; + + for _, indexType := range runEndsTypes { + typ, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, indexType) + assert.True(t, err == nil, "Err should be nil") + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: ree(indexType), + Nullable: false, + }, + ), + typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) + } +} + +func TestTimestampWithOffsetExtensionBuilder(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + // NOTE: we need to compare the arrays parsed from JSON with a primitive-encoded array, since that will always + // use that encoding (there is no way to pass a flag to array.FromJSON to say explicitly what storage type you want) + primitiveBuilder, err := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit, arrow.PrimitiveTypes.Int16) + assert.NoError(t, err) + primitiveBuilder.Append(testDate1) + primitiveBuilder.AppendNull() + primitiveBuilder.Append(testDate2) + primitiveBuilder.Append(testDate3) + jsonComparisonArr := primitiveBuilder.NewArray() + defer jsonComparisonArr.Release() + + for _, offsetType := range allAllowedOffsetTypes { + builder, _ := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit, offsetType) + + builder.Append(testDate1) + builder.AppendNull() + builder.Append(testDate2) + builder.Append(testDate3) + + // it should build the array with the correct size + arr := builder.NewArray() + typedArr := arr.(*extensions.TimestampWithOffsetArray) + assert.Equal(t, 4, arr.Data().Len()) + defer arr.Release() + + // typedArr.Value(i) should return values adjusted for their original timezone + assert.Equal(t, testDate1, typedArr.Value(0)) + assert.Equal(t, testDate2, typedArr.Value(2)) + assert.Equal(t, testDate3, typedArr.Value(3)) + + // storage TimeUnit should be the same as we pass in to the builder, and storage timezone should be UTC + timestampStructField := typedArr.Storage().(*array.Struct).Field(0) + timestampStructDataType := timestampStructField.DataType().(*arrow.TimestampType) + assert.Equal(t, timestampStructDataType.Unit, testTimeUnit) + assert.Equal(t, timestampStructDataType.TimeZone, "UTC") + + // stored values should be equivalent to the raw values in UTC + timestampsArr := timestampStructField.(*array.Timestamp) + assert.Equal(t, testDate1.In(time.UTC), timestampsArr.Value(0).ToTime(testTimeUnit)) + assert.Equal(t, testDate2.In(time.UTC), timestampsArr.Value(2).ToTime(testTimeUnit)) + assert.Equal(t, testDate3.In(time.UTC), timestampsArr.Value(3).ToTime(testTimeUnit)) + + // the array should encode itself as JSON and string + arrStr := arr.String() + assert.Equal(t, fmt.Sprintf(`["%[1]s" (null) "%[2]s" "%[3]s"]`, testDate1, testDate2, testDate3), arrStr) + jsonStr, err := json.Marshal(arr) + assert.NoError(t, err) + + // roundtripping from JSON with array.FromJSON should work + expectedDataType, _ := extensions.NewTimestampWithOffsetType(testTimeUnit, offsetType) + roundtripped, _, err := array.FromJSON(mem, expectedDataType, bytes.NewReader(jsonStr)) + defer roundtripped.Release() + assert.NoError(t, err) + assert.Truef(t, array.Equal(jsonComparisonArr, roundtripped), "expected %s\n\ngot %s", jsonComparisonArr, roundtripped) + } +} + +func TestTimestampWithOffsetExtensionRecordBuilder(t *testing.T) { + for _, offsetType := range allAllowedOffsetTypes { + dataType, _ := extensions.NewTimestampWithOffsetType(testTimeUnit, offsetType) + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "timestamp_with_offset", + Nullable: true, + Type: dataType, + }, + }, nil) + builder := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer builder.Release() + + fieldBuilder := builder.Field(0).(*extensions.TimestampWithOffsetBuilder) + + // append a simple time.Time + fieldBuilder.Append(testDate1) + + // append a null and 2 time.Time all at once + values := []time.Time{ + time.Unix(0, 0).In(time.UTC), + testDate2, + testDate3, + } + valids := []bool{false, true, true} + fieldBuilder.AppendValues(values, valids) + + // append a value from RFC3339 string + fieldBuilder.AppendValueFromString(testDate1.Format(time.RFC3339)) + + // append value formatted in a different string layout + fieldBuilder.Layout = time.RFC3339Nano + fieldBuilder.AppendValueFromString(testDate2.Format(time.RFC3339Nano)) + + record := builder.NewRecordBatch() + + // Record batch should JSON-encode values containing per-row timezone info + json, err := record.MarshalJSON() + require.NoError(t, err) + expect := `[{"timestamp_with_offset":"2025-01-01T00:00:00Z"} +,{"timestamp_with_offset":null} +,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} +,{"timestamp_with_offset":"2025-01-01T06:00:00+06:00"} +,{"timestamp_with_offset":"2025-01-01T00:00:00Z"} +,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} +]` + require.Equal(t, expect, string(json)) + + // Record batch roundtrip to JSON should work + roundtripped, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, bytes.NewReader(json)) + require.NoError(t, err) + defer roundtripped.Release() + require.Equal(t, schema, roundtripped.Schema()) + assert.Truef(t, array.RecordEqual(record, roundtripped), "expected %s\n\ngot %s", record, roundtripped) + } +} + +func TestTimestampWithOffsetTypeBatchIPCRoundTrip(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + for _, offsetType := range allAllowedOffsetTypes { + builder, _ := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit, offsetType) + builder.Append(testDate1) + builder.AppendNull() + builder.Append(testDate2) + builder.Append(testDate3) + arr := builder.NewArray() + defer arr.Release() + + typ, _ := extensions.NewTimestampWithOffsetType(testTimeUnit, offsetType) + + batch := array.NewRecordBatch(arrow.NewSchema([]arrow.Field{{Name: "timestamp_with_offset", Type: typ, Nullable: true}}, nil), []arrow.Array{arr}, -1) + defer batch.Release() + + var written arrow.RecordBatch + { + var buf bytes.Buffer + wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema())) + require.NoError(t, wr.Write(batch)) + require.NoError(t, wr.Close()) + + rdr, err := ipc.NewReader(&buf) + require.NoError(t, err) + written, err = rdr.Read() + require.NoError(t, err) + written.Retain() + defer written.Release() + rdr.Release() + } + + assert.Truef(t, batch.Schema().Equal(written.Schema()), "expected: %s\n\ngot: %s", + batch.Schema(), written.Schema()) + + assert.Truef(t, array.RecordEqual(batch, written), "expected: %s\n\ngot: %s", + batch, written) + } +}