Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/async/pool/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require "async/semaphore"

require "thread"
require "set"

module Async
module Pool
Expand Down Expand Up @@ -50,6 +51,9 @@ def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil,
# Used to signal when a resource has been released:
@mutex = Thread::Mutex.new
@condition = Thread::ConditionVariable.new

# Resources currently being retired (close may yield):
@retiring = Set.new
end

# @attribute [Proc] The constructor used to create new resources.
Expand Down Expand Up @@ -234,9 +238,11 @@ def prune(retain = 0)
def retire(resource)
Console.debug(self){"Retire #{resource}"}

@resources.delete(resource)
return false unless @resources.delete(resource)

@retiring.add(resource)
resource.close
@retiring.delete(resource)

@mutex.synchronize{@condition.broadcast}

Expand Down Expand Up @@ -286,6 +292,9 @@ def reuse(resource)

usage = @resources[resource]

# Already retired (e.g. connection reset during retire close):
return false if usage.nil? && @retiring.include?(resource)

if usage.nil? || usage.zero?
raise "Trying to reuse unacquired resource: #{resource}!"
end
Expand Down
78 changes: 78 additions & 0 deletions test/async/pool/retire_race.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

# Verifies that retire + release on the same resource does not raise
# "Trying to reuse unacquired resource".
#
# In practice this happens with async-http when an HTTP/2 connection is reset:
# the background reader retires the connection, but retire's resource.close
# yields (e.g. sending GOAWAY), allowing another fiber to call release while
# the resource is deleted from @resources but still reports reusable? = true.

require "async/pool/controller"
require "async/pool/resource"
require "sus/fixtures/async/reactor_context"

# A resource whose close yields (simulating async IO like sending GOAWAY),
# but whose reusable? check is synchronous (no yield).
class SlowCloseResource < Async::Pool::Resource
def close
Async::Task.current.yield
super
end
end

describe Async::Pool::Controller do
include Sus::Fixtures::Async::ReactorContext

with "retire/release race on slow-close resource" do
let(:pool) {subject.new(SlowCloseResource)}

it "gracefully handles release after retire begins closing" do
resource = pool.acquire

# Start retire in a child task. It runs synchronously up to the
# yield inside SlowCloseResource#close, then pauses. At that point
# @resources[resource] has been deleted but resource.close hasn't
# finished, so reusable? still returns true.
retire_task = Async do
pool.retire(resource)
end

# No yield needed — the child already ran to its yield point.
# Verify the race window exists:
expect(resource).to be(:reusable?)
expect(pool.resources).not.to be(:key?, resource)

# The client's error handler now tries to release the same resource.
# This should not raise — retire already claimed ownership.
pool.release(resource)

retire_task.wait
end

it "gracefully handles multiplexed release after retire begins closing" do
constructor = lambda{SlowCloseResource.new(128)}
pool = subject.new(constructor)

# Two streams on the same HTTP/2 connection:
r1 = pool.acquire
r2 = pool.acquire
expect(r1).to be_equal(r2)

# Background reader retires (yields during close):
retire_task = Async do
pool.retire(r1)
end

# The race window is open: resource deleted from pool but not
# yet closed. First stream's error handler hits the race:
expect(r1).to be(:reusable?)
expect(pool.resources).not.to be(:key?, r1)

# Should not raise:
pool.release(r1)

retire_task.wait
end
end
end
Loading