From 71a3ba77dd530df9b7b18c6a63dcc81fe3eea15d Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sat, 11 Mar 2023 19:06:34 +0000 Subject: [PATCH 1/9] speed hacks for Arrow Arrow project - added deps for InlineStrings Arrow.Table - added kwarg useinlinestrings to load strings as InlineStrings whenever possible Arrow.write - added chunking as a default (ala PyArrow), kwarg chunksize Compression - added thread-safe implementation with locks for decompression, added locks for compression TranscodingStreams - added inplace mutating transcode to avoid unnecessary resizing --- Project.toml | 2 +- src/Arrow.jl | 28 +++++++++++++++++++++ src/arraytypes/arraytypes.jl | 10 ++++++-- src/dirtyhacks.jl | 47 ++++++++++++++++++++++++++++++++++++ src/inlinestrings.jl | 42 ++++++++++++++++++++++++++++++++ src/table.jl | 20 ++++++++++++--- src/write.jl | 18 +++++++++++--- 7 files changed, 157 insertions(+), 10 deletions(-) create mode 100644 src/dirtyhacks.jl create mode 100644 src/inlinestrings.jl diff --git a/Project.toml b/Project.toml index 6a100113..3126c019 100644 --- a/Project.toml +++ b/Project.toml @@ -42,8 +42,8 @@ BitIntegers = "0.2" CodecLz4 = "0.4" CodecZstd = "0.7" DataAPI = "1" -LoggingExtras = "0.4, 1" FilePathsBase = "0.9" +LoggingExtras = "0.4, 1" PooledArrays = "0.5, 1.0" SentinelArrays = "1" Tables = "1.1" diff --git a/src/Arrow.jl b/src/Arrow.jl index 01c7993c..0ab28e70 100644 --- a/src/Arrow.jl +++ b/src/Arrow.jl @@ -71,8 +71,23 @@ include("write.jl") include("append.jl") include("show.jl") +using InlineStrings +export InlineString, inlinestrings +include("inlinestrings.jl") + +const TS = CodecLz4.TranscodingStreams +include("dirtyhacks.jl") + + const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[] const ZSTD_COMPRESSOR = ZstdCompressor[] +const LZ4_FRAME_DECOMPRESSOR = LZ4FrameDecompressor[] +const ZSTD_DECOMPRESSOR = ZstdDecompressor[] +# add locks for multithreaded (de/)compression (because we index by threadid which might not be safe under Julia >1.8) +const LZ4_FRAME_COMPRESSOR_LOCK = ReentrantLock[] +const ZSTD_COMPRESSOR_LOCK = ReentrantLock[] +const LZ4_FRAME_DECOMPRESSOR_LOCK = ReentrantLock[] +const ZSTD_DECOMPRESSOR_LOCK = ReentrantLock[] function __init__() for _ = 1:Threads.nthreads() @@ -82,6 +97,19 @@ function __init__() lz4 = LZ4FrameCompressor(; compressionlevel=4) CodecLz4.TranscodingStreams.initialize(lz4) push!(LZ4_FRAME_COMPRESSOR, lz4) + # Locks + push!(LZ4_FRAME_COMPRESSOR_LOCK, ReentrantLock()) + push!(ZSTD_COMPRESSOR_LOCK, ReentrantLock()) + # Decompressors + zstdd = ZstdDecompressor() + CodecZstd.TranscodingStreams.initialize(zstdd) + push!(ZSTD_DECOMPRESSOR, zstdd) + lz4d = LZ4FrameDecompressor() + CodecLz4.TranscodingStreams.initialize(lz4d) + push!(LZ4_FRAME_DECOMPRESSOR, lz4d) + # Locks + push!(LZ4_FRAME_DECOMPRESSOR_LOCK, ReentrantLock()) + push!(ZSTD_DECOMPRESSOR_LOCK, ReentrantLock()) end return end diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl index 355a0341..60ec6c84 100644 --- a/src/arraytypes/arraytypes.jl +++ b/src/arraytypes/arraytypes.jl @@ -38,11 +38,17 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g if compression isa LZ4FrameCompressor A = compress(Meta.CompressionTypes.LZ4_FRAME, compression, A) elseif compression isa Vector{LZ4FrameCompressor} - A = compress(Meta.CompressionTypes.LZ4_FRAME, compression[Threads.threadid()], A) + tid=Threads.threadid() + A = lock(LZ4_FRAME_DECOMPRESSOR_LOCK[tid]) do + compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], A) + end elseif compression isa ZstdCompressor A = compress(Meta.CompressionTypes.ZSTD, compression, A) elseif compression isa Vector{ZstdCompressor} - A = compress(Meta.CompressionTypes.ZSTD, compression[Threads.threadid()], A) + tid=Threads.threadid() + A = lock(ZSTD_DECOMPRESSOR_LOCK[tid]) do + compress(Meta.CompressionTypes.ZSTD, compression[tid], A) + end end @debugv 2 "converted top-level column to arrow format: $(typeof(A))" @debugv 3 A diff --git a/src/dirtyhacks.jl b/src/dirtyhacks.jl new file mode 100644 index 00000000..6955dbe6 --- /dev/null +++ b/src/dirtyhacks.jl @@ -0,0 +1,47 @@ +# The following change needs to be upstreamed to TranscodingStreams.jl: + +# We know the size of each output buffer (saved within Arrow metadata) +# The below functions mutates the provided output buffer. +function _transcode!(codec::Union{LZ4FrameDecompressor,LZ4FrameDecompressor}, data::TS.ByteData,output::TS.Buffer) + input = TS.Buffer(data) + error = TS.Error() + code = TS.startproc(codec, :write, error) + if code === :error + @goto error + end + # n = TS.minoutsize(codec, buffermem(input)) + @label process + # makemargin!(output, n) + Δin, Δout, code = TS.process(codec, TS.buffermem(input), TS.marginmem(output), error) + @debug( + "called process()", + code = code, + input_size = buffersize(input), + output_size = marginsize(output), + input_delta = Δin, + output_delta = Δout, + ) + TS.consumed!(input, Δin) + TS.supplied!(output, Δout) + if code === :error + @goto error + elseif code === :end + if TS.buffersize(input) > 0 + if TS.startproc(codec, :write, error) === :error + @goto error + end + # n = minoutsize(codec, buffermem(input)) + @goto process + end + resize!(output.data, output.marginpos - 1) + return output.data + else + # n = max(Δout, minoutsize(codec, buffermem(input))) + @goto process + end + @label error + if !(TS.haserror)(error) + TS.set_default_error!(error) + end + throw(error[]) +end \ No newline at end of file diff --git a/src/inlinestrings.jl b/src/inlinestrings.jl new file mode 100644 index 00000000..05da05d8 --- /dev/null +++ b/src/inlinestrings.jl @@ -0,0 +1,42 @@ +# This code should be moved into InlineStrings extensions + +### Type extensions +# Use InlineStrings to get data from pointers (for getindex and similar) +ArrowTypes.fromarrow(::Type{T}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString} = ArrowTypes.fromarrow(T, T(ptr, len)) + +### Utilities for inlining strings +# determines the maximum string length necessary for the offsets +# calculates difference between offsets as a proxy for string size +function _maximum_diff(v::AbstractVector{<:Integer}) + mx = first(v) + prev = mx + @inbounds if length(v) > 1 + mx = max(mx, v[2] - prev) + prev = v[2] + for i in firstindex(v)+2:lastindex(v) + diff = v[i] - prev + mx < diff && (mx = diff) + prev = v[i] + end + end + mx +end + +# passthrough for non-strings +pick_string_type(::Type{T}, offsets) where {T} = T +pick_string_type(::Union{Missing,T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)} +function pick_string_type(::Type{T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} + max_size = _maximum_diff(offsets) + # if the maximum string length is less than 255, we can use InlineStrings + return max_size < 255 ? InlineStringType(max_size) : T +end +# extend inlinestrings to pass through Arrow.Lists +_inlinestrings(vect::AbstractVector) = vect +# if it's already an InlineString, we can pass it through +_inlinestrings(vect::Arrow.List{T,O,A}) where {T<:InlineString,O,A} = vect +# if we detect that the strings are small enough, we can inline them +function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:AbstractString,O,A} + S = pick_string_type(T, vect.offsets.offsets) + # reconstruct the Arrow.List with the new string type + Arrow.List{S,O,A}(vect.arrow, vect.validity, vect.offsets, vect.data, vect.ℓ, vect.metadata) +end diff --git a/src/table.jl b/src/table.jl index c4a2e3d6..ed477e18 100644 --- a/src/table.jl +++ b/src/table.jl @@ -270,7 +270,7 @@ Table(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Table([ArrowBl Table(inputs::Vector; kw...) = Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...) # will detect whether we're reading a Table from a file or stream -function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) +function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::Bool=true) t = Table() sch = nothing dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding @@ -362,6 +362,10 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) end close(tsks) wait(tsk) + + # apply inlinestrings to each column if requested + convert && useinlinestrings && (columns(t) .= _inlinestrings.(columns(t))) + lu = lookup(t) ty = types(t) # 158; some implementations may send 0 record batches @@ -499,10 +503,20 @@ function uncompress(ptr::Ptr{UInt8}, buffer, compression) len = unsafe_load(convert(Ptr{Int64}, ptr)) ptr += 8 # skip past uncompressed length as Int64 encodedbytes = unsafe_wrap(Array, ptr, buffer.length - 8) + tid=Threads.threadid() + output=TS.Buffer(len) # pre-allocate for output if compression.codec === Meta.CompressionTypes.LZ4_FRAME - decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes) + # decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes) + lock(LZ4_FRAME_DECOMPRESSOR_LOCK[tid]) do + _transcode!(LZ4_FRAME_DECOMPRESSOR[tid], encodedbytes, output) + end + decodedbytes = output.data elseif compression.codec === Meta.CompressionTypes.ZSTD - decodedbytes = transcode(ZstdDecompressor, encodedbytes) + # decodedbytes = transcode(ZstdDecompressor, encodedbytes) + lock(ZSTD_DECOMPRESSOR_LOCK[tid]) do + _transcode!(ZSTD_DECOMPRESSOR[tid], encodedbytes, output) + end + decodedbytes = output.data else error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))") end diff --git a/src/write.jl b/src/write.jl index 5c965cac..77f90bd7 100644 --- a/src/write.jl +++ b/src/write.jl @@ -53,9 +53,14 @@ function write end write(io_or_file; kw...) = x -> write(io_or_file, x; kw...) -function write(file_path, tbl; kwargs...) +function write(file_path, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...) + if !isnothing(chunksize) && Tables.istable(tbl) + tbl_source = Iterators.partition(Tables.rows(tbl),chunksize) + else + tbl_source = tbl + end open(Writer, file_path; file=true, kwargs...) do writer - write(writer, tbl) + write(writer, tbl_source) end file_path end @@ -278,9 +283,14 @@ function Base.close(writer::Writer) nothing end -function write(io::IO, tbl; kwargs...) +function write(io::IO, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...) + if !isnothing(chunksize) && Tables.istable(tbl) + tbl_source = Iterators.partition(Tables.rows(tbl),chunksize) + else + tbl_source = tbl + end open(Writer, io; file=false, kwargs...) do writer - write(writer, tbl) + write(writer, tbl_source) end io end From e15f9d6ff9f8a3374fc7636d8837d6b3b666ff96 Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sat, 11 Mar 2023 19:22:08 +0000 Subject: [PATCH 2/9] add ZstdDecompressor to transcode! --- src/dirtyhacks.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dirtyhacks.jl b/src/dirtyhacks.jl index 6955dbe6..f4a982e6 100644 --- a/src/dirtyhacks.jl +++ b/src/dirtyhacks.jl @@ -2,7 +2,7 @@ # We know the size of each output buffer (saved within Arrow metadata) # The below functions mutates the provided output buffer. -function _transcode!(codec::Union{LZ4FrameDecompressor,LZ4FrameDecompressor}, data::TS.ByteData,output::TS.Buffer) +function _transcode!(codec::Union{LZ4FrameDecompressor,ZstdDecompressor}, data::TS.ByteData,output::TS.Buffer) input = TS.Buffer(data) error = TS.Error() code = TS.startproc(codec, :write, error) From 6caea63eb1f3140b610f22b3f308c9f746fc9b1b Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sat, 11 Mar 2023 19:22:35 +0000 Subject: [PATCH 3/9] added deps --- Project.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Project.toml b/Project.toml index 3126c019..a6636e6d 100644 --- a/Project.toml +++ b/Project.toml @@ -27,6 +27,7 @@ CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561" CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2" DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +InlineStrings = "842dd82b-1e85-43dc-bf29-5d0ee9dffc48" LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" Mmap = "a63ad114-7e13-5084-954f-fe012c677804" PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720" From cc5dad8a8ffa85001c5c03002829cb497d22ceea Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sat, 11 Mar 2023 20:03:20 +0000 Subject: [PATCH 4/9] updated inlinestrings to catch chainedvectors (moved deeper in) --- src/inlinestrings.jl | 4 ++-- src/table.jl | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/inlinestrings.jl b/src/inlinestrings.jl index 05da05d8..11a5a47d 100644 --- a/src/inlinestrings.jl +++ b/src/inlinestrings.jl @@ -24,7 +24,7 @@ end # passthrough for non-strings pick_string_type(::Type{T}, offsets) where {T} = T -pick_string_type(::Union{Missing,T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)} +pick_string_type(::Type{Union{Missing,T}}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)} function pick_string_type(::Type{T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} max_size = _maximum_diff(offsets) # if the maximum string length is less than 255, we can use InlineStrings @@ -35,7 +35,7 @@ _inlinestrings(vect::AbstractVector) = vect # if it's already an InlineString, we can pass it through _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:InlineString,O,A} = vect # if we detect that the strings are small enough, we can inline them -function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:AbstractString,O,A} +function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:Union{AbstractString,Union{AbstractString,Missing}},O,A} S = pick_string_type(T, vect.offsets.offsets) # reconstruct the Arrow.List with the new string type Arrow.List{S,O,A}(vect.arrow, vect.validity, vect.offsets, vect.data, vect.ℓ, vect.metadata) diff --git a/src/table.jl b/src/table.jl index ed477e18..1ce0ff5f 100644 --- a/src/table.jl +++ b/src/table.jl @@ -352,6 +352,8 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::B @debugv 1 "parsing record batch message: compression = $(header.compression)" Threads.@spawn begin cols = collect(VectorIterator(sch, $batch, dictencodings, convert)) + # apply inlinestrings to each column if requested + convert && useinlinestrings && (cols .= _inlinestrings.(cols)) put!(() -> put!(tsks, cols), sync, $(rbi)) end rbi += 1 @@ -362,10 +364,6 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::B end close(tsks) wait(tsk) - - # apply inlinestrings to each column if requested - convert && useinlinestrings && (columns(t) .= _inlinestrings.(columns(t))) - lu = lookup(t) ty = types(t) # 158; some implementations may send 0 record batches From 2148cc9b8702402f3a71157eb6e6b00fceafc854 Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sat, 11 Mar 2023 20:37:47 +0000 Subject: [PATCH 5/9] extend inlinestring fromarrow to apply to missing type union --- src/inlinestrings.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/inlinestrings.jl b/src/inlinestrings.jl index 11a5a47d..458cf951 100644 --- a/src/inlinestrings.jl +++ b/src/inlinestrings.jl @@ -2,7 +2,8 @@ ### Type extensions # Use InlineStrings to get data from pointers (for getindex and similar) -ArrowTypes.fromarrow(::Type{T}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString} = ArrowTypes.fromarrow(T, T(ptr, len)) +ArrowTypes.fromarrow(::Type{T}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString} =ArrowTypes.fromarrow(T, T(ptr, len)) +ArrowTypes.fromarrow(::Type{Union{T,Missing}}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString} =ArrowTypes.fromarrow(T, T(ptr, len)) ### Utilities for inlining strings # determines the maximum string length necessary for the offsets From d4a3ef9dedf158e137c9898603877ca4d68c24bc Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sun, 12 Mar 2023 11:19:44 +0000 Subject: [PATCH 6/9] add support for ChainedVector and move inlining to the outer loop (safer for type consistency)) --- src/inlinestrings.jl | 49 +++++++++++++++++++++++++++++++++++++++++++- src/table.jl | 4 ++-- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/inlinestrings.jl b/src/inlinestrings.jl index 458cf951..8e51eead 100644 --- a/src/inlinestrings.jl +++ b/src/inlinestrings.jl @@ -22,17 +22,64 @@ function _maximum_diff(v::AbstractVector{<:Integer}) end mx end +# extract offsets from Arrow.List +_offsetsints(arrowlist::Arrow.List) = arrowlist.offsets.offsets + +# convert strings to InlineStrings, does not check validity (hence unsafe!) - simply swaps the type +function _unsafe_convert(::Type{Arrow.List{S, O, A}}, vect::Arrow.List{T,O,A}) where {S<:Union{InlineString, Union{Missing, InlineString}},T<:Union{AbstractString, Union{Missing, AbstractString}},O,A} + Arrow.List{S,O,A}(vect.arrow, vect.validity, vect.offsets, vect.data, vect.ℓ, vect.metadata) +end # passthrough for non-strings pick_string_type(::Type{T}, offsets) where {T} = T -pick_string_type(::Type{Union{Missing,T}}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)} +pick_string_type(::Type{Union{Missing,T}}, offsets) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)} function pick_string_type(::Type{T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString} max_size = _maximum_diff(offsets) # if the maximum string length is less than 255, we can use InlineStrings return max_size < 255 ? InlineStringType(max_size) : T end +# find one joint string type for all chained arrays - vector of vectors +function pick_string_type(::Type{T}, vectoffsets::AbstractVector{<:AbstractVector{<:Integer}}) where {T<:AbstractString} + max_size = _maximum_diff.(vectoffsets)|>maximum + # if the maximum string length is less than 255, we can use InlineStrings + return max_size < 255 ? InlineStringType(max_size) : T +end + # extend inlinestrings to pass through Arrow.Lists _inlinestrings(vect::AbstractVector) = vect + +## methods for SentinelArrays.ChainedVector (if we have many RecordBatches / partitions) +# if it's already an InlineString, we can pass it through +_inlinestrings(vect::SentinelArrays.ChainedVector{<:Union{T,Union{T,Missing}}}) where {T<:InlineString} = vect +# if we detect a String type, try to inline it -- we need to find one unified type across all chained arrays +function _inlinestrings(vectofvect::SentinelArrays.ChainedVector{T, Arrow.List{T,O,A}}) where {T<:Union{AbstractString,Union{Missing,AbstractString}},O,A} + # find the smallest common denominator string type for all chained arrays + S = pick_string_type(T, _offsetsints.(vectofvect.arrays)) + @info "" S T + if S == T + # if the type is the same, we can pass it through + return vectofvect + else + # otherwise, we need to reconstruct the ChainedVector with the new string type + # TODO: look into in-place conversion + return SentinelArrays.ChainedVector(_unsafe_convert.(Arrow.List{S,O,A},vectofvect.arrays)) + end +end + +# TODO: handle ChainedVector that contains something else than Arrow.List with Strings + +# if we detect that the strings are small enough, we can inline them +function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:Union{AbstractString,Union{AbstractString,Missing}},O,A} + S = pick_string_type(T, _offsetsints(vect)) + if S == T + return vect + else + # reconstruct the Arrow.List with the new string type + return _unsafe_convert(Arrow.List{S,O,A}, vect) + end +end + +## methods for Arrow.List (if we have only 1 RecordBatch, ie, unpartitioned) # if it's already an InlineString, we can pass it through _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:InlineString,O,A} = vect # if we detect that the strings are small enough, we can inline them diff --git a/src/table.jl b/src/table.jl index 1ce0ff5f..4f88bb3a 100644 --- a/src/table.jl +++ b/src/table.jl @@ -352,8 +352,6 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::B @debugv 1 "parsing record batch message: compression = $(header.compression)" Threads.@spawn begin cols = collect(VectorIterator(sch, $batch, dictencodings, convert)) - # apply inlinestrings to each column if requested - convert && useinlinestrings && (cols .= _inlinestrings.(cols)) put!(() -> put!(tsks, cols), sync, $(rbi)) end rbi += 1 @@ -364,6 +362,8 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::B end close(tsks) wait(tsk) + # apply inlinestrings to each column if requested + convert && useinlinestrings && (columns(t) .= _inlinestrings.(columns(t))) lu = lookup(t) ty = types(t) # 158; some implementations may send 0 record batches From 966287a42556cdf42f762e29d20f659b5ce490e0 Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sun, 12 Mar 2023 11:25:29 +0000 Subject: [PATCH 7/9] remove loose debug statement --- src/inlinestrings.jl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/inlinestrings.jl b/src/inlinestrings.jl index 8e51eead..02de432d 100644 --- a/src/inlinestrings.jl +++ b/src/inlinestrings.jl @@ -55,7 +55,6 @@ _inlinestrings(vect::SentinelArrays.ChainedVector{<:Union{T,Union{T,Missing}}}) function _inlinestrings(vectofvect::SentinelArrays.ChainedVector{T, Arrow.List{T,O,A}}) where {T<:Union{AbstractString,Union{Missing,AbstractString}},O,A} # find the smallest common denominator string type for all chained arrays S = pick_string_type(T, _offsetsints.(vectofvect.arrays)) - @info "" S T if S == T # if the type is the same, we can pass it through return vectofvect @@ -66,7 +65,7 @@ function _inlinestrings(vectofvect::SentinelArrays.ChainedVector{T, Arrow.List{T end end -# TODO: handle ChainedVector that contains something else than Arrow.List with Strings +# TODO: check that we handle ChainedVector that contains something else than Arrow.List with Strings # if we detect that the strings are small enough, we can inline them function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:Union{AbstractString,Union{AbstractString,Missing}},O,A} From 153b82a4447681386fecd1af881d153b57b5a2a0 Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Sun, 12 Mar 2023 13:46:46 +0000 Subject: [PATCH 8/9] fix lock name typo --- src/arraytypes/arraytypes.jl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl index 60ec6c84..686d1e36 100644 --- a/src/arraytypes/arraytypes.jl +++ b/src/arraytypes/arraytypes.jl @@ -35,18 +35,17 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)" @debugv 3 x A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...) + tid=Threads.threadid() if compression isa LZ4FrameCompressor A = compress(Meta.CompressionTypes.LZ4_FRAME, compression, A) elseif compression isa Vector{LZ4FrameCompressor} - tid=Threads.threadid() - A = lock(LZ4_FRAME_DECOMPRESSOR_LOCK[tid]) do + A = lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], A) end elseif compression isa ZstdCompressor A = compress(Meta.CompressionTypes.ZSTD, compression, A) elseif compression isa Vector{ZstdCompressor} - tid=Threads.threadid() - A = lock(ZSTD_DECOMPRESSOR_LOCK[tid]) do + A = lock(ZSTD_COMPRESSOR_LOCK[tid]) do compress(Meta.CompressionTypes.ZSTD, compression[tid], A) end end From b476043c58f24c656e6923a477e1447274dafd6e Mon Sep 17 00:00:00 2001 From: J S <49557684+svilupp@users.noreply.github.com> Date: Mon, 13 Mar 2023 07:59:31 +0000 Subject: [PATCH 9/9] add compat for InlineStrings --- Project.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Project.toml b/Project.toml index a6636e6d..bfe105a0 100644 --- a/Project.toml +++ b/Project.toml @@ -44,6 +44,7 @@ CodecLz4 = "0.4" CodecZstd = "0.7" DataAPI = "1" FilePathsBase = "0.9" +InlineStrings = "1.4" LoggingExtras = "0.4, 1" PooledArrays = "0.5, 1.0" SentinelArrays = "1"