From cbe6f9b6bd906ee3e7eb0db6c9690b56c975180a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Wed, 25 Mar 2026 17:47:56 +0000 Subject: [PATCH 1/8] Use Base.Semaphore to control test execution parallelism Replace the fixed worker-task-per-slot model with a semaphore-based approach: one task per test, with a Base.Semaphore(jobs) limiting concurrency and a Channel-based worker pool for reuse. This decouples the number of tasks from the parallelism level and simplifies the control flow (no inner while loop, tests array is immutable). Co-authored-by: Claude Made-with: Cursor --- src/ParallelTestRunner.jl | 69 +++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 21 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 4386fb7..9c804ad 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -826,7 +826,11 @@ function runtests(mod::Module, args::ParsedArgs; jobs = clamp(jobs, 1, length(tests)) println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") - workers = fill(nothing, jobs) + sem = Base.Semaphore(jobs) + worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) + for _ in 1:jobs + put!(worker_pool, nothing) + end t0 = time() results = [] @@ -887,7 +891,7 @@ function runtests(mod::Module, args::ParsedArgs; # only draw if we have something to show isempty(running_tests) && return completed = length(results) - total = completed + length(tests) + length(running_tests) + total = length(tests) # line 1: empty line line1 = "" @@ -921,7 +925,10 @@ function runtests(mod::Module, args::ParsedArgs; est_remaining += max(0.0, duration - elapsed) end ## yet-to-run + completed_names = Set{String}(r[1] for r in results) for test in tests + haskey(running_tests, test) && continue + test in completed_names && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1004,7 +1011,7 @@ function runtests(mod::Module, args::ParsedArgs; end isa(ex, InterruptException) || rethrow() finally - if isempty(tests) && isempty(running_tests) + if isempty(running_tests) && length(results) >= length(tests) # XXX: only erase the status if we completed successfully. # in other cases we'll have printed "caught interrupt" clear_status() @@ -1012,23 +1019,27 @@ function runtests(mod::Module, args::ParsedArgs; end end - # # execution # - for p in workers + tests_to_start = Threads.Atomic{Int}(length(tests)) + for test in tests push!(worker_tasks, @async begin - while !done - # get a test to run - test, test_t0 = Base.@lock test_lock begin - isempty(tests) && break - test = popfirst!(tests) + local p = nothing + acquired = false + try + Base.acquire(sem) + acquired = true + p = take!(worker_pool) + Threads.atomic_sub!(tests_to_start, 1) + + done && return + test_t0 = Base.@lock test_lock begin test_t0 = time() running_tests[test] = test_t0 - - test, test_t0 + test_t0 end # pass in init_worker_code to custom worker function if defined @@ -1055,7 +1066,7 @@ function runtests(mod::Module, args::ParsedArgs; if isa(ex, InterruptException) # the worker got interrupted, signal other tasks to stop stop_work() - break + return end ex @@ -1069,7 +1080,7 @@ function runtests(mod::Module, args::ParsedArgs; put!(printer_channel, (:finished, test, worker_id(wrkr), result)) if anynonpass(result[]) && args.quickfail !== nothing stop_work() - break + return end if memory_usage(result) > max_worker_rss @@ -1083,7 +1094,7 @@ function runtests(mod::Module, args::ParsedArgs; put!(printer_channel, (:crashed, test, worker_id(wrkr))) if args.quickfail !== nothing stop_work() - break + return end # the worker encountered some serious failure, recycle it @@ -1098,14 +1109,22 @@ function runtests(mod::Module, args::ParsedArgs; Base.@lock test_lock begin delete!(running_tests, test) end - end - if p !== nothing - Malt.stop(p) + catch ex + isa(ex, InterruptException) || rethrow() + finally + if acquired + # stop the worker if no more tests will need one from the pool + if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) + Malt.stop(p) + p = nothing + end + put!(worker_pool, p) + Base.release(sem) + end end end) end - # # finalization # @@ -1116,7 +1135,7 @@ function runtests(mod::Module, args::ParsedArgs; if any(istaskfailed, worker_tasks) println(io_ctx.stderr, "\nCaught an error, stopping...") break - elseif done || Base.@lock(test_lock, isempty(tests) && isempty(running_tests)) + elseif done || Base.@lock(test_lock, isempty(running_tests) && length(results) >= length(tests)) break end sleep(1) @@ -1146,6 +1165,14 @@ function runtests(mod::Module, args::ParsedArgs; end end + # clean up remaining workers in the pool + close(worker_pool) + for p in worker_pool + if p !== nothing && Malt.isrunning(p) + Malt.stop(p) + end + end + # print the output generated by each testset for (testname, result, output, _start, _stop) in results if !isempty(output) @@ -1227,7 +1254,7 @@ function runtests(mod::Module, args::ParsedArgs; end # mark remaining or running tests as interrupted - for test in [tests; collect(keys(running_tests))] + for test in tests (test in completed_tests) && continue testset = create_testset(test) Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1))) From 62bb315cb608cfbce24ebdb9d79deee60fa35c53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Wed, 25 Mar 2026 23:36:02 +0000 Subject: [PATCH 2/8] Deal with 0 test jobs --- src/ParallelTestRunner.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 9c804ad..3f13792 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -826,7 +826,7 @@ function runtests(mod::Module, args::ParsedArgs; jobs = clamp(jobs, 1, length(tests)) println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") - sem = Base.Semaphore(jobs) + sem = Base.Semaphore(max(1, jobs)) worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) for _ in 1:jobs put!(worker_pool, nothing) From 7ee377959db756123d6137079bc3a7b32f1f1a33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 26 Mar 2026 00:51:50 +0000 Subject: [PATCH 3/8] Remove redundant variable --- src/ParallelTestRunner.jl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 3f13792..23f7146 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -1039,7 +1039,6 @@ function runtests(mod::Module, args::ParsedArgs; test_t0 = Base.@lock test_lock begin test_t0 = time() running_tests[test] = test_t0 - test_t0 end # pass in init_worker_code to custom worker function if defined From a0fc2a30dcda6eb60d315f2ad84085b51edbff30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 26 Mar 2026 01:03:56 +0000 Subject: [PATCH 4/8] Avoid creating a `Set` --- src/ParallelTestRunner.jl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 23f7146..db7be4b 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -925,10 +925,10 @@ function runtests(mod::Module, args::ParsedArgs; est_remaining += max(0.0, duration - elapsed) end ## yet-to-run - completed_names = Set{String}(r[1] for r in results) for test in tests haskey(running_tests, test) && continue - test in completed_names && continue + # Test is in any completed test + any(r -> test == r.test, results) && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1072,7 +1072,7 @@ function runtests(mod::Module, args::ParsedArgs; end test_t1 = time() output = String(take!(wrkr.io)) - push!(results, (test, result, output, test_t0, test_t1)) + push!(results, (; test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord From 7c39cf50de4323021cdf9c1480bfb4f6c6836b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 26 Mar 2026 11:25:57 +0000 Subject: [PATCH 5/8] `@async` -> `Threads.@spawn` -> `@sync` --- src/ParallelTestRunner.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index db7be4b..31f77f9 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -1024,8 +1024,8 @@ function runtests(mod::Module, args::ParsedArgs; # tests_to_start = Threads.Atomic{Int}(length(tests)) - for test in tests - push!(worker_tasks, @async begin + @sync for test in tests + push!(worker_tasks, Threads.@spawn begin local p = nothing acquired = false try From b28a3be54ddfb4d3f304b2ad81f3507fce669bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 26 Mar 2026 13:57:13 +0000 Subject: [PATCH 6/8] Add a lock to protect pushes to `results` --- src/ParallelTestRunner.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 31f77f9..50d0bdd 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -836,6 +836,7 @@ function runtests(mod::Module, args::ParsedArgs; results = [] running_tests = Dict{String, Float64}() # test => start_time test_lock = ReentrantLock() # to protect crucial access to tests and running_tests + results_lock = ReentrantLock() # to protect concurrent access to results worker_tasks = Task[] @@ -1072,7 +1073,7 @@ function runtests(mod::Module, args::ParsedArgs; end test_t1 = time() output = String(take!(wrkr.io)) - push!(results, (; test, result, output, test_t0, test_t1)) + Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord From 3cfcd97f1019bdd462b3dad0091737466dd9ca33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 26 Mar 2026 19:13:30 +0000 Subject: [PATCH 7/8] Move `@sync`ed for loop inside `try`/`catch` block --- src/ParallelTestRunner.jl | 198 ++++++++++++++++++-------------------- 1 file changed, 94 insertions(+), 104 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index d191961..0064fdd 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -1027,128 +1027,118 @@ function runtests(mod::Module, args::ParsedArgs; # tests_to_start = Threads.Atomic{Int}(length(tests)) - @sync for test in tests - push!(worker_tasks, Threads.@spawn begin - local p = nothing - acquired = false - try - Base.acquire(sem) - acquired = true - p = take!(worker_pool) - Threads.atomic_sub!(tests_to_start, 1) - - done && return - - test_t0 = Base.@lock test_lock begin - test_t0 = time() - running_tests[test] = test_t0 - end - - # pass in init_worker_code to custom worker function if defined - wrkr = if init_worker_code == :() - test_worker(test) - else - test_worker(test, init_worker_code) - end - if wrkr === nothing - wrkr = p - end - # if a worker failed, spawn a new one - if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker(; init_worker_code, io_ctx.color) - end + try + @sync for test in tests + push!(worker_tasks, Threads.@spawn begin + local p = nothing + acquired = false + try + Base.acquire(sem) + acquired = true + p = take!(worker_pool) + Threads.atomic_sub!(tests_to_start, 1) + + done && return + + test_t0 = Base.@lock test_lock begin + test_t0 = time() + running_tests[test] = test_t0 + end - # run the test - put!(printer_channel, (:started, test, worker_id(wrkr))) - result = try - Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) - Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, - testsuite[test], test, init_code, test_t0) - catch ex - if isa(ex, InterruptException) - # the worker got interrupted, signal other tasks to stop - stop_work() - return + # pass in init_worker_code to custom worker function if defined + wrkr = if init_worker_code == :() + test_worker(test) + else + test_worker(test, init_worker_code) + end + if wrkr === nothing + wrkr = p + end + # if a worker failed, spawn a new one + if wrkr === nothing || !Malt.isrunning(wrkr) + wrkr = p = addworker(; init_worker_code, io_ctx.color) end - ex - end - test_t1 = time() - output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + # run the test + put!(printer_channel, (:started, test, worker_id(wrkr))) + result = try + Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) + Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, + testsuite[test], test, init_code, test_t0) + catch ex + if isa(ex, InterruptException) + # the worker got interrupted, signal other tasks to stop + stop_work() + return + end - # act on the results - if result isa AbstractTestRecord - put!(printer_channel, (:finished, test, worker_id(wrkr), result)) - if anynonpass(result[]) && args.quickfail !== nothing - stop_work() - return + ex end + test_t1 = time() + output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) + Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + + # act on the results + if result isa AbstractTestRecord + put!(printer_channel, (:finished, test, worker_id(wrkr), result)) + if anynonpass(result[]) && args.quickfail !== nothing + stop_work() + return + end + + if memory_usage(result) > max_worker_rss + # the worker has reached the max-rss limit, recycle it + # so future tests start with a smaller working set + Malt.stop(wrkr) + end + else + # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException + @assert result isa Exception + put!(printer_channel, (:crashed, test, worker_id(wrkr))) + if args.quickfail !== nothing + stop_work() + return + end - if memory_usage(result) > max_worker_rss - # the worker has reached the max-rss limit, recycle it - # so future tests start with a smaller working set + # the worker encountered some serious failure, recycle it Malt.stop(wrkr) end - else - # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException - @assert result isa Exception - put!(printer_channel, (:crashed, test, worker_id(wrkr))) - if args.quickfail !== nothing - stop_work() - return - end - # the worker encountered some serious failure, recycle it - Malt.stop(wrkr) - end - - # get rid of the custom worker - if wrkr != p - Malt.stop(wrkr) - end + # get rid of the custom worker + if wrkr != p + Malt.stop(wrkr) + end - Base.@lock test_lock begin - delete!(running_tests, test) - end - catch ex - isa(ex, InterruptException) || rethrow() - finally - if acquired - # stop the worker if no more tests will need one from the pool - if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) - Malt.stop(p) - p = nothing + Base.@lock test_lock begin + delete!(running_tests, test) + end + catch ex + isa(ex, InterruptException) || rethrow() + finally + if acquired + # stop the worker if no more tests will need one from the pool + if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) + Malt.stop(p) + p = nothing + end + put!(worker_pool, p) + Base.release(sem) end - put!(worker_pool, p) - Base.release(sem) end - end - end) - end - - # - # finalization - # - - # monitor worker tasks for failure so that each one doesn't need a try/catch + stop_work() - try - while true - if any(istaskfailed, worker_tasks) - println(io_ctx.stderr, "\nCaught an error, stopping...") - break - elseif done || Base.@lock(test_lock, isempty(running_tests) && length(results) >= length(tests)) - break - end - sleep(1) + end) end catch err - # in case the sleep got interrupted - isa(err, InterruptException) || rethrow() + if !(err isa InterruptException) + println(io_ctx.stderr, "\nCaught an error, stopping...") + end finally stop_work() end + # + # finalization + # + # wait for the printer to finish so that all results have been printed close(printer_channel) wait(printer_task) From 551a5c03c9903c884684ed03f861ee69d53d0182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 27 Mar 2026 11:22:40 +0000 Subject: [PATCH 8/8] Require Malt v1.4.1 --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index acf665e..e9fd212 100644 --- a/Project.toml +++ b/Project.toml @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Dates = "1" IOCapture = "0.2.5, 1" -Malt = "1.4.0" +Malt = "1.4.1" Printf = "1" Random = "1" Scratch = "1.3.0"