Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/async/container/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ def wait
@group.wait
end

# Gracefully interrupt all child instances.
def interrupt
@group.interrupt
end

# Returns true if all children instances have the specified status flag set.
# e.g. `:ready`.
# This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details.
Expand Down
15 changes: 8 additions & 7 deletions lib/async/container/hybrid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio
container.wait_until_ready
instance.ready!

container.wait
rescue Async::Container::Terminate
# Stop it immediately:
container.stop(false)
raise
begin
container.wait
rescue Interrupt
# Gracefully interrupt child threads; parent process handles escalation.
container.interrupt
retry
end
ensure
# Stop it gracefully (also code path for Interrupt):
container.stop
container.stop(false)
end
end

Expand Down
48 changes: 48 additions & 0 deletions test/async/container/hybrid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,52 @@
it "should be multiprocess" do
expect(subject).to be(:multiprocess?)
end

it "forcefully stops the inner threaded container on exit" do
stop_arguments = []
interrupt_count = 0

threaded_class = Class.new
threaded_class.define_method(:run) do |**options, &block|
self
end
threaded_class.define_method(:wait_until_ready) do
end
threaded_class.define_method(:wait) do
@wait_count ||= 0
@wait_count += 1

raise Interrupt if @wait_count == 1
end
threaded_class.define_method(:interrupt) do
interrupt_count += 1
end
threaded_class.define_method(:stop) do |graceful = true|
stop_arguments << graceful
end

container_class = Class.new(subject) do
def spawn(**options, &block)
instance = Object.new
def instance.ready!
end

block.call(instance)
end
end

original_threaded = Async::Container.send(:remove_const, :Threaded)
Async::Container.const_set(:Threaded, threaded_class)

container = container_class.new
container.run(count: 1, forks: 1, threads: 1) do |instance|
# No-op.
end

expect(interrupt_count).to be == 1
expect(stop_arguments).to be == [false]
ensure
Async::Container.send(:remove_const, :Threaded)
Async::Container.const_set(:Threaded, original_threaded)
end
end if Async::Container.fork?
Loading