Skip to content

Commit 5ce8a9b

Browse files
committed
Off-lock disposal in PoolingHttpClientConnectionManager: enqueue GRACEFUL, run IMMEDIATE inline
Drain queue after lease/release/closeExpired/closeIdle to avoid holding the pool lock
1 parent 888c65b commit 5ce8a9b

4 files changed

Lines changed: 196 additions & 449 deletions

File tree

httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@
2626
*/
2727
package org.apache.hc.client5.http.impl.io;
2828

29-
import java.util.ArrayDeque;
30-
import java.util.Deque;
31-
import java.util.concurrent.atomic.AtomicBoolean;
32-
import java.util.concurrent.locks.ReentrantLock;
29+
import java.util.Queue;
30+
import java.util.concurrent.ConcurrentLinkedQueue;
3331

3432
import org.apache.hc.core5.annotation.Internal;
3533
import org.apache.hc.core5.io.CloseMode;
@@ -40,65 +38,24 @@
4038
final class OffLockDisposalCallback<T extends ModalCloseable> implements DisposalCallback<T> {
4139

4240
private final DisposalCallback<T> delegate;
43-
private final Deque<Item<T>> queue = new ArrayDeque<>();
44-
private final ReentrantLock qlock = new ReentrantLock();
45-
private final AtomicBoolean draining = new AtomicBoolean();
41+
private final Queue<T> gracefulQueue = new ConcurrentLinkedQueue<>();
4642

4743
OffLockDisposalCallback(final DisposalCallback<T> delegate) {
4844
this.delegate = delegate;
4945
}
5046

5147
@Override
5248
public void execute(final T closeable, final CloseMode mode) {
53-
qlock.lock();
54-
try {
55-
queue.addLast(new Item<>(closeable, mode));
56-
} finally {
57-
qlock.unlock();
49+
if (mode == CloseMode.IMMEDIATE) {
50+
delegate.execute(closeable, CloseMode.IMMEDIATE);
51+
} else {
52+
gracefulQueue.offer(closeable);
5853
}
5954
}
6055

6156
void drain() {
62-
for (;;) {
63-
if (!draining.compareAndSet(false, true)) {
64-
return;
65-
}
66-
try {
67-
for (;;) {
68-
final Item<T> it;
69-
qlock.lock();
70-
try {
71-
it = queue.pollFirst();
72-
} finally {
73-
qlock.unlock();
74-
}
75-
if (it == null) {
76-
break;
77-
}
78-
delegate.execute(it.closeable, it.mode);
79-
}
80-
} finally {
81-
draining.set(false);
82-
}
83-
qlock.lock();
84-
try {
85-
if (queue.isEmpty()) {
86-
return;
87-
}
88-
} finally {
89-
qlock.unlock();
90-
}
57+
for (T c; (c = gracefulQueue.poll()) != null; ) {
58+
delegate.execute(c, CloseMode.GRACEFUL);
9159
}
9260
}
93-
94-
static final class Item<T> {
95-
final T closeable;
96-
final CloseMode mode;
97-
98-
Item(final T c, final CloseMode m) {
99-
this.closeable = c;
100-
this.mode = m;
101-
}
102-
}
103-
}
104-
61+
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,6 @@ public ConnectionEndpoint get(
415415
}
416416
} finally {
417417
lock.unlock();
418-
drainDisposals();
419418
}
420419
}
421420

httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/SleeperConnection.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,13 @@ public void bind(final Socket socket) throws IOException {
6565
@Override
6666
public void close(final CloseMode closeMode) {
6767
try {
68-
Thread.sleep(sleepMillis);
68+
if (closeMode == CloseMode.GRACEFUL) {
69+
Thread.sleep(sleepMillis); // slow only for graceful
70+
}
6971
} catch (final InterruptedException ignore) {
7072
Thread.currentThread().interrupt();
7173
} finally {
7274
open = false;
73-
if (onClose != null) {
74-
onClose.run(); // capture thread in test
75-
}
7675
}
7776
}
7877

0 commit comments

Comments
 (0)