diff --git a/Project.toml b/Project.toml index 6a100113..bfe105a0 100644 --- a/Project.toml +++ b/Project.toml @@ -27,6 +27,7 @@ CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561" CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2" DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +InlineStrings = "842dd82b-1e85-43dc-bf29-5d0ee9dffc48" LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" Mmap = "a63ad114-7e13-5084-954f-fe012c677804" PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720" @@ -42,8 +43,9 @@ BitIntegers = "0.2" CodecLz4 = "0.4" CodecZstd = "0.7" DataAPI = "1" -LoggingExtras = "0.4, 1" FilePathsBase = "0.9" +InlineStrings = "1.4" +LoggingExtras = "0.4, 1" PooledArrays = "0.5, 1.0" SentinelArrays = "1" Tables = "1.1" diff --git a/src/Arrow.jl b/src/Arrow.jl index 01c7993c..0ab28e70 100644 --- a/src/Arrow.jl +++ b/src/Arrow.jl @@ -71,8 +71,23 @@ include("write.jl") include("append.jl") include("show.jl") +using InlineStrings +export InlineString, inlinestrings +include("inlinestrings.jl") + +const TS = CodecLz4.TranscodingStreams +include("dirtyhacks.jl") + + const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[] const ZSTD_COMPRESSOR = ZstdCompressor[] +const LZ4_FRAME_DECOMPRESSOR = LZ4FrameDecompressor[] +const ZSTD_DECOMPRESSOR = ZstdDecompressor[] +# add locks for multithreaded (de/)compression (because we index by threadid which might not be safe under Julia >1.8) +const LZ4_FRAME_COMPRESSOR_LOCK = ReentrantLock[] +const ZSTD_COMPRESSOR_LOCK = ReentrantLock[] +const LZ4_FRAME_DECOMPRESSOR_LOCK = ReentrantLock[] +const ZSTD_DECOMPRESSOR_LOCK = ReentrantLock[] function __init__() for _ = 1:Threads.nthreads() @@ -82,6 +97,19 @@ function __init__() lz4 = LZ4FrameCompressor(; compressionlevel=4) CodecLz4.TranscodingStreams.initialize(lz4) push!(LZ4_FRAME_COMPRESSOR, lz4) + # Locks + push!(LZ4_FRAME_COMPRESSOR_LOCK, ReentrantLock()) + push!(ZSTD_COMPRESSOR_LOCK, ReentrantLock()) + # Decompressors + zstdd = 
ZstdDecompressor() + CodecZstd.TranscodingStreams.initialize(zstdd) + push!(ZSTD_DECOMPRESSOR, zstdd) + lz4d = LZ4FrameDecompressor() + CodecLz4.TranscodingStreams.initialize(lz4d) + push!(LZ4_FRAME_DECOMPRESSOR, lz4d) + # Locks + push!(LZ4_FRAME_DECOMPRESSOR_LOCK, ReentrantLock()) + push!(ZSTD_DECOMPRESSOR_LOCK, ReentrantLock()) end return end diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl index 355a0341..686d1e36 100644 --- a/src/arraytypes/arraytypes.jl +++ b/src/arraytypes/arraytypes.jl @@ -35,14 +35,19 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)" @debugv 3 x A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...) + tid=Threads.threadid() if compression isa LZ4FrameCompressor A = compress(Meta.CompressionTypes.LZ4_FRAME, compression, A) elseif compression isa Vector{LZ4FrameCompressor} - A = compress(Meta.CompressionTypes.LZ4_FRAME, compression[Threads.threadid()], A) + A = lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do + compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], A) + end elseif compression isa ZstdCompressor A = compress(Meta.CompressionTypes.ZSTD, compression, A) elseif compression isa Vector{ZstdCompressor} - A = compress(Meta.CompressionTypes.ZSTD, compression[Threads.threadid()], A) + A = lock(ZSTD_COMPRESSOR_LOCK[tid]) do + compress(Meta.CompressionTypes.ZSTD, compression[tid], A) + end end @debugv 2 "converted top-level column to arrow format: $(typeof(A))" @debugv 3 A diff --git a/src/dirtyhacks.jl b/src/dirtyhacks.jl new file mode 100644 index 00000000..f4a982e6 --- /dev/null +++ b/src/dirtyhacks.jl @@ -0,0 +1,47 @@ +# The following change needs to be upstreamed to TranscodingStreams.jl: + +# We know the size of each output buffer (saved within Arrow metadata) +# The below functions mutates the provided output buffer. 
+function _transcode!(codec::Union{LZ4FrameDecompressor,ZstdDecompressor}, data::TS.ByteData,output::TS.Buffer) + input = TS.Buffer(data) + error = TS.Error() + code = TS.startproc(codec, :write, error) + if code === :error + @goto error + end + # n = TS.minoutsize(codec, buffermem(input)) + @label process + # makemargin!(output, n) + Δin, Δout, code = TS.process(codec, TS.buffermem(input), TS.marginmem(output), error) + @debug( + "called process()", + code = code, + input_size = TS.buffersize(input), + output_size = TS.marginsize(output), + input_delta = Δin, + output_delta = Δout, + ) + TS.consumed!(input, Δin) + TS.supplied!(output, Δout) + if code === :error + @goto error + elseif code === :end + if TS.buffersize(input) > 0 + if TS.startproc(codec, :write, error) === :error + @goto error + end + # n = minoutsize(codec, buffermem(input)) + @goto process + end + resize!(output.data, output.marginpos - 1) + return output.data + else + # n = max(Δout, minoutsize(codec, buffermem(input))) + @goto process + end + @label error + if !TS.haserror(error) + TS.set_default_error!(error) + end + throw(error[]) +end \ No newline at end of file diff --git a/src/inlinestrings.jl b/src/inlinestrings.jl new file mode 100644 index 00000000..02de432d --- /dev/null +++ b/src/inlinestrings.jl @@ -0,0 +1,89 @@ +# This code should be moved into InlineStrings extensions + +### Type extensions +# Use InlineStrings to get data from pointers (for getindex and similar) +ArrowTypes.fromarrow(::Type{T}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString} =ArrowTypes.fromarrow(T, T(ptr, len)) +ArrowTypes.fromarrow(::Type{Union{T,Missing}}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString} =ArrowTypes.fromarrow(T, T(ptr, len)) + +### Utilities for inlining strings +# determines the maximum string length necessary for the offsets +# calculates difference between offsets as a proxy for string size +function _maximum_diff(v::AbstractVector{<:Integer}) + mx = first(v) + prev = mx + @inbounds if 
length(v) > 1 + mx = max(mx, v[2] - prev) + prev = v[2] + for i in firstindex(v)+2:lastindex(v) + diff = v[i] - prev + mx < diff && (mx = diff) + prev = v[i] + end + end + mx +end +# extract offsets from Arrow.List +_offsetsints(arrowlist::Arrow.List) = arrowlist.offsets.offsets + +# convert strings to InlineStrings, does not check validity (hence unsafe!) - simply swaps the type +function _unsafe_convert(::Type{Arrow.List{S, O, A}}, vect::Arrow.List{T,O,A}) where {S<:Union{InlineString, Union{Missing, InlineString}},T<:Union{AbstractString, Union{Missing, AbstractString}},O,A} + Arrow.List{S,O,A}(vect.arrow, vect.validity, vect.offsets, vect.data, vect.ℓ, vect.metadata) +end + +# passthrough for non-strings +pick_string_type(::Type{T}, offsets) where {T} = T +pick_string_type(::Type{Union{Missing,T}}, offsets) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)} +function pick_string_type(::Type{T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} + max_size = _maximum_diff(offsets) + # if the maximum string length is less than 255, we can use InlineStrings + return max_size < 255 ? InlineStringType(max_size) : T +end +# find one joint string type for all chained arrays - vector of vectors +function pick_string_type(::Type{T}, vectoffsets::AbstractVector{<:AbstractVector{<:Integer}}) where {T<:AbstractString} + max_size = _maximum_diff.(vectoffsets)|>maximum + # if the maximum string length is less than 255, we can use InlineStrings + return max_size < 255 ? 
InlineStringType(max_size) : T +end + +# extend inlinestrings to pass through Arrow.Lists +_inlinestrings(vect::AbstractVector) = vect + +## methods for SentinelArrays.ChainedVector (if we have many RecordBatches / partitions) +# if it's already an InlineString, we can pass it through +_inlinestrings(vect::SentinelArrays.ChainedVector{<:Union{T,Union{T,Missing}}}) where {T<:InlineString} = vect +# if we detect a String type, try to inline it -- we need to find one unified type across all chained arrays +function _inlinestrings(vectofvect::SentinelArrays.ChainedVector{T, Arrow.List{T,O,A}}) where {T<:Union{AbstractString,Union{Missing,AbstractString}},O,A} + # find the smallest common denominator string type for all chained arrays + S = pick_string_type(T, _offsetsints.(vectofvect.arrays)) + if S == T + # if the type is the same, we can pass it through + return vectofvect + else + # otherwise, we need to reconstruct the ChainedVector with the new string type + # TODO: look into in-place conversion + return SentinelArrays.ChainedVector(_unsafe_convert.(Arrow.List{S,O,A},vectofvect.arrays)) + end +end + +# TODO: check that we handle ChainedVector that contains something else than Arrow.List with Strings + +# if we detect that the strings are small enough, we can inline them +function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:Union{AbstractString,Union{AbstractString,Missing}},O,A} + S = pick_string_type(T, _offsetsints(vect)) + if S == T + return vect + else + # reconstruct the Arrow.List with the new string type + return _unsafe_convert(Arrow.List{S,O,A}, vect) + end +end + +## methods for Arrow.List (if we have only 1 RecordBatch, ie, unpartitioned) +# if it's already an InlineString, we can pass it through +_inlinestrings(vect::Arrow.List{T,O,A}) where {T<:InlineString,O,A} = vect +# if we detect that the strings are small enough, we can inline them +function _inlinestrings(vect::Arrow.List{T,O,A}) where 
{T<:Union{AbstractString,Union{AbstractString,Missing}},O,A} + S = pick_string_type(T, vect.offsets.offsets) + # reconstruct the Arrow.List with the new string type + Arrow.List{S,O,A}(vect.arrow, vect.validity, vect.offsets, vect.data, vect.ℓ, vect.metadata) +end diff --git a/src/table.jl b/src/table.jl index c4a2e3d6..4f88bb3a 100644 --- a/src/table.jl +++ b/src/table.jl @@ -270,7 +270,7 @@ Table(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Table([ArrowBl Table(inputs::Vector; kw...) = Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...) # will detect whether we're reading a Table from a file or stream -function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) +function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::Bool=true) t = Table() sch = nothing dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding @@ -362,6 +362,8 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) end close(tsks) wait(tsk) + # apply inlinestrings to each column if requested + convert && useinlinestrings && (columns(t) .= _inlinestrings.(columns(t))) lu = lookup(t) ty = types(t) # 158; some implementations may send 0 record batches @@ -499,10 +501,20 @@ function uncompress(ptr::Ptr{UInt8}, buffer, compression) len = unsafe_load(convert(Ptr{Int64}, ptr)) ptr += 8 # skip past uncompressed length as Int64 encodedbytes = unsafe_wrap(Array, ptr, buffer.length - 8) + tid=Threads.threadid() + output=TS.Buffer(len) # pre-allocate for output if compression.codec === Meta.CompressionTypes.LZ4_FRAME - decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes) + # decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes) + lock(LZ4_FRAME_DECOMPRESSOR_LOCK[tid]) do + _transcode!(LZ4_FRAME_DECOMPRESSOR[tid], encodedbytes, output) + end + decodedbytes = output.data elseif compression.codec === Meta.CompressionTypes.ZSTD - decodedbytes = transcode(ZstdDecompressor, encodedbytes) + # decodedbytes = 
transcode(ZstdDecompressor, encodedbytes) + lock(ZSTD_DECOMPRESSOR_LOCK[tid]) do + _transcode!(ZSTD_DECOMPRESSOR[tid], encodedbytes, output) + end + decodedbytes = output.data else error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))") end diff --git a/src/write.jl b/src/write.jl index 5c965cac..77f90bd7 100644 --- a/src/write.jl +++ b/src/write.jl @@ -53,9 +53,14 @@ function write end write(io_or_file; kw...) = x -> write(io_or_file, x; kw...) -function write(file_path, tbl; kwargs...) +function write(file_path, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...) + if !isnothing(chunksize) && Tables.istable(tbl) + tbl_source = Iterators.partition(Tables.rows(tbl),chunksize) + else + tbl_source = tbl + end open(Writer, file_path; file=true, kwargs...) do writer - write(writer, tbl) + write(writer, tbl_source) end file_path end @@ -278,9 +283,14 @@ function Base.close(writer::Writer) nothing end -function write(io::IO, tbl; kwargs...) +function write(io::IO, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...) + if !isnothing(chunksize) && Tables.istable(tbl) + tbl_source = Iterators.partition(Tables.rows(tbl),chunksize) + else + tbl_source = tbl + end open(Writer, io; file=false, kwargs...) do writer - write(writer, tbl) + write(writer, tbl_source) end io end