From 8d2e4182116cf028ccf2d1bf5daa984c1f42fd60 Mon Sep 17 00:00:00 2001
From: kyle
Date: Thu, 30 Oct 2025 01:41:45 +0000
Subject: [PATCH] add option to read custom metadata from record batch message in ipc reader

---
Review note: in (*Reader).next a custom-metadata decode failure is now stored in r.err and
next returns false (previously panic(err)), matching the invalid-message-type path just above.
The reader.go post-image blob id on its index line has not been regenerated for this edit.

 arrow/ipc/ipc.go    |  8 ++++++++
 arrow/ipc/reader.go | 25 +++++++++++++++++++++++--
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/arrow/ipc/ipc.go b/arrow/ipc/ipc.go
index c4589da6..35e7945a 100644
--- a/arrow/ipc/ipc.go
+++ b/arrow/ipc/ipc.go
@@ -72,6 +72,7 @@ type config struct {
 	noAutoSchema       bool
 	emitDictDeltas     bool
 	minSpaceSavings    *float64
+	readCustomMetadata bool
 }

 func newConfig(opts ...Option) *config {
@@ -93,6 +94,13 @@ func newConfig(opts ...Option) *config {
 // and streams.
 type Option func(*config)

+// WithCustomRecordBatchMetadata allows returning custom metadata for RecordBatch.
+func WithCustomRecordBatchMetadata(cm bool) Option {
+	return func(cfg *config) {
+		cfg.readCustomMetadata = cm
+	}
+}
+
 // WithFooterOffset specifies the Arrow footer position in bytes.
 func WithFooterOffset(offset int64) Option {
 	return func(cfg *config) {
diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go
index 9d7096c0..d70f001b 100644
--- a/arrow/ipc/reader.go
+++ b/arrow/ipc/reader.go
@@ -41,6 +41,7 @@ type Reader struct {

 	refCount atomic.Int64
 	rec      arrow.RecordBatch
+	meta     *arrow.Metadata
 	err      error

 	// types dictTypeMap
@@ -50,6 +51,7 @@ type Reader struct {
 	swapEndianness     bool
 	ensureNativeEndian bool
 	expectedSchema     *arrow.Schema
+	readCustomMetadata bool

 	mem memory.Allocator
 }
@@ -76,6 +78,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader
 		mem:                cfg.alloc,
 		ensureNativeEndian: cfg.ensureNativeEndian,
 		expectedSchema:     cfg.schema,
+		readCustomMetadata: cfg.readCustomMetadata,
 	}

 	rr.refCount.Add(1)
@@ -170,6 +173,9 @@ func (r *Reader) Next() bool {
 		r.rec.Release()
 		r.rec = nil
 	}
+	if r.meta != nil {
+		r.meta = nil
+	}

 	if r.err != nil || r.done {
 		return false
@@ -251,7 +257,14 @@ func (r *Reader) next() bool {
 		r.err = fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v", got, want)
 		return false
 	}
-
+	if r.readCustomMetadata {
+		meta, err := metadataFromFB(flatbuf.GetRootAsMessage(msg.meta.Bytes(), 0))
+		if err != nil {
+			r.err = fmt.Errorf("arrow/ipc: could not read record batch custom metadata: %w", err)
+			return false
+		}
+		r.meta = &meta
+	}
 	r.rec = newRecordBatch(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
 	return true
 }
@@ -263,6 +276,12 @@ func (r *Reader) RecordBatch() arrow.RecordBatch {
 	return r.rec
 }

+// RecordBatchCustomMetadata returns the custom metadata read with the current record batch; it is
+// nil unless WithCustomRecordBatchMetadata(true) was set. The returned error is always nil.
+func (r *Reader) RecordBatchCustomMetadata() (*arrow.Metadata, error) {
+	return r.meta, nil
+}
+
 // Record returns the current record that has been extracted from the
 // underlying stream.
 // It is valid until the next call to Next.
@@ -279,7 +298,9 @@ func (r *Reader) Read() (arrow.RecordBatch, error) {
 		r.rec.Release()
 		r.rec = nil
 	}
-
+	if r.meta != nil {
+		r.meta = nil
+	}
 	if !r.next() {
 		if r.done && r.err == nil {
 			return nil, io.EOF