Skip to content

Commit 82a437f

Browse files
committed
feat: Add IterableAsyncQueue.
1 parent 902b1a6 commit 82a437f

File tree

3 files changed

+405
-2
lines changed

3 files changed

+405
-2
lines changed

lib/shared/common/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ base {
3333
java {
3434
withJavadocJar()
3535
withSourcesJar()
36-
sourceCompatibility = JavaVersion.VERSION_1_7
37-
targetCompatibility = JavaVersion.VERSION_1_7
36+
sourceCompatibility = JavaVersion.VERSION_1_8
37+
targetCompatibility = JavaVersion.VERSION_1_8
3838
}
3939

4040
// See Dependencies.kt in buildSrc for the purpose of "privateImplementation"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.launchdarkly.sdk.collections;
2+
3+
import java.util.LinkedList;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
/**
7+
* A thread-safe unbounded queue that provides asynchronous consumption via {@link CompletableFuture}.
8+
* <p>
9+
* This queue supports multiple concurrent producers and consumers. Items are delivered in FIFO order.
10+
* The {@link #take()} method returns a {@link CompletableFuture} that either completes immediately
11+
* if an item is available, or completes later when an item is added via {@link #put(Object)}.
12+
* <p>
13+
* When multiple consumers are waiting (i.e., multiple pending {@link #take()} calls), they are
14+
* satisfied in FIFO order as items become available.
15+
* <p>
16+
* Null values are supported.
17+
*
18+
* @param <T> the type of elements held in this queue
19+
*/
20+
class IterableAsyncQueue<T> {
21+
private final Object lock = new Object();
22+
private final LinkedList<T> queue = new LinkedList<>();
23+
24+
private final LinkedList<CompletableFuture<T>> pendingFutures = new LinkedList<>();
25+
26+
/**
27+
* Adds an item to the queue.
28+
* <p>
29+
* If there is a consumer is waiting (a pending {@link #take()} call), the item is delivered
30+
* directly to the oldest waiting consumer's future. Otherwise, the item is added to the
31+
* queue for later consumption.
32+
*
33+
* @param item the item to add (maybe null)
34+
*/
35+
public void put(T item) {
36+
synchronized (lock) {
37+
CompletableFuture<T> nextFuture = pendingFutures.pollFirst();
38+
if(nextFuture != null) {
39+
nextFuture.complete(item);
40+
return;
41+
}
42+
queue.addLast(item);
43+
}
44+
}
45+
/**
46+
* Retrieves and removes an item from the queue, returning a future that completes with the item.
47+
* <p>
48+
* If the queue contains items, returns an already-completed future with the oldest item.
49+
* If the queue is empty, returns a future that will complete when an item becomes available
50+
* via {@link #put(Object)}.
51+
*
52+
* @return a {@link CompletableFuture} that completes with the next item
53+
*/
54+
public CompletableFuture<T> take() {
55+
synchronized (lock) {
56+
if(!queue.isEmpty()) {
57+
return CompletableFuture.completedFuture(queue.removeFirst());
58+
}
59+
CompletableFuture<T> takeFuture = new CompletableFuture<>();
60+
pendingFutures.addLast(takeFuture);
61+
return takeFuture;
62+
}
63+
}
64+
}

0 commit comments

Comments
 (0)