From 88142d43e8e76513fc82675d772c391871a47e44 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 15:45:52 -0600 Subject: [PATCH 1/9] proof of concept: unify refcounting logic --- arrow/ipc/message.go | 40 +++++++++++----------------------------- 1 file changed, 11 insertions(+), 29 deletions(-) diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go index a2c4e370..6b33f085 100644 --- a/arrow/ipc/message.go +++ b/arrow/ipc/message.go @@ -22,7 +22,6 @@ import ( "io" "sync/atomic" - "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/internal/flatbuf" "github.com/apache/arrow-go/v18/arrow/memory" ) @@ -66,10 +65,10 @@ func (m MessageType) String() string { // Message is an IPC message, including metadata and body. type Message struct { - refCount atomic.Int64 - msg *flatbuf.Message - meta *memory.Buffer - body *memory.Buffer + memory.Refcount + msg *flatbuf.Message + meta *memory.Buffer + body *memory.Buffer } // NewMessage creates a new message from the metadata and body buffers. @@ -85,7 +84,9 @@ func NewMessage(meta, body *memory.Buffer) *Message { meta: meta, body: body, } - m.refCount.Add(1) + m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body} + m.Additional = func() { m.msg = nil } + m.Retain() return m } @@ -99,31 +100,12 @@ func newMessageFromFB(meta *flatbuf.Message, body *memory.Buffer) *Message { meta: memory.NewBufferBytes(meta.Table().Bytes), body: body, } - m.refCount.Add(1) + m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body} + m.Additional = func() { m.msg = nil } + m.Retain() return m } -// Retain increases the reference count by 1. -// Retain may be called simultaneously from multiple goroutines. -func (msg *Message) Retain() { - msg.refCount.Add(1) -} - -// Release decreases the reference count by 1. -// Release may be called simultaneously from multiple goroutines. -// When the reference count goes to zero, the memory is freed. -func (msg *Message) Release() { - debug.Assert(msg.refCount.Load() > 0, "too many releases") - - if msg.refCount.Add(-1) == 0 { - msg.meta.Release() - msg.body.Release() - msg.msg = nil - msg.meta = nil - msg.body = nil - } -} - func (msg *Message) Version() MetadataVersion { return MetadataVersion(msg.msg.Version()) } @@ -175,7 +157,7 @@ func (r *messageReader) Retain() { // When the reference count goes to zero, the memory is freed. // Release may be called simultaneously from multiple goroutines. func (r *messageReader) Release() { - debug.Assert(r.refCount.Load() > 0, "too many releases") + r.refCount.Load() if r.refCount.Add(-1) == 0 { if r.msg != nil { From fe72cf4bc80e3b2cf5eeae78148466c8cf03d3ea Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 15:47:43 -0600 Subject: [PATCH 2/9] add refcount file --- arrow/memory/refcount.go | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 arrow/memory/refcount.go diff --git a/arrow/memory/refcount.go b/arrow/memory/refcount.go new file mode 100644 index 00000000..6b88a696 --- /dev/null +++ b/arrow/memory/refcount.go @@ -0,0 +1,41 @@ +package memory + +import ( + "sync/atomic" + + "github.com/apache/arrow-go/v18/arrow/internal/debug" +) + +type Refcount struct { + count atomic.Int64 + Dependencies []**Refcount + Buffers []**Buffer + Additional func() +} + +func (r *Refcount) Retain() { + r.count.Add(1) +} + +func (r *Refcount) Release() { + new := r.count.Add(-1) + if new == 0 { + for _, buffer := range r.Buffers { + (*buffer).Release() + *buffer = nil + } + for _, dependency := range r.Dependencies { + (*dependency).Release() + *dependency = nil + } + r.Buffers = nil + r.Dependencies = nil + if r.Additional != nil { + r.Additional() + } + } else if new < 0 { + // This branch can be optimized out when !debug + // This avoids an unnecessary extra Load + debug.Assert(false, "too many releases") + } +} From c708c6dec9eec00ce2a8a8415189ddd747a64107 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 15:50:02 -0600 Subject: [PATCH 3/9] fixup --- arrow/ipc/message.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go index 6b33f085..c08d8963 100644 --- a/arrow/ipc/message.go +++ b/arrow/ipc/message.go @@ -22,6 +22,7 @@ import ( "io" "sync/atomic" + "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/internal/flatbuf" "github.com/apache/arrow-go/v18/arrow/memory" ) @@ -157,7 +158,7 @@ func (r *messageReader) Retain() { // When the reference count goes to zero, the memory is freed. // Release may be called simultaneously from multiple goroutines. func (r *messageReader) Release() { - r.refCount.Load() + debug.Assert(r.refCount.Load() > 0, "too many releases") if r.refCount.Add(-1) == 0 { if r.msg != nil { From 6bd3861e2b10fc042e5fa60962823a103b18729f Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 16:39:42 -0600 Subject: [PATCH 4/9] switch to []unsafe.Pointer --- arrow/ipc/message.go | 5 +++-- arrow/memory/refcount.go | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go index c08d8963..cd59ffd4 100644 --- a/arrow/ipc/message.go +++ b/arrow/ipc/message.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "sync/atomic" + "unsafe" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/internal/flatbuf" @@ -86,7 +87,7 @@ func NewMessage(meta, body *memory.Buffer) *Message { body: body, } m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body} - m.Additional = func() { m.msg = nil } + m.Derived = []unsafe.Pointer{unsafe.Pointer(&m.msg)} m.Retain() return m } @@ -102,7 +103,7 @@ func newMessageFromFB(meta *flatbuf.Message, body *memory.Buffer) *Message { body: body, } m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body} - m.Additional = func() { m.msg = nil } + m.Derived = []unsafe.Pointer{unsafe.Pointer(&m.msg)} m.Retain() return m } diff --git a/arrow/memory/refcount.go b/arrow/memory/refcount.go index 6b88a696..e5f63c21 100644 --- a/arrow/memory/refcount.go +++ b/arrow/memory/refcount.go @@ -2,6 +2,7 @@ package memory import ( "sync/atomic" + "unsafe" "github.com/apache/arrow-go/v18/arrow/internal/debug" ) @@ -10,7 +11,7 @@ type Refcount struct { count atomic.Int64 Dependencies []**Refcount Buffers []**Buffer - Additional func() + Derived []unsafe.Pointer } func (r *Refcount) Retain() { @@ -30,8 +31,8 @@ func (r *Refcount) Release() { } r.Buffers = nil r.Dependencies = nil - if r.Additional != nil { - r.Additional() + for _, derived := range r.Derived { + *((*uintptr)(derived)) = 0 } } else if new < 0 { // This branch can be optimized out when !debug From 31d84433bd9040da965d1d5bb5fbeb59a09d1576 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 17:03:19 -0600 Subject: [PATCH 5/9] disable refcounting when not used --- arrow/ipc/message.go | 8 ++++---- arrow/memory/refcount.go | 33 +++++++++++++++++++++++++++++++ arrow/memory/refcount_disabled.go | 11 +++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 arrow/memory/refcount_disabled.go diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go index cd59ffd4..2583fbf6 100644 --- a/arrow/ipc/message.go +++ b/arrow/ipc/message.go @@ -86,8 +86,8 @@ func NewMessage(meta, body *memory.Buffer) *Message { meta: meta, body: body, } - m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body} - m.Derived = []unsafe.Pointer{unsafe.Pointer(&m.msg)} + m.ReferenceBuffer(&m.meta, &m.body) + m.ReferenceDerived(unsafe.Pointer(&m.msg)) m.Retain() return m } @@ -102,8 +102,8 @@ func newMessageFromFB(meta *flatbuf.Message, body *memory.Buffer) *Message { meta: memory.NewBufferBytes(meta.Table().Bytes), body: body, } - m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body} - m.Derived = []unsafe.Pointer{unsafe.Pointer(&m.msg)} + m.ReferenceBuffer(&m.meta, &m.body) + m.ReferenceDerived(unsafe.Pointer(&m.msg)) m.Retain() return m } diff --git a/arrow/memory/refcount.go b/arrow/memory/refcount.go index e5f63c21..cd6f868f 100644 --- a/arrow/memory/refcount.go +++ b/arrow/memory/refcount.go @@ -1,3 +1,5 @@ +//go:build refcounting + package memory import ( @@ -14,6 +16,37 @@ type Refcount struct { Derived []unsafe.Pointer } +func (r *Refcount) ReferenceDependency(d ...**Refcount) { + if r.Dependencies == nil { + r.Dependencies = d + } else { + for _, d := range d { + r.Dependencies = append(r.Dependencies, d) + } + } +} + +func (r *Refcount) ReferenceBuffer(b...**Buffer) { + if r.Buffers == nil { + r.Buffers = b + } else { + for _, b := range b { + r.Buffers = append(r.Buffers, b) + } + } +} + + +func (r *Refcount) ReferenceDerived(p ...unsafe.Pointer) { + if r.Derived == nil { + r.Derived = p + } else { + for _, p := range p { + r.Derived = append(r.Derived, p) + } + } +} + func (r *Refcount) Retain() { r.count.Add(1) } diff --git a/arrow/memory/refcount_disabled.go b/arrow/memory/refcount_disabled.go new file mode 100644 index 00000000..fbafea1e --- /dev/null +++ b/arrow/memory/refcount_disabled.go @@ -0,0 +1,11 @@ +//go:build !refcounting + +package memory + +type Refcount struct{} + +func (r *Refcount) ReferenceDependency(d ...any) {} +func (r *Refcount) ReferenceBuffer(b ...any) {} +func (r *Refcount) ReferenceDerived(p ...any) {} +func (r *Refcount) Retain() {} +func (r *Refcount) Release() {} From 9654a4fd6a3717ef0516bcaf0155ccaf326c0273 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 17:09:09 -0600 Subject: [PATCH 6/9] better API when refcounting is enabled. --- arrow/memory/refcount.go | 55 ++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/arrow/memory/refcount.go b/arrow/memory/refcount.go index cd6f868f..31747772 100644 --- a/arrow/memory/refcount.go +++ b/arrow/memory/refcount.go @@ -11,40 +11,33 @@ import ( type Refcount struct { count atomic.Int64 - Dependencies []**Refcount - Buffers []**Buffer - Derived []unsafe.Pointer + dependencies []**Refcount + buffers []**Buffer + derived []unsafe.Pointer } +// Must only be called once per object. Defines the dependency tree. +// When this object is completely unreferenced, all dependencies will +// be unreferenced by it and, if this was the only object still +// referencing them, they will be freed as well, recursively. func (r *Refcount) ReferenceDependency(d ...**Refcount) { - if r.Dependencies == nil { - r.Dependencies = d - } else { - for _, d := range d { - r.Dependencies = append(r.Dependencies, d) - } - } + r.dependencies = d } -func (r *Refcount) ReferenceBuffer(b...**Buffer) { - if r.Buffers == nil { - r.Buffers = b - } else { - for _, b := range b { - r.Buffers = append(r.Buffers, b) - } - } +// Must only be called once per object. Defines buffers that are referenced +// by this object. When this object is unreferenced, all such buffers will +// be deallocated immediately. +func (r *Refcount) ReferenceBuffer(b ...**Buffer) { + r.buffers = b } - +// Must only be called once per object, with a list of pointers that are +// _derived from_ allocations owned by or referenced by this object. +// When this object is unreferenced, all such pointers will be nilled. +// Note: this needs the _address of_ the pointers to nil, _not_ the pointers +// themselves! func (r *Refcount) ReferenceDerived(p ...unsafe.Pointer) { - if r.Derived == nil { - r.Derived = p - } else { - for _, p := range p { - r.Derived = append(r.Derived, p) - } - } + r.derived = p } func (r *Refcount) Retain() { @@ -54,17 +47,17 @@ func (r *Refcount) Retain() { func (r *Refcount) Release() { new := r.count.Add(-1) if new == 0 { - for _, buffer := range r.Buffers { + for _, buffer := range r.buffers { (*buffer).Release() *buffer = nil } - for _, dependency := range r.Dependencies { + for _, dependency := range r.dependencies { (*dependency).Release() *dependency = nil } - r.Buffers = nil - r.Dependencies = nil - for _, derived := range r.Derived { + r.buffers = nil + r.dependencies = nil + for _, derived := range r.derived { *((*uintptr)(derived)) = 0 } } else if new < 0 { From 75a3cf6dcab7081fd7173fa14aef4dbfc71418e7 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 17:51:02 -0600 Subject: [PATCH 7/9] improved API --- arrow/ipc/message.go | 27 +++------------------------ arrow/ipc/reader.go | 4 +++- arrow/memory/refcount.go | 21 ++++++++++++++------- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go index 2583fbf6..f98dfb05 100644 --- a/arrow/ipc/message.go +++ b/arrow/ipc/message.go @@ -20,10 +20,8 @@ import ( "encoding/binary" "fmt" "io" - "sync/atomic" "unsafe" - "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/internal/flatbuf" "github.com/apache/arrow-go/v18/arrow/memory" ) @@ -128,9 +126,9 @@ type MessageReader interface { // MessageReader reads messages from an io.Reader. type messageReader struct { + memory.Refcount r io.Reader - refCount atomic.Int64 msg *Message mem memory.Allocator @@ -145,30 +143,11 @@ func NewMessageReader(r io.Reader, opts ...Option) MessageReader { } mr := &messageReader{r: r, mem: cfg.alloc} - mr.refCount.Add(1) + mr.Retain() + mr.ReferenceDependency(unsafe.Pointer(&mr.msg)) return mr } -// Retain increases the reference count by 1. -// Retain may be called simultaneously from multiple goroutines. -func (r *messageReader) Retain() { - r.refCount.Add(1) -} - -// Release decreases the reference count by 1. -// When the reference count goes to zero, the memory is freed. -// Release may be called simultaneously from multiple goroutines. -func (r *messageReader) Release() { - debug.Assert(r.refCount.Load() > 0, "too many releases") - - if r.refCount.Add(-1) == 0 { - if r.msg != nil { - r.msg.Release() - r.msg = nil - } - } -} - // Message returns the current message that has been extracted from the // underlying stream. // It is valid until the next call to Message. diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go index df9e5000..a2a4e68f 100644 --- a/arrow/ipc/reader.go +++ b/arrow/ipc/reader.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "sync/atomic" + "unsafe" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" @@ -97,7 +98,8 @@ func NewReader(r io.Reader, opts ...Option) (rr *Reader, err error) { }() cfg := newConfig(opts...) mr := &messageReader{r: r, mem: cfg.alloc} - mr.refCount.Add(1) + mr.Retain() + mr.ReferenceDependency(unsafe.Pointer(&mr.msg)) rr = &Reader{ r: mr, refCount: atomic.Int64{}, diff --git a/arrow/memory/refcount.go b/arrow/memory/refcount.go index 31747772..c795a289 100644 --- a/arrow/memory/refcount.go +++ b/arrow/memory/refcount.go @@ -11,7 +11,7 @@ import ( type Refcount struct { count atomic.Int64 - dependencies []**Refcount + dependencies []unsafe.Pointer buffers []**Buffer derived []unsafe.Pointer } @@ -20,7 +20,7 @@ type Refcount struct { // When this object is completely unreferenced, all dependencies will // be unreferenced by it and, if this was the only object still // referencing them, they will be freed as well, recursively. -func (r *Refcount) ReferenceDependency(d ...**Refcount) { +func (r *Refcount) ReferenceDependency(d ...unsafe.Pointer) { r.dependencies = d } @@ -31,9 +31,8 @@ func (r *Refcount) ReferenceBuffer(b ...**Buffer) { r.buffers = b } -// Must only be called once per object, with a list of pointers that are -// _derived from_ allocations owned by or referenced by this object. -// When this object is unreferenced, all such pointers will be nilled. +// Must only be called once per object, with a list of pointers that need to be +// cleared when the object becomes unreferenced. // Note: this needs the _address of_ the pointers to nil, _not_ the pointers // themselves! func (r *Refcount) ReferenceDerived(p ...unsafe.Pointer) { @@ -52,8 +51,16 @@ func (r *Refcount) Release() { *buffer = nil } for _, dependency := range r.dependencies { - (*dependency).Release() - *dependency = nil + ptr := (*unsafe.Pointer)(dependency) + if *ptr != nil { + // Ptr should be a **T, where T has a Refcount + // embedded at the front. + // So, if *ptr != nil, we should be able to cast *ptr + // to a *Refcount. + rc := (*Refcount)(*ptr) + rc.Release() + *ptr = nil + } } r.buffers = nil r.dependencies = nil From 1da3cb12574aa847365b088555508d7f83d4106b Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Fri, 21 Nov 2025 17:54:24 -0600 Subject: [PATCH 8/9] default to refcounting on --- arrow/memory/refcount.go | 2 +- arrow/memory/refcount_disabled.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow/memory/refcount.go b/arrow/memory/refcount.go index c795a289..0fb533d7 100644 --- a/arrow/memory/refcount.go +++ b/arrow/memory/refcount.go @@ -1,4 +1,4 @@ -//go:build refcounting +//go:build !norc package memory diff --git a/arrow/memory/refcount_disabled.go b/arrow/memory/refcount_disabled.go index fbafea1e..69a2d3af 100644 --- a/arrow/memory/refcount_disabled.go +++ b/arrow/memory/refcount_disabled.go @@ -1,4 +1,4 @@ -//go:build !refcounting +//go:build norc package memory From 5889b53e2005794fc33b6e12abab0c90515fe524 Mon Sep 17 00:00:00 2001 From: Noam Preil Date: Mon, 24 Nov 2025 14:54:41 -0600 Subject: [PATCH 9/9] go fmt --- arrow/ipc/message.go | 2 +- arrow/memory/refcount_disabled.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go index f98dfb05..1727fa45 100644 --- a/arrow/ipc/message.go +++ b/arrow/ipc/message.go @@ -129,7 +129,7 @@ type messageReader struct { memory.Refcount r io.Reader - msg *Message + msg *Message mem memory.Allocator header [4]byte diff --git a/arrow/memory/refcount_disabled.go b/arrow/memory/refcount_disabled.go index 69a2d3af..6c6b38ac 100644 --- a/arrow/memory/refcount_disabled.go +++ b/arrow/memory/refcount_disabled.go @@ -5,7 +5,7 @@ package memory type Refcount struct{} func (r *Refcount) ReferenceDependency(d ...any) {} -func (r *Refcount) ReferenceBuffer(b ...any) {} -func (r *Refcount) ReferenceDerived(p ...any) {} -func (r *Refcount) Retain() {} -func (r *Refcount) Release() {} +func (r *Refcount) ReferenceBuffer(b ...any) {} +func (r *Refcount) ReferenceDerived(p ...any) {} +func (r *Refcount) Retain() {} +func (r *Refcount) Release() {}