diff --git a/Project.toml b/Project.toml index acf665e..e9fd212 100644 --- a/Project.toml +++ b/Project.toml @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Dates = "1" IOCapture = "0.2.5, 1" -Malt = "1.4.0" +Malt = "1.4.1" Printf = "1" Random = "1" Scratch = "1.3.0" diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index f597907..a049488 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -828,7 +828,11 @@ function runtests(mod::Module, args::ParsedArgs; jobs = clamp(jobs, 1, length(tests)) println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") - workers = fill(nothing, jobs) + sem = Base.Semaphore(max(1, jobs)) + worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) + for _ in 1:jobs + put!(worker_pool, nothing) + end t0 = time() results = [] @@ -890,7 +894,7 @@ function runtests(mod::Module, args::ParsedArgs; # only draw if we have something to show isempty(running_tests) && return completed = Base.@lock results_lock length(results) - total = completed + length(tests) + length(running_tests) + total = length(tests) # line 1: empty line line1 = "" @@ -925,6 +929,9 @@ function runtests(mod::Module, args::ParsedArgs; end ## yet-to-run for test in tests + haskey(running_tests, test) && continue + # Test is in any completed test + any(r -> test == r.test, results) && continue est_remaining += get(historical_durations, test, est_per_test) end @@ -1007,7 +1014,7 @@ function runtests(mod::Module, args::ParsedArgs; end isa(ex, InterruptException) || rethrow() finally - if isempty(tests) && isempty(running_tests) + if isempty(running_tests) && length(results) >= length(tests) # XXX: only erase the status if we completed successfully. # in other cases we'll have printed "caught interrupt" clear_status() @@ -1015,122 +1022,123 @@ function runtests(mod::Module, args::ParsedArgs; end end - # # execution # - for p in workers - push!(worker_tasks, @async begin - while !done - # get a test to run - test, test_t0 = Base.@lock test_lock begin - isempty(tests) && break - test = popfirst!(tests) - - test_t0 = time() - running_tests[test] = test_t0 + tests_to_start = Threads.Atomic{Int}(length(tests)) + try + @sync for test in tests + push!(worker_tasks, Threads.@spawn begin + local p = nothing + acquired = false + try + Base.acquire(sem) + acquired = true + p = take!(worker_pool) + Threads.atomic_sub!(tests_to_start, 1) + + done && return + + test_t0 = Base.@lock test_lock begin + test_t0 = time() + running_tests[test] = test_t0 + end - test, test_t0 - end + # pass in init_worker_code to custom worker function if defined + wrkr = if init_worker_code == :() + test_worker(test) + else + test_worker(test, init_worker_code) + end + if wrkr === nothing + wrkr = p + end + # if a worker failed, spawn a new one + if wrkr === nothing || !Malt.isrunning(wrkr) + wrkr = p = addworker(; init_worker_code, io_ctx.color) + end - # pass in init_worker_code to custom worker function if defined - wrkr = if init_worker_code == :() - test_worker(test) - else - test_worker(test, init_worker_code) - end - if wrkr === nothing - wrkr = p - end - # if a worker failed, spawn a new one - if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker(; init_worker_code, io_ctx.color) - end + # run the test + put!(printer_channel, (:started, test, worker_id(wrkr))) + result = try + Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) + Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, + testsuite[test], test, init_code, test_t0) + catch ex + if isa(ex, InterruptException) + # the worker got interrupted, signal other tasks to stop + stop_work() + return + end - # run the test - put!(printer_channel, (:started, test, worker_id(wrkr))) - result = try - Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) - Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, - testsuite[test], test, init_code, test_t0) - catch ex - if isa(ex, InterruptException) - # the worker got interrupted, signal other tasks to stop - stop_work() - break + ex end + test_t1 = time() + output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) + Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + + # act on the results + if result isa AbstractTestRecord + put!(printer_channel, (:finished, test, worker_id(wrkr), result)) + if anynonpass(result[]) && args.quickfail !== nothing + stop_work() + return + end - ex - end - test_t1 = time() - output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results, (test, result, output, test_t0, test_t1)) + if memory_usage(result) > max_worker_rss + # the worker has reached the max-rss limit, recycle it + # so future tests start with a smaller working set + Malt.stop(wrkr) + end + else + # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException + @assert result isa Exception + put!(printer_channel, (:crashed, test, worker_id(wrkr))) + if args.quickfail !== nothing + stop_work() + return + end - # act on the results - if result isa AbstractTestRecord - put!(printer_channel, (:finished, test, worker_id(wrkr), result)) - if anynonpass(result[]) && args.quickfail !== nothing - stop_work() - break + # the worker encountered some serious failure, recycle it + Malt.stop(wrkr) end - if memory_usage(result) > max_worker_rss - # the worker has reached the max-rss limit, recycle it - # so future tests start with a smaller working set + # get rid of the custom worker + if wrkr != p Malt.stop(wrkr) end - else - # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException - @assert result isa Exception - put!(printer_channel, (:crashed, test, worker_id(wrkr))) - if args.quickfail !== nothing - stop_work() - break - end - - # the worker encountered some serious failure, recycle it - Malt.stop(wrkr) - end - # get rid of the custom worker - if wrkr != p - Malt.stop(wrkr) - end - - Base.@lock test_lock begin - delete!(running_tests, test) + Base.@lock test_lock begin + delete!(running_tests, test) + end + catch ex + isa(ex, InterruptException) || rethrow() + finally + if acquired + # stop the worker if no more tests will need one from the pool + if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) + Malt.stop(p) + p = nothing + end + put!(worker_pool, p) + Base.release(sem) + end end - end - if p !== nothing - Malt.stop(p) - end - end) - end - - - # - # finalization - # - - # monitor worker tasks for failure so that each one doesn't need a try/catch + stop_work() - try - while true - if any(istaskfailed, worker_tasks) - println(io_ctx.stderr, "\nCaught an error, stopping...") - break - elseif done || Base.@lock(test_lock, isempty(tests) && isempty(running_tests)) - break - end - sleep(1) + end) end catch err - # in case the sleep got interrupted - isa(err, InterruptException) || rethrow() + if !(err isa InterruptException) + println(io_ctx.stderr, "\nCaught an error, stopping...") + end finally stop_work() end + # + # finalization + # + # wait for the printer to finish so that all results have been printed close(printer_channel) wait(printer_task) @@ -1149,6 +1157,14 @@ function runtests(mod::Module, args::ParsedArgs; end end + # clean up remaining workers in the pool + close(worker_pool) + for p in worker_pool + if p !== nothing && Malt.isrunning(p) + Malt.stop(p) + end + end + # print the output generated by each testset for (testname, result, output, _start, _stop) in results if !isempty(output) @@ -1230,7 +1246,7 @@ function runtests(mod::Module, args::ParsedArgs; end # mark remaining or running tests as interrupted - for test in [tests; collect(keys(running_tests))] + for test in tests (test in completed_tests) && continue testset = create_testset(test) Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1)))