Skip to content
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
[compat]
Dates = "1"
IOCapture = "0.2.5, 1"
Malt = "1.4.0"
Malt = "1.4.1"
Printf = "1"
Random = "1"
Scratch = "1.3.0"
Expand Down
212 changes: 114 additions & 98 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,11 @@ function runtests(mod::Module, args::ParsedArgs;
jobs = clamp(jobs, 1, length(tests))
println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
!isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))")
workers = fill(nothing, jobs)
sem = Base.Semaphore(max(1, jobs))
worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs)
for _ in 1:jobs
put!(worker_pool, nothing)
end

t0 = time()
results = []
Expand Down Expand Up @@ -890,7 +894,7 @@ function runtests(mod::Module, args::ParsedArgs;
# only draw if we have something to show
isempty(running_tests) && return
completed = Base.@lock results_lock length(results)
total = completed + length(tests) + length(running_tests)
total = length(tests)

# line 1: empty line
line1 = ""
Expand Down Expand Up @@ -925,6 +929,9 @@ function runtests(mod::Module, args::ParsedArgs;
end
## yet-to-run
for test in tests
haskey(running_tests, test) && continue
                    # skip tests that have already completed (i.e. have an entry in `results`)
any(r -> test == r.test, results) && continue
est_remaining += get(historical_durations, test, est_per_test)
end

Expand Down Expand Up @@ -1007,130 +1014,131 @@ function runtests(mod::Module, args::ParsedArgs;
end
isa(ex, InterruptException) || rethrow()
finally
if isempty(tests) && isempty(running_tests)
if isempty(running_tests) && length(results) >= length(tests)
# XXX: only erase the status if we completed successfully.
# in other cases we'll have printed "caught interrupt"
clear_status()
end
end
end


#
# execution
#

for p in workers
push!(worker_tasks, @async begin
while !done
# get a test to run
test, test_t0 = Base.@lock test_lock begin
isempty(tests) && break
test = popfirst!(tests)

test_t0 = time()
running_tests[test] = test_t0
tests_to_start = Threads.Atomic{Int}(length(tests))
try
@sync for test in tests
push!(worker_tasks, Threads.@spawn begin
local p = nothing
acquired = false
try
Base.acquire(sem)
acquired = true
p = take!(worker_pool)
Threads.atomic_sub!(tests_to_start, 1)

done && return

test_t0 = Base.@lock test_lock begin
test_t0 = time()
running_tests[test] = test_t0
end

test, test_t0
end
# pass in init_worker_code to custom worker function if defined
wrkr = if init_worker_code == :()
test_worker(test)
else
test_worker(test, init_worker_code)
end
if wrkr === nothing
wrkr = p
end
# if a worker failed, spawn a new one
if wrkr === nothing || !Malt.isrunning(wrkr)
wrkr = p = addworker(; init_worker_code, io_ctx.color)
end

# pass in init_worker_code to custom worker function if defined
wrkr = if init_worker_code == :()
test_worker(test)
else
test_worker(test, init_worker_code)
end
if wrkr === nothing
wrkr = p
end
# if a worker failed, spawn a new one
if wrkr === nothing || !Malt.isrunning(wrkr)
wrkr = p = addworker(; init_worker_code, io_ctx.color)
end
# run the test
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest,
testsuite[test], test, init_code, test_t0)
catch ex
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
stop_work()
return
end

# run the test
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest,
testsuite[test], test, init_code, test_t0)
catch ex
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
stop_work()
break
ex
end
test_t1 = time()
output = Base.@lock wrkr.io_lock String(take!(wrkr.io))
Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1))

# act on the results
if result isa AbstractTestRecord
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
if anynonpass(result[]) && args.quickfail !== nothing
stop_work()
return
end

ex
end
test_t1 = time()
output = Base.@lock wrkr.io_lock String(take!(wrkr.io))
Base.@lock results_lock push!(results, (test, result, output, test_t0, test_t1))
if memory_usage(result) > max_worker_rss
# the worker has reached the max-rss limit, recycle it
# so future tests start with a smaller working set
Malt.stop(wrkr)
end
else
# One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException
@assert result isa Exception
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
if args.quickfail !== nothing
stop_work()
return
end

# act on the results
if result isa AbstractTestRecord
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
if anynonpass(result[]) && args.quickfail !== nothing
stop_work()
break
# the worker encountered some serious failure, recycle it
Malt.stop(wrkr)
end

if memory_usage(result) > max_worker_rss
# the worker has reached the max-rss limit, recycle it
# so future tests start with a smaller working set
# get rid of the custom worker
if wrkr != p
Malt.stop(wrkr)
end
else
# One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException
@assert result isa Exception
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
if args.quickfail !== nothing
stop_work()
break
end

# the worker encountered some serious failure, recycle it
Malt.stop(wrkr)
end

# get rid of the custom worker
if wrkr != p
Malt.stop(wrkr)
end

Base.@lock test_lock begin
delete!(running_tests, test)
Base.@lock test_lock begin
delete!(running_tests, test)
end
catch ex
isa(ex, InterruptException) || rethrow()
finally
if acquired
# stop the worker if no more tests will need one from the pool
if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p)
Malt.stop(p)
p = nothing
end
put!(worker_pool, p)
Base.release(sem)
end
end
end
if p !== nothing
Malt.stop(p)
end
end)
end


#
# finalization
#

# monitor worker tasks for failure so that each one doesn't need a try/catch + stop_work()
try
while true
if any(istaskfailed, worker_tasks)
println(io_ctx.stderr, "\nCaught an error, stopping...")
break
elseif done || Base.@lock(test_lock, isempty(tests) && isempty(running_tests))
break
end
sleep(1)
end)
end
catch err
# in case the sleep got interrupted
isa(err, InterruptException) || rethrow()
if !(err isa InterruptException)
println(io_ctx.stderr, "\nCaught an error, stopping...")
end
finally
stop_work()
end

#
# finalization
#

# wait for the printer to finish so that all results have been printed
close(printer_channel)
wait(printer_task)
Expand All @@ -1149,6 +1157,14 @@ function runtests(mod::Module, args::ParsedArgs;
end
end

# clean up remaining workers in the pool
close(worker_pool)
for p in worker_pool
if p !== nothing && Malt.isrunning(p)
Malt.stop(p)
end
end

# print the output generated by each testset
for (testname, result, output, _start, _stop) in results
if !isempty(output)
Expand Down Expand Up @@ -1230,7 +1246,7 @@ function runtests(mod::Module, args::ParsedArgs;
end

# mark remaining or running tests as interrupted
for test in [tests; collect(keys(running_tests))]
for test in tests
(test in completed_tests) && continue
testset = create_testset(test)
Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1)))
Expand Down
Loading