From 14dec8b3ce7b31a9548d75732d80cc555151f158 Mon Sep 17 00:00:00 2001 From: serramatutu Date: Tue, 28 Oct 2025 13:47:35 +0100 Subject: [PATCH 1/4] Add `TimestampWithOffset` extension type This commit adds a new `TimestampWithOffset` extension type that can be used to represent timestamps with per-row timezone information. It stores information in a `struct` with 2 fields, `timestamp=[T, "UTC"]`, where `T` can be any `arrow.TimeUnit` and `offset_minutes=int16`, which represents the offset in minutes from the UTC timestamp. --- arrow/extensions/extensions.go | 7 +- arrow/extensions/timestamp_with_offset.go | 354 ++++++++++++++++++ .../extensions/timestamp_with_offset_test.go | 209 +++++++++++ 3 files changed, 567 insertions(+), 3 deletions(-) create mode 100644 arrow/extensions/timestamp_with_offset.go create mode 100644 arrow/extensions/timestamp_with_offset_test.go diff --git a/arrow/extensions/extensions.go b/arrow/extensions/extensions.go index 04566c75..6f13aa64 100644 --- a/arrow/extensions/extensions.go +++ b/arrow/extensions/extensions.go @@ -21,11 +21,12 @@ import ( ) var canonicalExtensionTypes = []arrow.ExtensionType{ - NewBool8Type(), - NewUUIDType(), - &OpaqueType{}, &JSONType{}, + &OpaqueType{}, + &TimestampWithOffsetType{}, &VariantType{}, + NewBool8Type(), + NewUUIDType(), } func init() { diff --git a/arrow/extensions/timestamp_with_offset.go b/arrow/extensions/timestamp_with_offset.go new file mode 100644 index 00000000..f84ad3c8 --- /dev/null +++ b/arrow/extensions/timestamp_with_offset.go @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions + +import ( + _ "bytes" + "fmt" + "reflect" + "strings" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/internal/json" +) + +// TimestampWithOffsetType represents a timestamp column that stores a timezone offset per row instead of +// applying the same timezone offset to the entire column. +type TimestampWithOffsetType struct { + arrow.ExtensionBase +} + +// Whether the storageType is compatible with TimestampWithOffset. +// +// Returns (time_unit, ok). If ok is false, time unit is garbage. 
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, bool) { + timeUnit := arrow.Second + switch t := storageType.(type) { + case *arrow.StructType: + if t.NumFields() != 2 { + return timeUnit, false + } + + maybeTimestamp := t.Field(0); + maybeOffset := t.Field(1); + + timestampOk := false + switch timestampType := maybeTimestamp.Type.(type) { + case *arrow.TimestampType: + if timestampType.TimeZone == "UTC" { + timestampOk = true + timeUnit = timestampType.TimeUnit() + } + default: + } + + ok := maybeTimestamp.Name == "timestamp" && + timestampOk && + !maybeTimestamp.Nullable && + maybeOffset.Name == "offset_minutes" && + arrow.TypeEqual(maybeOffset.Type, arrow.PrimitiveTypes.Int16) && + !maybeOffset.Nullable + + return timeUnit, ok + default: + return timeUnit, false + } +} + + +// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any TimeUnit. 
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType { + return &TimestampWithOffsetType{ + ExtensionBase: arrow.ExtensionBase{ + Storage: arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: unit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: arrow.PrimitiveTypes.Int16, + Nullable: false, + }, + ), + }, + } +} + +func (b *TimestampWithOffsetType) ArrayType() reflect.Type { return reflect.TypeOf(TimestampWithOffsetArray{}) } + +func (b *TimestampWithOffsetType) ExtensionName() string { return "arrow.timestamp_with_offset" } + +func (b *TimestampWithOffsetType) String() string { return fmt.Sprintf("extension<%s>", b.ExtensionName()) } + +func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, e.ExtensionName(), e.Serialize())), nil +} + +func (b *TimestampWithOffsetType) Serialize() string { return "" } + +func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) { + timeUnit, ok := isDataTypeCompatible(storageType) + if !ok { + return nil, fmt.Errorf("invalid storage type for TimestampWithOffsetType: %s", storageType.Name()) + } + return NewTimestampWithOffsetType(timeUnit), nil +} + +func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) bool { + return b.ExtensionName() == other.ExtensionName() +} + +func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit { + return b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit() +} + +func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) array.Builder { + return NewTimestampWithOffsetBuilder(mem, b.TimeUnit()) +} + +// TimestampWithOffsetArray is a simple array of struct +type TimestampWithOffsetArray struct { + array.ExtensionArrayBase +} + +func (a *TimestampWithOffsetArray) String() string { + var o 
strings.Builder + o.WriteString("[") + for i := 0; i < a.Len(); i++ { + if i > 0 { + o.WriteString(" ") + } + switch { + case a.IsNull(i): + o.WriteString(array.NullValueStr) + default: + fmt.Fprintf(&o, "\"%s\"", a.Value(i)) + } + } + o.WriteString("]") + return o.String() +} + +func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, unit arrow.TimeUnit) time.Time { + hours := offsetMinutes / 60; + minutes := offsetMinutes % 60 + if minutes < 0 { + minutes = -minutes + } + + loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), int(offsetMinutes) * 60) + return utcTimestamp.ToTime(unit).In(loc) +} + +func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, int16) { + // naive "bitwise" conversion to UTC, keeping the underlying date the same + utc := t.UTC() + naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + offsetMinutes := int16(naiveUtc.Sub(t).Minutes()) + // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit value before + // this function. Otherwise, ignoring this error is not safe. 
+ timestamp, _ := arrow.TimestampFromTime(utc, unit) + return timestamp, offsetMinutes +} + +// Get the raw arrow values at the given index +// +// SAFETY: the value at i must not be nil +func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16, arrow.TimeUnit) { + structs := a.Storage().(*array.Struct) + + timestampField := structs.Field(0) + timestamps := timestampField.(*array.Timestamp) + offsets := structs.Field(1).(*array.Int16) + + timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit + utcTimestamp := timestamps.Value(i) + offsetMinutes := offsets.Value(i) + + return utcTimestamp, offsetMinutes, timeUnit +} + +func (a *TimestampWithOffsetArray) Value(i int) time.Time { + if a.IsNull(i) { + return time.Unix(0, 0) + } + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) +} + +func (a *TimestampWithOffsetArray) Values() []time.Time { + values := make([]time.Time, a.Len()) + for i := range a.Len() { + val := a.Value(i) + values[i] = val + } + return values +} + +func (a *TimestampWithOffsetArray) ValueStr(i int) string { + switch { + case a.IsNull(i): + return array.NullValueStr + default: + return a.Value(i).String() + } +} + +func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) { + values := make([]interface{}, a.Len()) + for i := 0; i < a.Len(); i++ { + if a.IsValid(i) { + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + values[i] = timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) + } else { + values[i] = nil + } + } + return json.Marshal(values) +} + +func (a *TimestampWithOffsetArray) GetOneForMarshal(i int) interface{} { + if a.IsNull(i) { + return nil + } + return a.Value(i) +} + +// TimestampWithOffsetBuilder is a convenience builder for the TimestampWithOffset extension type, +// allowing arrays to be built with time.Time values rather than the underlying storage type. 
+type TimestampWithOffsetBuilder struct { + *array.ExtensionBuilder + + // The layout used to parse any timestamps from strings. Defaults to time.RFC3339 + Layout string + unit arrow.TimeUnit +} + +// NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface +// for writing time.Time values to the underlying storage array. +func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit) *TimestampWithOffsetBuilder { + return &TimestampWithOffsetBuilder{ + unit: unit, + Layout: time.RFC3339, + ExtensionBuilder: array.NewExtensionBuilder(mem, NewTimestampWithOffsetType(unit)), + } +} + +func (b *TimestampWithOffsetBuilder) Append(v time.Time) { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + + structBuilder.Append(true) + structBuilder.FieldBuilder(0).(*array.TimestampBuilder).Append(timestamp) + structBuilder.FieldBuilder(1).(*array.Int16Builder).Append(int16(offsetMinutes)) +} + +func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + + structBuilder.Append(true) + structBuilder.FieldBuilder(0).(*array.TimestampBuilder).UnsafeAppend(timestamp) + structBuilder.FieldBuilder(1).(*array.Int16Builder).UnsafeAppend(int16(offsetMinutes)) +} + +// By default, this will try to parse the string using the RFC3339 layout. 
+// +// You can change the default layout by using builder.SetLayout() +func (b *TimestampWithOffsetBuilder) AppendValueFromString(s string) error { + if s == array.NullValueStr { + b.AppendNull() + return nil + } + + parsed, err := time.Parse(b.Layout, s) + if err != nil { + return err + } + + b.Append(parsed) + return nil +} + +func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []bool) { + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + timestamps := structBuilder.FieldBuilder(0).(*array.TimestampBuilder); + offsets := structBuilder.FieldBuilder(1).(*array.Int16Builder); + + structBuilder.AppendValues(valids) + + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + + // SAFETY: by this point we know all buffers have available space given the earlier + // call to structBuilder.AppendValues which calls Reserve internally + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsets.UnsafeAppend(offsetMinutes) + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } +} + +func (b *TimestampWithOffsetBuilder) UnmarshalOne(dec *json.Decoder) error { + tok, err := dec.Token() + if err != nil { + return fmt.Errorf("failed to decode json: %w", err) + } + + switch raw := tok.(type) { + case string: + t, err := time.Parse(b.Layout, raw) + if err != nil { + return fmt.Errorf("failed to parse string \"%s\" as time.Time using layout \"%s\"", raw, b.Layout) + } + b.Append(t) + case nil: + b.AppendNull() + default: + return fmt.Errorf("expected date string") + } + + return nil +} + +func (b *TimestampWithOffsetBuilder) Unmarshal(dec *json.Decoder) error { + for dec.More() { + if err := b.UnmarshalOne(dec); err != nil { + return err + } + } + return nil +} + +var ( + _ arrow.ExtensionType = (*TimestampWithOffsetType)(nil) + _ array.CustomExtensionBuilder = (*TimestampWithOffsetType)(nil) + _ array.ExtensionArray = (*TimestampWithOffsetArray)(nil) 
+ _ array.Builder = (*TimestampWithOffsetBuilder)(nil) +) diff --git a/arrow/extensions/timestamp_with_offset_test.go b/arrow/extensions/timestamp_with_offset_test.go new file mode 100644 index 00000000..9c198afd --- /dev/null +++ b/arrow/extensions/timestamp_with_offset_test.go @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package extensions_test + +import ( + "bytes" + _ "bytes" + "fmt" + "strings" + _ "strings" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/arrow-go/v18/arrow/ipc" + _ "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/internal/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testTimeUnit = arrow.Microsecond + +var testDate1 = time.Date(2025, 01, 01, 00, 00, 00, 00, time.FixedZone("UTC+00:00", 0)) + +var testZone1 = time.FixedZone("UTC-08:00", -8*60*60) +var testDate2 = testDate1.In(testZone1) + +var testZone2 = time.FixedZone("UTC+06:00", +6*60*60) +var testDate3 = testDate1.In(testZone2) + +func TestTimestampWithOffsetTypeBasics(t *testing.T) { + typ := extensions.NewTimestampWithOffsetType(testTimeUnit) + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: arrow.PrimitiveTypes.Int16, + Nullable: false, + }, + ), + typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) +} + +func TestTimestampWithOffsetExtensionBuilder(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + builder := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit) + builder.Append(testDate1) + builder.AppendNull() + builder.Append(testDate2) + builder.Append(testDate3) + + // it should build the array with the correct size + arr := builder.NewArray() + typedArr := arr.(*extensions.TimestampWithOffsetArray) + assert.Equal(t, 4, 
arr.Data().Len()) + defer arr.Release() + + // typedArr.Value(i) should return values adjusted for their original timezone + assert.Equal(t, testDate1, typedArr.Value(0)) + assert.Equal(t, testDate2, typedArr.Value(2)) + assert.Equal(t, testDate3, typedArr.Value(3)) + + // storage TimeUnit should be the same as we pass in to the builder, and storage timezone should be UTC + timestampStructField := typedArr.Storage().(*array.Struct).Field(0) + timestampStructDataType := timestampStructField.DataType().(*arrow.TimestampType) + assert.Equal(t, timestampStructDataType.Unit, testTimeUnit) + assert.Equal(t, timestampStructDataType.TimeZone, "UTC") + + // stored values should be equivalent to the raw values in UTC + timestampsArr := timestampStructField.(*array.Timestamp) + assert.Equal(t, testDate1.In(time.UTC), timestampsArr.Value(0).ToTime(testTimeUnit)) + assert.Equal(t, testDate2.In(time.UTC), timestampsArr.Value(2).ToTime(testTimeUnit)) + assert.Equal(t, testDate3.In(time.UTC), timestampsArr.Value(3).ToTime(testTimeUnit)) + + // the array should encode itself as JSON and string + arrStr := arr.String() + assert.Equal(t, fmt.Sprintf(`["%[1]s" (null) "%[2]s" "%[3]s"]`, testDate1, testDate2, testDate3), arrStr) + jsonStr, err := json.Marshal(arr) + assert.NoError(t, err) + + // roundtripping from JSON with array.FromJSON should work + roundtripped, _, err := array.FromJSON(mem, extensions.NewTimestampWithOffsetType(testTimeUnit), bytes.NewReader(jsonStr)) + defer roundtripped.Release() + assert.NoError(t, err) + assert.Truef(t, array.Equal(arr, roundtripped), "expected %s got %s", arr, roundtripped) +} + +func TestTimestampWithOffsetExtensionRecordBuilder(t *testing.T) { + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "timestamp_with_offset", + Nullable: true, + Type: extensions.NewTimestampWithOffsetType(testTimeUnit), + }, + }, nil) + builder := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer builder.Release() + + fieldBuilder := 
builder.Field(0).(*extensions.TimestampWithOffsetBuilder) + + // append a simple time.Time + fieldBuilder.Append(testDate1) + + // append a null and 2 time.Time all at once + values := []time.Time{ + time.Unix(0, 0).In(time.UTC), + testDate2, + testDate3, + } + valids := []bool{false, true, true} + fieldBuilder.AppendValues(values, valids) + + // append a value from RFC3339 string + fieldBuilder.AppendValueFromString(testDate1.Format(time.RFC3339)) + + // append value formatted in a different string layout + fieldBuilder.Layout = time.RFC3339Nano + fieldBuilder.AppendValueFromString(testDate2.Format(time.RFC3339Nano)) + + record := builder.NewRecordBatch() + + // Record batch should JSON-encode values containing per-row timezone info + json, err := record.MarshalJSON() + require.NoError(t, err) + expect := `[{"timestamp_with_offset":"2025-01-01T00:00:00Z"} +,{"timestamp_with_offset":null} +,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} +,{"timestamp_with_offset":"2025-01-01T06:00:00+06:00"} +,{"timestamp_with_offset":"2025-01-01T00:00:00Z"} +,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} +]` + require.Equal(t, expect, string(json)) + + // Record batch roundtrip to JSON should work + roundtripped, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, bytes.NewReader(json)) + require.NoError(t, err) + defer roundtripped.Release() + require.Equal(t, schema, roundtripped.Schema()) + assert.Truef(t, array.RecordEqual(record, roundtripped), "expected %s\n\ngot %s", record, roundtripped) +} + +func TestTimestampWithOffsetTypeBatchIPCRoundTrip(t *testing.T) { + raw := `["2025-01-01T00:00:00Z",null,"2024-12-31T16:00:00-08:00","2025-01-01T06:00:00+06:00","2025-01-01T00:00:00Z","2024-12-31T16:00:00-08:00"]` + typ := extensions.NewTimestampWithOffsetType(testTimeUnit) + + arr, _, err := array.FromJSON(memory.DefaultAllocator, typ, strings.NewReader(raw)) + require.NoError(t, err) + defer arr.Release() + + batch := 
array.NewRecordBatch(arrow.NewSchema([]arrow.Field{{Name: "timestamp_with_offset", Type: typ, Nullable: true}}, nil), []arrow.Array{arr}, -1) + defer batch.Release() + + var written arrow.RecordBatch + { + var buf bytes.Buffer + wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema())) + require.NoError(t, wr.Write(batch)) + require.NoError(t, wr.Close()) + + rdr, err := ipc.NewReader(&buf) + require.NoError(t, err) + written, err = rdr.Read() + require.NoError(t, err) + written.Retain() + defer written.Release() + rdr.Release() + } + + assert.Truef(t, batch.Schema().Equal(written.Schema()), "expected: %s\n\ngot: %s", + batch.Schema(), written.Schema()) + + assert.Truef(t, array.RecordEqual(batch, written), "expected: %s\n\ngot: %s", + batch, written) +} From 85a7265885706bb855089b738b4cf06aca6c3df1 Mon Sep 17 00:00:00 2001 From: serramatutu Date: Mon, 22 Dec 2025 11:44:08 +0200 Subject: [PATCH 2/4] Allow dictionary encoding of `TimestampWithOffset` This commit allows `TimestampWithOffset` to be dict-encoded. - I made `NewTimestampWithOffsetType` take in an input `offsetType arrow.DataType`. It returns an error if the data type is not valid. - I added a new infallible `NewTimestampWithOffsetTypePrimitiveEncoded` to make the encoding explicit. - I added `NewTimestampWithOffsetTypeDictionaryEncoded` which returns an error in case the given type is not a valid dictionary key type. - I made all tests run in a for loop with all possible allowed encoding types, ensuring all encodings work. 
--- arrow/extensions/timestamp_with_offset.go | 196 ++++++++--- .../extensions/timestamp_with_offset_test.go | 317 +++++++++++------- 2 files changed, 350 insertions(+), 163 deletions(-) diff --git a/arrow/extensions/timestamp_with_offset.go b/arrow/extensions/timestamp_with_offset.go index f84ad3c8..e579fab7 100644 --- a/arrow/extensions/timestamp_with_offset.go +++ b/arrow/extensions/timestamp_with_offset.go @@ -17,7 +17,7 @@ package extensions import ( - _ "bytes" + "errors" "fmt" "reflect" "strings" @@ -35,19 +35,31 @@ type TimestampWithOffsetType struct { arrow.ExtensionBase } +func isOffsetTypeOk(offsetType arrow.DataType) bool { + switch offsetType := offsetType.(type) { + case *arrow.Int16Type: + return true + case *arrow.DictionaryType: + return arrow.IsInteger(offsetType.IndexType.ID()) && arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16) + default: + return false + } +} + // Whether the storageType is compatible with TimestampWithOffset. // -// Returns (time_unit, ok). If ok is false, time unit is garbage. -func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, bool) { +// Returns (time_unit, offset_type, ok). If ok is false, time_unit and offset_type are garbage. 
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, arrow.DataType, bool) { timeUnit := arrow.Second + offsetType := arrow.PrimitiveTypes.Int16 switch t := storageType.(type) { case *arrow.StructType: if t.NumFields() != 2 { - return timeUnit, false + return timeUnit, offsetType, false } - maybeTimestamp := t.Field(0); - maybeOffset := t.Field(1); + maybeTimestamp := t.Field(0) + maybeOffset := t.Field(1) timestampOk := false switch timestampType := maybeTimestamp.Type.(type) { @@ -59,49 +71,84 @@ func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, bool) { default: } + offsetOk := isOffsetTypeOk(maybeOffset.Type) + ok := maybeTimestamp.Name == "timestamp" && timestampOk && !maybeTimestamp.Nullable && maybeOffset.Name == "offset_minutes" && - arrow.TypeEqual(maybeOffset.Type, arrow.PrimitiveTypes.Int16) && + offsetOk && !maybeOffset.Nullable - return timeUnit, ok + return timeUnit, maybeOffset.Type, ok default: - return timeUnit, false + return timeUnit, offsetType, false } } - // NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to -// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any TimeUnit. -func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType { +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any TimeUnit and O is a valid offset type. +// +// The error will be populated if the data type is not a valid encoding of the offsets field. 
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType arrow.DataType) (*TimestampWithOffsetType, error) { + if !isOffsetTypeOk(offsetType) { + return nil, errors.New(fmt.Sprintf("Invalid offset type %s", offsetType)) + } + return &TimestampWithOffsetType{ ExtensionBase: arrow.ExtensionBase{ Storage: arrow.StructOf( arrow.Field{ Name: "timestamp", Type: &arrow.TimestampType{ - Unit: unit, + Unit: unit, TimeZone: "UTC", }, Nullable: false, }, arrow.Field{ - Name: "offset_minutes", - Type: arrow.PrimitiveTypes.Int16, + Name: "offset_minutes", + Type: offsetType, Nullable: false, }, ), }, + }, nil +} + + +// NewTimestampWithOffsetTypePrimitiveEncoded creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any TimeUnit. +func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit) *TimestampWithOffsetType { + v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16) + // SAFETY: This should never error as Int16 is always a valid offset type + + return v +} + +// NewTimestampWithOffsetTypeDictionaryEncoded creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), where T is any TimeUnit and I is a +// valid Dictionary index type. +// +// The error will be populated if the index is not a valid dictionary-encoding index type. 
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index arrow.DataType) (*TimestampWithOffsetType, error) { + offsetType := arrow.DictionaryType{ + IndexType: index, + ValueType: arrow.PrimitiveTypes.Int16, + Ordered: false, } + return NewTimestampWithOffsetType(unit, &offsetType) } -func (b *TimestampWithOffsetType) ArrayType() reflect.Type { return reflect.TypeOf(TimestampWithOffsetArray{}) } +func (b *TimestampWithOffsetType) ArrayType() reflect.Type { + return reflect.TypeOf(TimestampWithOffsetArray{}) +} func (b *TimestampWithOffsetType) ExtensionName() string { return "arrow.timestamp_with_offset" } -func (b *TimestampWithOffsetType) String() string { return fmt.Sprintf("extension<%s>", b.ExtensionName()) } +func (b *TimestampWithOffsetType) String() string { + return fmt.Sprintf("extension<%s>", b.ExtensionName()) +} func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, e.ExtensionName(), e.Serialize())), nil @@ -110,23 +157,30 @@ func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) { func (b *TimestampWithOffsetType) Serialize() string { return "" } func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) { - timeUnit, ok := isDataTypeCompatible(storageType) + timeUnit, offsetType, ok := isDataTypeCompatible(storageType) if !ok { return nil, fmt.Errorf("invalid storage type for TimestampWithOffsetType: %s", storageType.Name()) } - return NewTimestampWithOffsetType(timeUnit), nil + + v, _ := NewTimestampWithOffsetType(timeUnit, offsetType) + // SAFETY: the offsetType has already been checked by isDataTypeCompatible, so we can ignore the error + + return v, nil } func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) bool { return b.ExtensionName() == other.ExtensionName() } -func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit { +func (b *TimestampWithOffsetType) 
TimeUnit() arrow.TimeUnit { return b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit() } func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) array.Builder { - return NewTimestampWithOffsetBuilder(mem, b.TimeUnit()) + v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), arrow.PrimitiveTypes.Int16) + // SAFETY: This will never error as Int16 is always a valid type for the offset field + + return v } // TimestampWithOffsetArray is a simple array of struct @@ -153,13 +207,13 @@ func (a *TimestampWithOffsetArray) String() string { } func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, unit arrow.TimeUnit) time.Time { - hours := offsetMinutes / 60; + hours := offsetMinutes / 60 minutes := offsetMinutes % 60 if minutes < 0 { minutes = -minutes } - loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), int(offsetMinutes) * 60) + loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), int(offsetMinutes)*60) return utcTimestamp.ToTime(unit).In(loc) } @@ -182,11 +236,18 @@ func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16 timestampField := structs.Field(0) timestamps := timestampField.(*array.Timestamp) - offsets := structs.Field(1).(*array.Int16) timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit - utcTimestamp := timestamps.Value(i) - offsetMinutes := offsets.Value(i) + utcTimestamp := timestamps.Value(i) + + var offsetMinutes int16 + + switch offsets := structs.Field(1).(type) { + case *array.Int16: + offsetMinutes = offsets.Value(i) + case *array.Dictionary: + offsetMinutes = offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i)) + } return utcTimestamp, offsetMinutes, timeUnit } @@ -243,18 +304,25 @@ type TimestampWithOffsetBuilder struct { *array.ExtensionBuilder // The layout used to parse any timestamps from strings. 
Defaults to time.RFC3339 - Layout string - unit arrow.TimeUnit + Layout string + unit arrow.TimeUnit + offsetType arrow.DataType } // NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface // for writing time.Time values to the underlying storage array. -func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit) *TimestampWithOffsetBuilder { - return &TimestampWithOffsetBuilder{ - unit: unit, - Layout: time.RFC3339, - ExtensionBuilder: array.NewExtensionBuilder(mem, NewTimestampWithOffsetType(unit)), +func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit, offsetType arrow.DataType) (*TimestampWithOffsetBuilder, error) { + dataType, err := NewTimestampWithOffsetType(unit, offsetType) + if err != nil { + return nil, err } + + return &TimestampWithOffsetBuilder{ + unit: unit, + offsetType: offsetType, + Layout: time.RFC3339, + ExtensionBuilder: array.NewExtensionBuilder(mem, dataType), + }, nil } func (b *TimestampWithOffsetBuilder) Append(v time.Time) { @@ -263,7 +331,13 @@ func (b *TimestampWithOffsetBuilder) Append(v time.Time) { structBuilder.Append(true) structBuilder.FieldBuilder(0).(*array.TimestampBuilder).Append(timestamp) - structBuilder.FieldBuilder(1).(*array.Int16Builder).Append(int16(offsetMinutes)) + + switch offsets := structBuilder.FieldBuilder(1).(type) { + case *array.Int16Builder: + offsets.Append(int16(offsetMinutes)) + case *array.Int16DictionaryBuilder: + offsets.Append(int16(offsetMinutes)) + } } func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { @@ -272,10 +346,16 @@ func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { structBuilder.Append(true) structBuilder.FieldBuilder(0).(*array.TimestampBuilder).UnsafeAppend(timestamp) - structBuilder.FieldBuilder(1).(*array.Int16Builder).UnsafeAppend(int16(offsetMinutes)) + + switch offsets := structBuilder.FieldBuilder(1).(type) { + case *array.Int16Builder: + 
offsets.UnsafeAppend(int16(offsetMinutes)) + case *array.Int16DictionaryBuilder: + offsets.Append(int16(offsetMinutes)) + } } -// By default, this will try to parse the string using the RFC3339 layout. +// By default, this will try to parse the string using the RFC3339 layout. // // You can change the default layout by using builder.SetLayout() func (b *TimestampWithOffsetBuilder) AppendValueFromString(s string) error { @@ -295,22 +375,36 @@ func (b *TimestampWithOffsetBuilder) AppendValueFromString(s string) error { func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []bool) { structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) - timestamps := structBuilder.FieldBuilder(0).(*array.TimestampBuilder); - offsets := structBuilder.FieldBuilder(1).(*array.Int16Builder); + timestamps := structBuilder.FieldBuilder(0).(*array.TimestampBuilder) structBuilder.AppendValues(valids) - - for i, v := range values { - timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) - - // SAFETY: by this point we know all buffers have available space given the earlier - // call to structBuilder.AppendValues which calls Reserve internally - if valids[i] { - timestamps.UnsafeAppend(timestamp) - offsets.UnsafeAppend(offsetMinutes) - } else { - timestamps.UnsafeAppendBoolToBitmap(false) - offsets.UnsafeAppendBoolToBitmap(false) + // SAFETY: by this point we know all buffers have available space given the earlier + // call to structBuilder.AppendValues which calls Reserve internally, so it's OK to + // call UnsafeAppend on inner builders + + switch offsets := structBuilder.FieldBuilder(1).(type) { + case *array.Int16Builder: + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsets.UnsafeAppend(offsetMinutes) + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } + case *array.Int16DictionaryBuilder: + for 
i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + if valids[i] { + timestamps.UnsafeAppend(timestamp) + // TODO: I was here, this needs to be equivalent to UnsafeAppend + offsets.Append(offsetMinutes) + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } } } } @@ -350,5 +444,5 @@ var ( _ arrow.ExtensionType = (*TimestampWithOffsetType)(nil) _ array.CustomExtensionBuilder = (*TimestampWithOffsetType)(nil) _ array.ExtensionArray = (*TimestampWithOffsetArray)(nil) - _ array.Builder = (*TimestampWithOffsetBuilder)(nil) + _ array.Builder = (*TimestampWithOffsetBuilder)(nil) ) diff --git a/arrow/extensions/timestamp_with_offset_test.go b/arrow/extensions/timestamp_with_offset_test.go index 9c198afd..fc08375c 100644 --- a/arrow/extensions/timestamp_with_offset_test.go +++ b/arrow/extensions/timestamp_with_offset_test.go @@ -18,10 +18,7 @@ package extensions_test import ( "bytes" - _ "bytes" "fmt" - "strings" - _ "strings" "testing" "time" @@ -29,7 +26,6 @@ import ( "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/extensions" "github.com/apache/arrow-go/v18/arrow/ipc" - _ "github.com/apache/arrow-go/v18/arrow/ipc" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/json" "github.com/stretchr/testify/assert" @@ -46,8 +42,33 @@ var testDate2 = testDate1.In(testZone1) var testZone2 = time.FixedZone("UTC+06:00", +6*60*60) var testDate3 = testDate1.In(testZone2) -func TestTimestampWithOffsetTypeBasics(t *testing.T) { - typ := extensions.NewTimestampWithOffsetType(testTimeUnit) +func dict(index arrow.DataType) arrow.DataType { + return &arrow.DictionaryType{ + IndexType: index, + ValueType: arrow.PrimitiveTypes.Int16, + Ordered: false, + } +} + +// All tests use this in a for loop to make sure everything works for every possible +// encoding of offsets (primitive, dictionary, run-end) +var allAllowedOffsetTypes = 
[]arrow.DataType{ + // primitive offsetType + arrow.PrimitiveTypes.Int16, + + // dict-encoded offsetType + dict(arrow.PrimitiveTypes.Uint8), + dict(arrow.PrimitiveTypes.Uint16), + dict(arrow.PrimitiveTypes.Uint32), + dict(arrow.PrimitiveTypes.Uint64), + dict(arrow.PrimitiveTypes.Int8), + dict(arrow.PrimitiveTypes.Int16), + dict(arrow.PrimitiveTypes.Int32), + dict(arrow.PrimitiveTypes.Int64), +} + +func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) { + typ := extensions.NewTimestampWithOffsetTypePrimitiveEncoded(testTimeUnit) assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) assert.True(t, typ.ExtensionEquals(typ)) @@ -74,136 +95,208 @@ func TestTimestampWithOffsetTypeBasics(t *testing.T) { assert.Equal(t, "extension", typ.String()) } +func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) { + invalidIndexType := arrow.PrimitiveTypes.Float32 + _, err := extensions.NewTimestampWithOffsetTypeDictionaryEncoded(testTimeUnit, invalidIndexType) + assert.True(t, err != nil, "Err should not be nil if index type is invalid dict key") + + indexTypes := []arrow.DataType{ + arrow.PrimitiveTypes.Uint8, + arrow.PrimitiveTypes.Uint16, + arrow.PrimitiveTypes.Uint32, + arrow.PrimitiveTypes.Uint64, + arrow.PrimitiveTypes.Int8, + arrow.PrimitiveTypes.Int16, + arrow.PrimitiveTypes.Int32, + arrow.PrimitiveTypes.Int64, + }; + + for _, indexType := range indexTypes { + typ, err := extensions.NewTimestampWithOffsetTypeDictionaryEncoded(testTimeUnit, indexType) + assert.True(t, err == nil, "Err should be nil") + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: dict(indexType), + Nullable: false, + }, + ), + 
typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) + } +} + func TestTimestampWithOffsetExtensionBuilder(t *testing.T) { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) - builder := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit) - builder.Append(testDate1) - builder.AppendNull() - builder.Append(testDate2) - builder.Append(testDate3) - - // it should build the array with the correct size - arr := builder.NewArray() - typedArr := arr.(*extensions.TimestampWithOffsetArray) - assert.Equal(t, 4, arr.Data().Len()) - defer arr.Release() - - // typedArr.Value(i) should return values adjusted for their original timezone - assert.Equal(t, testDate1, typedArr.Value(0)) - assert.Equal(t, testDate2, typedArr.Value(2)) - assert.Equal(t, testDate3, typedArr.Value(3)) - - // storage TimeUnit should be the same as we pass in to the builder, and storage timezone should be UTC - timestampStructField := typedArr.Storage().(*array.Struct).Field(0) - timestampStructDataType := timestampStructField.DataType().(*arrow.TimestampType) - assert.Equal(t, timestampStructDataType.Unit, testTimeUnit) - assert.Equal(t, timestampStructDataType.TimeZone, "UTC") - - // stored values should be equivalent to the raw values in UTC - timestampsArr := timestampStructField.(*array.Timestamp) - assert.Equal(t, testDate1.In(time.UTC), timestampsArr.Value(0).ToTime(testTimeUnit)) - assert.Equal(t, testDate2.In(time.UTC), timestampsArr.Value(2).ToTime(testTimeUnit)) - assert.Equal(t, testDate3.In(time.UTC), timestampsArr.Value(3).ToTime(testTimeUnit)) - - // the array should encode itself as JSON and string - arrStr := arr.String() - assert.Equal(t, fmt.Sprintf(`["%[1]s" (null) "%[2]s" "%[3]s"]`, testDate1, testDate2, testDate3), arrStr) - jsonStr, err := json.Marshal(arr) - assert.NoError(t, err) - - // roundtripping from JSON with array.FromJSON should work - roundtripped, _, err := array.FromJSON(mem, 
extensions.NewTimestampWithOffsetType(testTimeUnit), bytes.NewReader(jsonStr)) - defer roundtripped.Release() + // NOTE: we need to compare the arrays parsed from JSON with a primitive-encoded array, since that will always + // use that encoding (there is no way to pass a flag to array.FromJSON to say explicitly what storage type you want) + primitiveBuilder, err := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit, arrow.PrimitiveTypes.Int16) assert.NoError(t, err) - assert.Truef(t, array.Equal(arr, roundtripped), "expected %s got %s", arr, roundtripped) + primitiveBuilder.Append(testDate1) + primitiveBuilder.AppendNull() + primitiveBuilder.Append(testDate2) + primitiveBuilder.Append(testDate3) + jsonComparisonArr := primitiveBuilder.NewArray() + defer jsonComparisonArr.Release() + + for _, offsetType := range allAllowedOffsetTypes { + builder, _ := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit, offsetType) + + builder.Append(testDate1) + builder.AppendNull() + builder.Append(testDate2) + builder.Append(testDate3) + + // it should build the array with the correct size + arr := builder.NewArray() + typedArr := arr.(*extensions.TimestampWithOffsetArray) + assert.Equal(t, 4, arr.Data().Len()) + defer arr.Release() + + // typedArr.Value(i) should return values adjusted for their original timezone + assert.Equal(t, testDate1, typedArr.Value(0)) + assert.Equal(t, testDate2, typedArr.Value(2)) + assert.Equal(t, testDate3, typedArr.Value(3)) + + // storage TimeUnit should be the same as we pass in to the builder, and storage timezone should be UTC + timestampStructField := typedArr.Storage().(*array.Struct).Field(0) + timestampStructDataType := timestampStructField.DataType().(*arrow.TimestampType) + assert.Equal(t, timestampStructDataType.Unit, testTimeUnit) + assert.Equal(t, timestampStructDataType.TimeZone, "UTC") + + // stored values should be equivalent to the raw values in UTC + timestampsArr := timestampStructField.(*array.Timestamp) + 
assert.Equal(t, testDate1.In(time.UTC), timestampsArr.Value(0).ToTime(testTimeUnit)) + assert.Equal(t, testDate2.In(time.UTC), timestampsArr.Value(2).ToTime(testTimeUnit)) + assert.Equal(t, testDate3.In(time.UTC), timestampsArr.Value(3).ToTime(testTimeUnit)) + + // the array should encode itself as JSON and string + arrStr := arr.String() + assert.Equal(t, fmt.Sprintf(`["%[1]s" (null) "%[2]s" "%[3]s"]`, testDate1, testDate2, testDate3), arrStr) + jsonStr, err := json.Marshal(arr) + assert.NoError(t, err) + + // roundtripping from JSON with array.FromJSON should work + expectedDataType, _ := extensions.NewTimestampWithOffsetType(testTimeUnit, offsetType) + roundtripped, _, err := array.FromJSON(mem, expectedDataType, bytes.NewReader(jsonStr)) + defer roundtripped.Release() + assert.NoError(t, err) + assert.Truef(t, array.Equal(jsonComparisonArr, roundtripped), "expected %s\n\ngot %s", jsonComparisonArr, roundtripped) + } } func TestTimestampWithOffsetExtensionRecordBuilder(t *testing.T) { - schema := arrow.NewSchema([]arrow.Field{ - { - Name: "timestamp_with_offset", - Nullable: true, - Type: extensions.NewTimestampWithOffsetType(testTimeUnit), - }, - }, nil) - builder := array.NewRecordBuilder(memory.DefaultAllocator, schema) - defer builder.Release() - - fieldBuilder := builder.Field(0).(*extensions.TimestampWithOffsetBuilder) - - // append a simple time.Time - fieldBuilder.Append(testDate1) - - // append a null and 2 time.Time all at once - values := []time.Time{ - time.Unix(0, 0).In(time.UTC), - testDate2, - testDate3, - } - valids := []bool{false, true, true} - fieldBuilder.AppendValues(values, valids) + for _, offsetType := range allAllowedOffsetTypes { + dataType, _ := extensions.NewTimestampWithOffsetType(testTimeUnit, offsetType) + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "timestamp_with_offset", + Nullable: true, + Type: dataType, + }, + }, nil) + builder := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer builder.Release() + + 
fieldBuilder := builder.Field(0).(*extensions.TimestampWithOffsetBuilder) + + // append a simple time.Time + fieldBuilder.Append(testDate1) + + // append a null and 2 time.Time all at once + values := []time.Time{ + time.Unix(0, 0).In(time.UTC), + testDate2, + testDate3, + } + valids := []bool{false, true, true} + fieldBuilder.AppendValues(values, valids) - // append a value from RFC3339 string - fieldBuilder.AppendValueFromString(testDate1.Format(time.RFC3339)) + // append a value from RFC3339 string + fieldBuilder.AppendValueFromString(testDate1.Format(time.RFC3339)) - // append value formatted in a different string layout - fieldBuilder.Layout = time.RFC3339Nano - fieldBuilder.AppendValueFromString(testDate2.Format(time.RFC3339Nano)) + // append value formatted in a different string layout + fieldBuilder.Layout = time.RFC3339Nano + fieldBuilder.AppendValueFromString(testDate2.Format(time.RFC3339Nano)) - record := builder.NewRecordBatch() + record := builder.NewRecordBatch() - // Record batch should JSON-encode values containing per-row timezone info - json, err := record.MarshalJSON() - require.NoError(t, err) - expect := `[{"timestamp_with_offset":"2025-01-01T00:00:00Z"} + // Record batch should JSON-encode values containing per-row timezone info + json, err := record.MarshalJSON() + require.NoError(t, err) + expect := `[{"timestamp_with_offset":"2025-01-01T00:00:00Z"} ,{"timestamp_with_offset":null} ,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} ,{"timestamp_with_offset":"2025-01-01T06:00:00+06:00"} ,{"timestamp_with_offset":"2025-01-01T00:00:00Z"} ,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} ]` - require.Equal(t, expect, string(json)) - - // Record batch roundtrip to JSON should work - roundtripped, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, bytes.NewReader(json)) - require.NoError(t, err) - defer roundtripped.Release() - require.Equal(t, schema, roundtripped.Schema()) - assert.Truef(t, array.RecordEqual(record, 
roundtripped), "expected %s\n\ngot %s", record, roundtripped) + require.Equal(t, expect, string(json)) + + // Record batch roundtrip to JSON should work + roundtripped, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, bytes.NewReader(json)) + require.NoError(t, err) + defer roundtripped.Release() + require.Equal(t, schema, roundtripped.Schema()) + assert.Truef(t, array.RecordEqual(record, roundtripped), "expected %s\n\ngot %s", record, roundtripped) + } } func TestTimestampWithOffsetTypeBatchIPCRoundTrip(t *testing.T) { - raw := `["2025-01-01T00:00:00Z",null,"2024-12-31T16:00:00-08:00","2025-01-01T06:00:00+06:00","2025-01-01T00:00:00Z","2024-12-31T16:00:00-08:00"]` - typ := extensions.NewTimestampWithOffsetType(testTimeUnit) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) - arr, _, err := array.FromJSON(memory.DefaultAllocator, typ, strings.NewReader(raw)) - require.NoError(t, err) - defer arr.Release() + for _, offsetType := range allAllowedOffsetTypes { + builder, _ := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit, offsetType) + builder.Append(testDate1) + builder.AppendNull() + builder.Append(testDate2) + builder.Append(testDate3) + arr := builder.NewArray() + defer arr.Release() - batch := array.NewRecordBatch(arrow.NewSchema([]arrow.Field{{Name: "timestamp_with_offset", Type: typ, Nullable: true}}, nil), []arrow.Array{arr}, -1) - defer batch.Release() + typ, _ := extensions.NewTimestampWithOffsetType(testTimeUnit, offsetType) - var written arrow.RecordBatch - { - var buf bytes.Buffer - wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema())) - require.NoError(t, wr.Write(batch)) - require.NoError(t, wr.Close()) + batch := array.NewRecordBatch(arrow.NewSchema([]arrow.Field{{Name: "timestamp_with_offset", Type: typ, Nullable: true}}, nil), []arrow.Array{arr}, -1) + defer batch.Release() - rdr, err := ipc.NewReader(&buf) - require.NoError(t, err) - written, err = rdr.Read() - 
require.NoError(t, err) - written.Retain() - defer written.Release() - rdr.Release() + var written arrow.RecordBatch + { + var buf bytes.Buffer + wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema())) + require.NoError(t, wr.Write(batch)) + require.NoError(t, wr.Close()) + + rdr, err := ipc.NewReader(&buf) + require.NoError(t, err) + written, err = rdr.Read() + require.NoError(t, err) + written.Retain() + defer written.Release() + rdr.Release() + } + + assert.Truef(t, batch.Schema().Equal(written.Schema()), "expected: %s\n\ngot: %s", + batch.Schema(), written.Schema()) + + assert.Truef(t, array.RecordEqual(batch, written), "expected: %s\n\ngot: %s", + batch, written) } - - assert.Truef(t, batch.Schema().Equal(written.Schema()), "expected: %s\n\ngot: %s", - batch.Schema(), written.Schema()) - - assert.Truef(t, array.RecordEqual(batch, written), "expected: %s\n\ngot: %s", - batch, written) } From bb4ef43a8281480d50bba4a0673de0ddffa5d9b7 Mon Sep 17 00:00:00 2001 From: serramatutu Date: Mon, 22 Dec 2025 14:13:57 +0200 Subject: [PATCH 3/4] Allow run-end encoding of `TimestampWithOffset` --- arrow/extensions/timestamp_with_offset.go | 79 +++++++++++++++++-- .../extensions/timestamp_with_offset_test.go | 52 ++++++++++++ 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/arrow/extensions/timestamp_with_offset.go b/arrow/extensions/timestamp_with_offset.go index e579fab7..4230640e 100644 --- a/arrow/extensions/timestamp_with_offset.go +++ b/arrow/extensions/timestamp_with_offset.go @@ -19,6 +19,7 @@ package extensions import ( "errors" "fmt" + "math" "reflect" "strings" "time" @@ -41,6 +42,13 @@ func isOffsetTypeOk(offsetType arrow.DataType) bool { return true case *arrow.DictionaryType: return arrow.IsInteger(offsetType.IndexType.ID()) && arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16) + case *arrow.RunEndEncodedType: + return offsetType.ValidRunEndsType(offsetType.RunEnds()) && + arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16) + // FIXME: Technically this should be non-nullable, but Arrow IPC does not deserialize + // ValueNullable properly, so enforcing this here would always fail when reading from an IPC + // stream + // !offsetType.ValueNullable default: return false } @@ -140,6 +148,21 @@ func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index arro return NewTimestampWithOffsetType(unit, &offsetType) } +// NewTimestampWithOffsetTypeRunEndEncoded creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, Int16)), where T is any TimeUnit and E is a +// valid run-ends type. +// +// The error will be populated if runEnds is not a valid run-ends type. +func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds arrow.DataType) (*TimestampWithOffsetType, error) { + offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16) + if !offsetType.ValidRunEndsType(runEnds) { + return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", runEnds)) + } + + return NewTimestampWithOffsetType(unit, offsetType) +} + + func (b *TimestampWithOffsetType) ArrayType() reflect.Type { return reflect.TypeOf(TimestampWithOffsetArray{}) } @@ -247,6 +270,8 @@ func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16 offsetMinutes = offsets.Value(i) case *array.Dictionary: offsetMinutes = offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i)) + case *array.RunEndEncoded: + offsetMinutes = offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i)) } return utcTimestamp, offsetMinutes, timeUnit @@ -262,6 +287,7 @@ func (a *TimestampWithOffsetArray) Value(i int) time.Time { func (a *TimestampWithOffsetArray) Values() []time.Time { values := make([]time.Time, a.Len()) + // TODO: optimize for run-end encoding for i := range a.Len() { val := a.Value(i) values[i] = val @@ -280,6 +306,7 @@ func
(a *TimestampWithOffsetArray) ValueStr(i int) string { func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) { values := make([]interface{}, a.Len()) + // TODO: optimize for run-end encoding for i := 0; i < a.Len(); i++ { if a.IsValid(i) { utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) @@ -307,6 +334,8 @@ type TimestampWithOffsetBuilder struct { Layout string unit arrow.TimeUnit offsetType arrow.DataType + // lastOffset is only used to determine when to start new runs with run-end encoded offsets + lastOffset int16 } // NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface @@ -320,6 +349,7 @@ func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit, of return &TimestampWithOffsetBuilder{ unit: unit, offsetType: offsetType, + lastOffset: math.MaxInt16, Layout: time.RFC3339, ExtensionBuilder: array.NewExtensionBuilder(mem, dataType), }, nil @@ -327,6 +357,7 @@ func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit, of func (b *TimestampWithOffsetBuilder) Append(v time.Time) { timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + offsetMinutes16 := int16(offsetMinutes) structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) structBuilder.Append(true) @@ -334,14 +365,25 @@ func (b *TimestampWithOffsetBuilder) Append(v time.Time) { switch offsets := structBuilder.FieldBuilder(1).(type) { case *array.Int16Builder: - offsets.Append(int16(offsetMinutes)) + offsets.Append(offsetMinutes16) case *array.Int16DictionaryBuilder: - offsets.Append(int16(offsetMinutes)) + offsets.Append(offsetMinutes16) + case *array.RunEndEncodedBuilder: + if offsetMinutes != b.lastOffset { + offsets.Append(1) + offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16) + } else { + offsets.ContinueRun(1) + } + + b.lastOffset = offsetMinutes16 } + } func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { timestamp, offsetMinutes := 
fieldValuesFromTime(v, b.unit) + offsetMinutes16 := int16(offsetMinutes) structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) structBuilder.Append(true) @@ -349,9 +391,18 @@ func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { switch offsets := structBuilder.FieldBuilder(1).(type) { case *array.Int16Builder: - offsets.UnsafeAppend(int16(offsetMinutes)) + offsets.UnsafeAppend(offsetMinutes16) case *array.Int16DictionaryBuilder: - offsets.Append(int16(offsetMinutes)) + offsets.Append(offsetMinutes16) + case *array.RunEndEncodedBuilder: + if offsetMinutes != b.lastOffset { + offsets.Append(1) + offsets.ValueBuilder().(*array.Int16Builder).Append(offsetMinutes16) + } else { + offsets.ContinueRun(1) + } + + b.lastOffset = offsetMinutes16 } } @@ -399,13 +450,31 @@ func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []b timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) if valids[i] { timestamps.UnsafeAppend(timestamp) - // TODO: I was here, this needs to be equivalent to UnsafeAppend offsets.Append(offsetMinutes) } else { timestamps.UnsafeAppendBoolToBitmap(false) offsets.UnsafeAppendBoolToBitmap(false) } } + case *array.RunEndEncodedBuilder: + offsetValuesBuilder := offsets.ValueBuilder().(*array.Int16Builder) + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsetMinutes16 := int16(offsetMinutes) + if offsetMinutes != b.lastOffset { + offsets.Append(1) + offsetValuesBuilder.Append(offsetMinutes16) + } else { + offsets.ContinueRun(1) + } + b.lastOffset = offsetMinutes16 + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } } } diff --git a/arrow/extensions/timestamp_with_offset_test.go b/arrow/extensions/timestamp_with_offset_test.go index fc08375c..f7e6c278 100644 --- a/arrow/extensions/timestamp_with_offset_test.go +++ 
b/arrow/extensions/timestamp_with_offset_test.go @@ -50,6 +50,12 @@ func dict(index arrow.DataType) arrow.DataType { } } +func ree(runEnds arrow.DataType) arrow.DataType { + v := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16) + v.ValueNullable = false + return v +} + // All tests use this in a for loop to make sure everything works for every possible // encoding of offsets (primitive, dictionary, run-end) var allAllowedOffsetTypes = []arrow.DataType{ @@ -65,6 +71,11 @@ var allAllowedOffsetTypes = []arrow.DataType{ dict(arrow.PrimitiveTypes.Int16), dict(arrow.PrimitiveTypes.Int32), dict(arrow.PrimitiveTypes.Int64), + + // run-end encoded offsetType + ree(arrow.PrimitiveTypes.Int16), + ree(arrow.PrimitiveTypes.Int32), + ree(arrow.PrimitiveTypes.Int64), } func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) { @@ -141,6 +152,47 @@ func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) { } } +func TestTimestampWithOffsetTypeRunEndEncodedBasics(t *testing.T) { + invalidRunEndsType := arrow.PrimitiveTypes.Float32 + _, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, invalidRunEndsType) + assert.True(t, err != nil, "Err should not be nil if run ends type is invalid") + + runEndsTypes := []arrow.DataType{ + arrow.PrimitiveTypes.Int16, + arrow.PrimitiveTypes.Int32, + arrow.PrimitiveTypes.Int64, + }; + + for _, indexType := range runEndsTypes { + typ, err := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, indexType) + assert.True(t, err == nil, "Err should be nil") + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: ree(indexType), + Nullable: false, + }, + ), + 
typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) + } +} + func TestTimestampWithOffsetExtensionBuilder(t *testing.T) { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) From ccdd2886c2532b6593a18fed2e1a20ed6a4d3ea8 Mon Sep 17 00:00:00 2001 From: serramatutu Date: Mon, 22 Dec 2025 16:31:55 +0200 Subject: [PATCH 4/4] Optimize `Values()` and `MarshalJSON()` for REE Smartly iterate over offsets if they're run-end encoded instead of doing a binary search at every iteration. This makes the loops O(n) instead of O(n*logn). --- arrow/extensions/timestamp_with_offset.go | 79 +++++++++++++++++++---- 1 file changed, 65 insertions(+), 14 deletions(-) diff --git a/arrow/extensions/timestamp_with_offset.go b/arrow/extensions/timestamp_with_offset.go index 4230640e..c98885f2 100644 --- a/arrow/extensions/timestamp_with_offset.go +++ b/arrow/extensions/timestamp_with_offset.go @@ -285,13 +285,70 @@ func (a *TimestampWithOffsetArray) Value(i int) time.Time { return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) } +// Iterates over the array and calls the callback with the timestamp at each position. If it's null, +// the timestamp will be nil. 
+// +// This will iterate using the fastest method given the underlying storage array +func (a* TimestampWithOffsetArray) iterValues(callback func(i int, utcTimestamp *time.Time)) { + structs := a.Storage().(*array.Struct) + offsets := structs.Field(1) + if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee { + timestampField := structs.Field(0) + timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit + timestamps := timestampField.(*array.Timestamp) + + offsetValues := reeOffsets.Values().(*array.Int16) + offsetPhysicalIdx := 0 + + var getRunEnd (func(int) int) + switch arr := reeOffsets.RunEndsArr().(type) { + case *array.Int16: + getRunEnd = func(idx int) int { return int(arr.Value(idx)) } + case *array.Int32: + getRunEnd = func(idx int) int { return int(arr.Value(idx)) } + case *array.Int64: + getRunEnd = func(idx int) int { return int(arr.Value(idx)) } + } + + for i := 0; i < a.Len(); i++ { + if i >= getRunEnd(offsetPhysicalIdx) { + offsetPhysicalIdx += 1 + } + + timestamp := (*time.Time)(nil) + if a.IsValid(i) { + utcTimestamp := timestamps.Value(i) + offsetMinutes := offsetValues.Value(offsetPhysicalIdx) + v := timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) + timestamp = &v + } + + callback(i, timestamp) + } + } else { + for i := 0; i < a.Len(); i++ { + timestamp := (*time.Time)(nil) + if a.IsValid(i) { + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + v := timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) + timestamp = &v + } + + callback(i, timestamp) + } + } +} + + func (a *TimestampWithOffsetArray) Values() []time.Time { values := make([]time.Time, a.Len()) - // TODO: optimize for run-end encoding - for i := range a.Len() { - val := a.Value(i) - values[i] = val - } + a.iterValues(func(i int, timestamp *time.Time) { + if timestamp == nil { + values[i] = time.Unix(0, 0) + } else { + values[i] = *timestamp + } + }) return values } @@ -306,15 +363,9 @@ func (a *TimestampWithOffsetArray) ValueStr(i int) 
string { func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) { values := make([]interface{}, a.Len()) - // TODO: optimize for run-end encoding - for i := 0; i < a.Len(); i++ { - if a.IsValid(i) { - utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) - values[i] = timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) - } else { - values[i] = nil - } - } + a.iterValues(func(i int, timestamp *time.Time) { + values[i] = timestamp + }) return json.Marshal(values) }