From 9098dc3ebaab31e167effd2e40018faab4ee0610 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 26 May 2026 21:19:13 +0900 Subject: [PATCH] Better handling of graceful timeout by Hybrid container. Fixes #56. --- lib/async/container/generic.rb | 5 ++++ lib/async/container/hybrid.rb | 15 ++++++----- test/async/container/hybrid.rb | 48 ++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index 07cdb02..350022f 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -115,6 +115,11 @@ def wait @group.wait end + # Gracefully interrupt all child instances. + def interrupt + @group.interrupt + end + # Returns true if all children instances have the specified status flag set. # e.g. `:ready`. # This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details. diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 736429b..8c6f4f5 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -31,14 +31,15 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio container.wait_until_ready instance.ready! - container.wait - rescue Async::Container::Terminate - # Stop it immediately: - container.stop(false) - raise + begin + container.wait + rescue Interrupt + # Gracefully interrupt child threads; parent process handles escalation. + container.interrupt + retry + end ensure - # Stop it gracefully (also code path for Interrupt): - container.stop + container.stop(false) end end diff --git a/test/async/container/hybrid.rb b/test/async/container/hybrid.rb index ce9f615..4b40260 100644 --- a/test/async/container/hybrid.rb +++ b/test/async/container/hybrid.rb @@ -13,4 +13,52 @@ it "should be multiprocess" do expect(subject).to be(:multiprocess?) end + + it "forcefully stops the inner threaded container on exit" do + stop_arguments = [] + interrupt_count = 0 + + threaded_class = Class.new + threaded_class.define_method(:run) do |**options, &block| + self + end + threaded_class.define_method(:wait_until_ready) do + end + threaded_class.define_method(:wait) do + @wait_count ||= 0 + @wait_count += 1 + + raise Interrupt if @wait_count == 1 + end + threaded_class.define_method(:interrupt) do + interrupt_count += 1 + end + threaded_class.define_method(:stop) do |graceful = true| + stop_arguments << graceful + end + + container_class = Class.new(subject) do + def spawn(**options, &block) + instance = Object.new + def instance.ready! + end + + block.call(instance) + end + end + + original_threaded = Async::Container.send(:remove_const, :Threaded) + Async::Container.const_set(:Threaded, threaded_class) + + container = container_class.new + container.run(count: 1, forks: 1, threads: 1) do |instance| + # No-op. + end + + expect(interrupt_count).to be == 1 + expect(stop_arguments).to be == [false] + ensure + Async::Container.send(:remove_const, :Threaded) + Async::Container.const_set(:Threaded, original_threaded) + end end if Async::Container.fork?