Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions arrow/ipc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type config struct {
noAutoSchema bool
emitDictDeltas bool
minSpaceSavings *float64
readCustomMetadata bool
}

func newConfig(opts ...Option) *config {
Expand All @@ -93,6 +94,13 @@ func newConfig(opts ...Option) *config {
// and streams.
type Option func(*config)

// WithCustomRecordBatchMetadata allows returning custom metadata for RecordBatch.
func WithCustomRecordBatchMetadata(cm bool) Option {
return func(cfg *config) {
cfg.readCustomMetadata = cm
}
}

// WithFooterOffset specifies the Arrow footer position in bytes.
func WithFooterOffset(offset int64) Option {
return func(cfg *config) {
Expand Down
25 changes: 23 additions & 2 deletions arrow/ipc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Reader struct {

refCount atomic.Int64
rec arrow.RecordBatch
meta *arrow.Metadata
err error

// types dictTypeMap
Expand All @@ -50,6 +51,7 @@ type Reader struct {
swapEndianness bool
ensureNativeEndian bool
expectedSchema *arrow.Schema
readCustomMetadata bool

mem memory.Allocator
}
Expand All @@ -76,6 +78,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader
mem: cfg.alloc,
ensureNativeEndian: cfg.ensureNativeEndian,
expectedSchema: cfg.schema,
readCustomMetadata: cfg.readCustomMetadata,
}
rr.refCount.Add(1)

Expand Down Expand Up @@ -170,6 +173,9 @@ func (r *Reader) Next() bool {
r.rec.Release()
r.rec = nil
}
if r.meta != nil {
r.meta = nil
}

if r.err != nil || r.done {
return false
Expand Down Expand Up @@ -251,7 +257,14 @@ func (r *Reader) next() bool {
r.err = fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v", got, want)
return false
}

if r.readCustomMetadata {
rootMsg := flatbuf.GetRootAsMessage(msg.meta.Bytes(), 0)
meta, err := metadataFromFB(rootMsg)
if err != nil {
panic(err)
}
r.meta = &meta
}
r.rec = newRecordBatch(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
return true
}
Expand All @@ -263,6 +276,12 @@ func (r *Reader) RecordBatch() arrow.RecordBatch {
return r.rec
}

// RecordBatchCustomMetadata returns the current record batch custom metadata from the
// underlying stream.
func (r *Reader) RecordBatchCustomMetadata() (*arrow.Metadata, error) {
return r.meta, nil
}

// Record returns the current record that has been extracted from the
// underlying stream.
// It is valid until the next call to Next.
Expand All @@ -279,7 +298,9 @@ func (r *Reader) Read() (arrow.RecordBatch, error) {
r.rec.Release()
r.rec = nil
}

if r.meta != nil {
r.meta = nil
}
if !r.next() {
if r.done && r.err == nil {
return nil, io.EOF
Expand Down
Loading