diff --git a/docs/src/manual.md b/docs/src/manual.md
index cb7e433b..158fc343 100644
--- a/docs/src/manual.md
+++ b/docs/src/manual.md
@@ -46,6 +46,14 @@ using Arrow
 table = Arrow.Table("data.arrow")
 ```
 
+Optionally, the `filtercolumns` keyword argument selects which columns to load:
+
+```julia
+table = Arrow.Table("data.arrow"; filtercolumns = ["col1", "col3"])
+```
+
+This can significantly improve read performance when the data is compressed and only a subset of columns is needed.
+
 ### `Arrow.Table`
 
 The type of `table` in this example will be an `Arrow.Table`. When "reading" the arrow data, `Arrow.Table` first ["mmapped"](https://en.wikipedia.org/wiki/Mmap) the `data.arrow` file, which is an important technique for dealing with data larger than available RAM on a system. By "mmapping" a file, the OS doesn't actually load the entire file contents into RAM at the same time, but file contents are "swapped" into RAM as different regions of a file are requested. Once "mmapped", `Arrow.Table` then inspected the metadata in the file to determine the number of columns, their names and types, at which byte offset each column begins in the file data, and even how many "batches" are included in this file (arrow tables may be partitioned into one or more "record batches" each containing portions of the data). Armed with all the appropriate metadata, `Arrow.Table` then created custom array objects ([`Arrow.ArrowVector`](@ref)), which act as "views" into the raw arrow memory bytes. This is a significant point in that no extra memory is allocated for "data" when reading arrow data. This is in contrast to if we wanted to read data from a csv file as columns into Julia structures; we would need to allocate those array structures ourselves, then parse the file, "filling in" each element of the array with the data we parsed from the file. Arrow data, on the other hand, is *already laid out in memory or on disk* in a binary format, and as long as we have the metadata to interpret the raw bytes, we can figure out whether to treat those bytes as a `Vector{Float64}`, etc. A sample of the kinds of arrow array types you might see when deserializing arrow data, include:
diff --git a/src/table.jl b/src/table.jl
index 882a99b1..71bb7eb5 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -74,9 +74,14 @@ mutable struct Stream
     dictencoded::Dict{Int64,Meta.Field} # dictionary id => field
     convert::Bool
     compression::Ref{Union{Symbol,Nothing}}
+    filtercolumns::Union{Nothing,Vector{String}}
 end
 
-function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
+function Stream(
+    inputs::Vector{ArrowBlob};
+    convert::Bool=true,
+    filtercolumns::Union{Nothing,Vector{String}}=nothing,
+)
     inputindex = 1
     batchiterator = nothing
     names = Symbol[]
@@ -96,6 +101,7 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
         dictencoded,
         convert,
         compression,
+        filtercolumns,
     )
 end
 
@@ -187,6 +193,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
             # assert endianness?
            # store custom_metadata?
             for (i, field) in enumerate(x.schema.fields)
+                isnothing(x.filtercolumns) || field.name in x.filtercolumns || continue
                 push!(x.names, Symbol(field.name))
                 push!(
                     x.types,
@@ -255,7 +262,8 @@
             if header.compression !== nothing
                 compression = header.compression
             end
-            for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
+            for vec in
+                vectoriterator(x.schema, batch, x.dictencodings, x.convert; x.filtercolumns)
                 push!(columns, vec)
             end
             break
@@ -277,6 +285,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
     lookup = Dict{Symbol,AbstractVector}()
     types = Type[]
     for (nm, col) in zip(x.names, columns)
+        isnothing(x.filtercolumns) || String(nm) in x.filtercolumns || continue
         lookup[nm] = col
         push!(types, eltype(col))
     end
@@ -412,7 +421,11 @@ Table(inputs::Vector; kw...) =
     Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
 
 # will detect whether we're reading a Table from a file or stream
-function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
+function Table(
+    blobs::Vector{ArrowBlob};
+    convert::Bool=true,
+    filtercolumns::Union{Nothing,Vector{String}}=nothing,
+)
     t = Table()
     sch = nothing
     dictencodings = Dict{Int64,DictEncoding}() # dictionary id => DictEncoding
@@ -448,6 +461,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
             # store custom_metadata?
             if sch === nothing
                 for (i, field) in enumerate(header.fields)
+                    isnothing(filtercolumns) || field.name in filtercolumns || continue
                     push!(names(t), Symbol(field.name))
                     # recursively find any dictionaries for any fields
                     getdictionaries!(dictencoded, field)
@@ -469,6 +483,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
            if haskey(dictencodings, id) && header.isDelta
                 # delta
                 field = dictencoded[id]
+                isnothing(filtercolumns) || field.name in filtercolumns || continue
                 values, _, _ = build(
                     field,
                     field.type,
@@ -523,7 +538,9 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
             anyrecordbatches = true
             @debugv 1 "parsing record batch message: compression = $(header.compression)"
             @wkspawn begin
-                cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
+                cols = collect(
+                    vectoriterator(sch, $batch, dictencodings, convert; filtercolumns),
+                )
                 put!(() -> put!(tsks, cols), sync, $(rbi))
             end
             rbi += 1
@@ -539,11 +556,13 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     # 158; some implementations may send 0 record batches
     if !anyrecordbatches && !isnothing(sch)
         for field in sch.fields
+            isnothing(filtercolumns) || field.name in filtercolumns || continue
             T = juliaeltype(field, buildmetadata(field), convert)
             push!(columns(t), T[])
         end
     end
     for (nm, col) in zip(names(t), columns(t))
+        isnothing(filtercolumns) || String(nm) in filtercolumns || continue
         lu[nm] = col
         push!(ty, eltype(col))
     end
@@ -607,6 +626,13 @@ function Base.iterate(x::BatchIterator, (pos, id)=(x.startpos, 0))
     return Batch(msg, x.bytes, pos, id), (pos + msg.bodyLength, id + 1)
 end
 
+function vectoriterator(schema, batch, de, convert; filtercolumns=nothing)
+    (
+        isnothing(filtercolumns) ? VectorIterator(schema, batch, de, convert) :
+        FilteredVectorIterator(schema, batch, de, convert, filtercolumns)
+    )
+end
+
 struct VectorIterator
     schema::Meta.Schema
     batch::Batch # batch.msg.header MUST BE RecordBatch
@@ -642,6 +668,50 @@ end
 
 Base.length(x::VectorIterator) = length(x.schema.fields)
 
+struct FilteredVectorIterator
+    schema::Meta.Schema
+    batch::Batch # batch.msg.header MUST BE RecordBatch
+    dictencodings::Dict{Int64,DictEncoding}
+    convert::Bool
+    filtercolumns::Vector{String}
+end
+
+Base.IteratorSize(::Type{FilteredVectorIterator}) = Base.SizeUnknown()
+
+function _nextstate(x::FilteredVectorIterator, state)
+    columnidx, nodeidx, bufferidx = state
+    columnidx > length(x.schema.fields) && return
+    field = x.schema.fields[columnidx]
+    while !(field.name in x.filtercolumns)
+        nodeidx, bufferidx = Arrow._step(field, nodeidx, bufferidx)
+        columnidx += 1
+        columnidx > length(x.schema.fields) && return
+        field = x.schema.fields[columnidx]
+    end
+    (columnidx, nodeidx, bufferidx)
+end
+
+function Base.iterate(x::FilteredVectorIterator, state=(Int64(1), Int64(1), Int64(1)))
+    state = _nextstate(x, state)
+    isnothing(state) && return
+    (columnidx, nodeidx, bufferidx) = state
+    field = x.schema.fields[columnidx]
+    @debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
+    A, nodeidx, bufferidx = build(
+        field,
+        x.batch,
+        x.batch.msg.header,
+        x.dictencodings,
+        nodeidx,
+        bufferidx,
+        x.convert,
+    )
+    @debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
+    @debugv 3 A
+    columnidx += 1
+    return A, (columnidx, nodeidx, bufferidx)
+end
+
 const ListTypes =
     Union{Meta.Utf8,Meta.LargeUtf8,Meta.Binary,Meta.LargeBinary,Meta.List,Meta.LargeList}
 const LargeLists = Union{Meta.LargeUtf8,Meta.LargeBinary,Meta.LargeList}
@@ -924,3 +994,74 @@ function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, c
     T = juliaeltype(f, meta, convert)
     return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1
 end
+
+_step(::Meta.Field, ::L, nodeidx, bufferidx) where {L} = nodeidx + 1, bufferidx + 2
+_step(::Meta.Field, ::Meta.Bool, nodeidx, bufferidx) = nodeidx + 1, bufferidx + 2
+
+function _step(field::Meta.Field, nodeidx, bufferidx)
+    if field.dictionary !== nothing
+        return nodeidx + 1, bufferidx + 2
+    else
+        return _step(field, field.type, nodeidx, bufferidx)
+    end
+end
+
+function _step(f::Meta.Field, L::ListTypes, nodeidx, bufferidx)
+    bufferidx += 2
+    nodeidx += 1
+    if L isa Meta.Utf8 ||
+       L isa Meta.LargeUtf8 ||
+       L isa Meta.Binary ||
+       L isa Meta.LargeBinary
+        bufferidx += 1
+    else
+        nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx)
+    end
+    nodeidx, bufferidx
+end
+
+function _step(
+    f::Meta.Field,
+    L::Union{Meta.FixedSizeBinary,Meta.FixedSizeList},
+    nodeidx,
+    bufferidx,
+)
+    bufferidx += 1
+    nodeidx += 1
+    if L isa Meta.FixedSizeBinary
+        bufferidx += 1
+    else
+        nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx)
+    end
+    nodeidx, bufferidx
+end
+
+function _step(f::Meta.Field, ::Meta.Map, nodeidx, bufferidx)
+    bufferidx += 2
+    nodeidx += 1
+    nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx)
+    nodeidx, bufferidx
+end
+
+function _step(f::Meta.Field, ::Meta.Struct, nodeidx, bufferidx)
+    bufferidx += 1
+    nodeidx += 1
+    for child in f.children
+        nodeidx, bufferidx = _step(child, nodeidx, bufferidx)
+    end
+    nodeidx, bufferidx
+end
+
+function _step(f::Meta.Field, L::Meta.Union, nodeidx, bufferidx)
+    bufferidx += 1
+    if L.mode == Meta.UnionModes.Dense
+        bufferidx += 1
+    end
+    nodeidx += 1
+    for child in f.children
+        nodeidx, bufferidx = _step(child, nodeidx, bufferidx)
+    end
+    nodeidx, bufferidx
+end
+
+_step(::Meta.Field, ::Meta.Null, nodeidx, bufferidx) = nodeidx + 1, bufferidx
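
For reference, a minimal usage sketch of the `filtercolumns` keyword this patch introduces. The file name, column names, and sample table below are illustrative, not part of the patch; only the `Arrow.Table(...; filtercolumns = ...)` call mirrors the documented API above.

```julia
using Arrow, Tables

# Write a small three-column table (hypothetical demo data).
Arrow.write("data.arrow", (col1 = 1:3, col2 = ["a", "b", "c"], col3 = [1.0, 2.0, 3.0]))

# Load only the columns we need; the buffers for the remaining columns are
# skipped over (via FilteredVectorIterator/_step) rather than materialized.
table = Arrow.Table("data.arrow"; filtercolumns = ["col1", "col3"])

Tables.columnnames(table)  # expected: (:col1, :col3)
```

The same keyword is threaded through the internal `Stream` and `Table` constructors in src/table.jl, so the streaming read path can skip unneeded columns as well.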