diff --git a/src/arraytypes/dictencoding.jl b/src/arraytypes/dictencoding.jl
index fe48304..84d5105 100644
--- a/src/arraytypes/dictencoding.jl
+++ b/src/arraytypes/dictencoding.jl
@@ -142,6 +142,8 @@ function arrowvector(
     kw...,
 )
     id = x.encoding.id
+    # XXX This is a race condition: if two workers hit this block at the same time,
+    # they'll create distinct locks
     if !haskey(de, id)
         de[id] = Lockable(x.encoding)
     else
@@ -215,6 +217,8 @@ function arrowvector(
     x = x.data
     len = length(x)
     validity = ValidityBitmap(x)
+    # XXX This is a race condition: if two workers hit this block at the same time,
+    # they'll create distinct locks
     if !haskey(de, id)
         # dict encoding doesn't exist yet, so create for 1st time
         if DataAPI.refarray(x) === x || DataAPI.refpool(x) === nothing
diff --git a/src/write.jl b/src/write.jl
index 1f1bfd1..4c3800f 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -295,47 +295,49 @@ function write(writer::Writer, source)
             recbatchmsg = makerecordbatchmsg(writer.schema[], cols, writer.alignment)
             put!(writer.msgs, recbatchmsg)
         else
-            if writer.threaded
-                @wkspawn process_partition(
-                    tblcols,
-                    writer.dictencodings,
-                    writer.largelists,
-                    writer.compress,
-                    writer.denseunions,
-                    writer.dictencode,
-                    writer.dictencodenested,
-                    writer.maxdepth,
-                    writer.sync,
-                    writer.msgs,
-                    writer.alignment,
-                    $(writer.partition_count),
-                    writer.schema,
-                    writer.errorref,
-                    writer.anyerror,
-                    writer.meta,
-                    writer.colmeta,
-                )
-            else
-                @async process_partition(
-                    tblcols,
-                    writer.dictencodings,
-                    writer.largelists,
-                    writer.compress,
-                    writer.denseunions,
-                    writer.dictencode,
-                    writer.dictencodenested,
-                    writer.maxdepth,
-                    writer.sync,
-                    writer.msgs,
-                    writer.alignment,
-                    $(writer.partition_count),
-                    writer.schema,
-                    writer.errorref,
-                    writer.anyerror,
-                    writer.meta,
-                    writer.colmeta,
-                )
-            end
+            # XXX There is a race condition in the processing of dict encodings,
+            # so we disable multithreaded writing until that can be addressed. See #582
+            # if writer.threaded
+            #     @wkspawn process_partition(
+            #         tblcols,
+            #         writer.dictencodings,
+            #         writer.largelists,
+            #         writer.compress,
+            #         writer.denseunions,
+            #         writer.dictencode,
+            #         writer.dictencodenested,
+            #         writer.maxdepth,
+            #         writer.sync,
+            #         writer.msgs,
+            #         writer.alignment,
+            #         $(writer.partition_count),
+            #         writer.schema,
+            #         writer.errorref,
+            #         writer.anyerror,
+            #         writer.meta,
+            #         writer.colmeta,
+            #     )
+            # else
+            @async process_partition(
+                tblcols,
+                writer.dictencodings,
+                writer.largelists,
+                writer.compress,
+                writer.denseunions,
+                writer.dictencode,
+                writer.dictencodenested,
+                writer.maxdepth,
+                writer.sync,
+                writer.msgs,
+                writer.alignment,
+                $(writer.partition_count),
+                writer.schema,
+                writer.errorref,
+                writer.anyerror,
+                writer.meta,
+                writer.colmeta,
+            )
+            # end
         end
         writer.partition_count += 1
     end
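Both XXX comments above flag the same check-then-act pattern: `haskey(de, id)` and the subsequent `de[id] = Lockable(...)` are separate steps, so two tasks can both observe the id as missing and install distinct locks. A minimal sketch of one conventional fix, assuming a single lock guarding the table; the names `DE_TABLE_LOCK` and `get_or_create_encoding!` are illustrative, not part of this patch:

    # Hypothetical helper, not Arrow.jl API: make the lookup-or-insert on the
    # dict-encoding table a single critical section, so concurrent writers
    # always observe (or install) the same Lockable for a given encoding id.
    # `Lockable` is the wrapper the patch already uses (cf. Base.Lockable on
    # Julia >= 1.11).
    const DE_TABLE_LOCK = ReentrantLock()

    function get_or_create_encoding!(de::AbstractDict, id, encoding)
        lock(DE_TABLE_LOCK) do
            # get! only runs the constructor when `id` is absent, and the
            # surrounding lock prevents two tasks from racing past haskey.
            get!(() -> Lockable(encoding), de, id)
        end
    end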
diff --git a/test/Project.toml b/test/Project.toml
index 93977a9..1079926 100644
--- a/test/Project.toml
+++ b/test/Project.toml
@@ -31,6 +31,7 @@ SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
 Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
 TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+TestSetExtensions = "98d24dd4-01ad-11ea-1b02-c9a08f80db04"
 UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
 
 [compat]
@@ -44,4 +45,5 @@ PooledArrays = "1"
 StructTypes = "1"
 SentinelArrays = "1"
 Tables = "1"
+TestSetExtensions = "3"
 TimeZones = "1"
diff --git a/test/runtests.jl b/test/runtests.jl
index 9ca171f..b068be1 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -28,12 +28,16 @@ using DataAPI
 using FilePathsBase
 using DataFrames
 import Random: randstring
+using TestSetExtensions: ExtendedTestSet
 
+# this formulation tests the loaded ArrowTypes, even if it's not the dev version
+# within the mono-repo
 include(joinpath(dirname(pathof(ArrowTypes)), "../test/tests.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/testtables.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/testappend.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/integrationtest.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/dates.jl"))
+
+include(joinpath(@__DIR__, "testtables.jl"))
+include(joinpath(@__DIR__, "testappend.jl"))
+include(joinpath(@__DIR__, "integrationtest.jl"))
+include(joinpath(@__DIR__, "dates.jl"))
 
 struct CustomStruct
     x::Int
@@ -45,7 +49,7 @@ struct CustomStruct2{sym}
     x::Int
 end
 
-@testset "Arrow" begin
+@testset ExtendedTestSet "Arrow" begin
     @testset "table roundtrips" begin
         for case in testtables
             testtable(case...)
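Two orthogonal harness changes land here: the `include`s now resolve relative to this file via `@__DIR__` instead of `pathof(Arrow)` (which points at whichever copy of the package is actually loaded, not necessarily this checkout), and the top-level test set switches to `ExtendedTestSet` from TestSetExtensions for richer failure reporting. A standalone sketch of the custom-test-set syntax, independent of this suite:

    # Any AbstractTestSet subtype can be passed to @testset; here the
    # ExtendedTestSet from TestSetExtensions replaces the default one.
    using Test
    using TestSetExtensions: ExtendedTestSet

    @testset ExtendedTestSet "demo" begin
        @test 1 + 1 == 2
    end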
@@ -381,6 +385,8 @@ end
     end
 
     @testset "# 126" begin
+        # XXX This test also captures a race condition in multithreaded
+        # writes of dictionary-encoded arrays
         t = Tables.partitioner((
             (a=Arrow.toarrowvector(PooledArray([1, 2, 3])),),
             (a=Arrow.toarrowvector(PooledArray([1, 2, 3, 4])),),
@@ -602,14 +608,15 @@ end
     end
 
     @testset "# 181" begin
+        # XXX this test hangs on Julia 1.12 when using deeper nesting
         d = Dict{Int,Int}()
-        for i = 1:9
+        for i = 1:1
             d = Dict(i => d)
         end
         tbl = (x=[d],)
-        msg = "reached nested serialization level (20) deeper than provided max depth argument (19); to increase allowed nesting level, pass `maxdepth=X`"
-        @test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=19)
-        @test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=20)).x == tbl.x
+        msg = "reached nested serialization level (2) deeper than provided max depth argument (1); to increase allowed nesting level, pass `maxdepth=X`"
+        @test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=1)
+        @test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=5)).x == tbl.x
     end
 
     @testset "# 167" begin
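On the `# 181` hunk: the level in the error message is the point where the writer gives up (maxdepth + 1), not the structure's total depth — hence level (20) against maxdepth=19 before the change and level (2) against maxdepth=1 after it. A small sketch of the tested behavior, using only calls that already appear in this suite:

    using Arrow, Test

    # Two nested Dicts, as built by `for i = 1:1` above.
    d = Dict(1 => Dict{Int,Int}())
    tbl = (x=[d],)

    # The writer bails at level 2, i.e. maxdepth + 1, and throws...
    @test_throws ErrorException Arrow.tobuffer(tbl; maxdepth=1)
    # ...while a larger depth budget roundtrips the table.
    @test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=5)).x == tbl.x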