Skip to content

Add a lock to protect pushes to results#122

Merged
giordano merged 2 commits intomainfrom
mg/locks
Mar 26, 2026
Merged

Add a lock to protect pushes to results#122
giordano merged 2 commits intomainfrom
mg/locks

Conversation

@giordano
Copy link
Copy Markdown
Collaborator

Taken out of #119, ref: #119 (comment).

@maleadt
Copy link
Copy Markdown
Collaborator

maleadt commented Mar 26, 2026

We also need this on the read operations, no? At least when multithreading; I assume with regular concurrent @async this would work fine without (or at least I assumed so when writing this code).

@giordano
Copy link
Copy Markdown
Collaborator Author

We also need this on the read operations, no?

There weren't other uses of results in the worker tasks @async block

push!(worker_tasks, @async begin
while !done
# get a test to run
test, test_t0 = Base.@lock test_lock begin
isempty(tests) && break
test = popfirst!(tests)
test_t0 = time()
running_tests[test] = test_t0
test, test_t0
end
# pass in init_worker_code to custom worker function if defined
wrkr = if init_worker_code == :()
test_worker(test)
else
test_worker(test, init_worker_code)
end
if wrkr === nothing
wrkr = p
end
# if a worker failed, spawn a new one
if wrkr === nothing || !Malt.isrunning(wrkr)
wrkr = p = addworker(; init_worker_code, io_ctx.color)
end
# run the test
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest,
testsuite[test], test, init_code, test_t0)
catch ex
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
stop_work()
break
end
ex
end
test_t1 = time()
output = Base.@lock wrkr.io_lock String(take!(wrkr.io))
Base.@lock results_lock push!(results, (test, result, output, test_t0, test_t1))
# act on the results
if result isa AbstractTestRecord
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
if anynonpass(result[]) && args.quickfail !== nothing
stop_work()
break
end
if memory_usage(result) > max_worker_rss
# the worker has reached the max-rss limit, recycle it
# so future tests start with a smaller working set
Malt.stop(wrkr)
end
else
# One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException
@assert result isa Exception
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
if args.quickfail !== nothing
stop_work()
break
end
# the worker encountered some serious failure, recycle it
Malt.stop(wrkr)
end
# get rid of the custom worker
if wrkr != p
Malt.stop(wrkr)
end
Base.@lock test_lock begin
delete!(running_tests, test)
end
end
if p !== nothing
Malt.stop(p)
end
end)
but there were a couple of reads of results inside update_status(), which I just locked. Other uses I can see are all after the tasks are done. Am I missing something else?

@giordano giordano merged commit 129d9d5 into main Mar 26, 2026
23 checks passed
@giordano giordano deleted the mg/locks branch March 26, 2026 21:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants