Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,17 @@ long track(final PoolEntry<T, C> entry) {
PoolEntry<T, C> lease() {
lock.lock();
try {
final PoolEntry<T, C> entry = entryMap.entrySet().stream()
return entryMap.entrySet().stream()
.filter(e -> {
final C conn = e.getKey().getConnection();
return conn != null && conn.isOpen();
})
.min(Comparator.comparingLong(e -> e.getValue().get()))
.map(Map.Entry::getKey)
.map(e -> {
Copy link
Contributor

@yhzdys yhzdys Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturobernalg Perhaps this is better?

return entryMap.entrySet().stream()
        .filter(e -> {
            final C conn = e.getKey().getConnection();
            return conn != null && conn.isOpen();
        })
        .min(Comparator.comparingLong(e -> e.getValue().get()))
        .map(e -> {
            e.getValue().incrementAndGet();
            return e.getKey();
        })
        .orElse(null);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yhzdys idd. I just made the change. please check

e.getValue().incrementAndGet();
return e.getKey();
})
.orElse(null);
if (entry == null) {
return null;
}
final AtomicLong counter = getCounter(entry);
counter.incrementAndGet();
return entry;
} finally {
lock.unlock();
}
Expand All @@ -288,20 +289,18 @@ PoolEntry<T, C> lease() {
long release(final PoolEntry<T, C> entry, final boolean reusable) {
lock.lock();
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturobernalg Could you please add this check here?

if (!reusable) {
     entry.discardConnection(CloseMode.GRACEFUL);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ok2c done

final C connection = entry.getConnection();
if (!reusable || connection == null || !connection.isOpen()) {
entryMap.remove(entry);
return 0;
} else {
final AtomicLong counter = entryMap.compute(entry, (e, c) -> {
if (c == null) {
return null;
}
final long count = c.decrementAndGet();
return count > 0 ? c : null;
});
return counter != null ? counter.get() : 0L;
if (!reusable) {
entry.discardConnection(CloseMode.GRACEFUL);
}

final AtomicLong counter = entryMap.compute(entry, (e, c) -> {
if (c == null) {
return null;
}
final long count = c.decrementAndGet();
return count > 0 ? c : null;
});
return counter != null ? counter.get() : 0L;
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,16 @@ void testLeaseFutureReturned() throws Exception {
@Test
void testLeaseExistingConnectionReturned() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
final HttpConnection conn = Mockito.mock(HttpConnection.class);
Mockito.when(conn.isOpen()).thenReturn(true);
poolEntry.assignConnection(conn);

final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool =
h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
final Future<PoolEntry<String, HttpConnection>> future =
h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);

final Future<PoolEntry<String, HttpConnection>> future = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
Assertions.assertNotNull(future);
Assertions.assertSame(poolEntry, future.get());

Expand All @@ -98,8 +104,7 @@ void testLeaseExistingConnectionReturned() throws Exception {
Mockito.any(),
Mockito.any(),
Mockito.any());
Mockito.verify(callback).completed(
Mockito.same(poolEntry));
Mockito.verify(callback).completed(Mockito.same(poolEntry));
}

@Test
Expand Down Expand Up @@ -314,38 +319,6 @@ void testReleaseReusableInCacheNotReturnedToPool() throws Exception {
Mockito.anyBoolean());
}

@Test
void testReleaseNonReusableInCacheReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(true);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
routePool.track(poolEntry);

h2SharingPool.release(poolEntry, false);

Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(false));
}

@Test
void testReleaseReusableAndClosedInCacheReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(false);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
routePool.track(poolEntry);

h2SharingPool.release(poolEntry, true);

Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(true));
}

/**
* Same connection can only be released once.
* Attempting to release it again will throw: IllegalStateException("Pool entry is not present in the set of leased entries")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void testReleaseNonReusable() {
pool.track(poolEntry1);
pool.track(poolEntry1);

Assertions.assertEquals(0, pool.release(poolEntry1, false));
Assertions.assertEquals(2, pool.release(poolEntry1, false)); // 3 → 2
}

@Test
Expand All @@ -114,7 +114,7 @@ void testReleaseConnectionClosed() {
pool.track(poolEntry1);

Mockito.when(poolEntry1.getConnection().isOpen()).thenReturn(false);
Assertions.assertEquals(0, pool.release(poolEntry1, true));
Assertions.assertEquals(2, pool.release(poolEntry1, true)); // 3 → 2
}

@Test
Expand All @@ -124,7 +124,7 @@ void testReleaseConnectionMissing() {
pool.track(poolEntry1);

poolEntry1.discardConnection(CloseMode.IMMEDIATE);
Assertions.assertEquals(0, pool.release(poolEntry1, true));
Assertions.assertEquals(2, pool.release(poolEntry1, true)); // 3 → 2
}

}