From 65e92d566fcfe00ab8d50d1e768c3f7a0f097b47 Mon Sep 17 00:00:00 2001
From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com>
Date: Thu, 29 Jan 2026 13:59:10 -0800
Subject: [PATCH 1/2] feat: Add IterableAsyncQueue.
---
lib/shared/common/build.gradle.kts | 4 +-
.../sdk/collections/IterableAsyncQueue.java | 68 ++++
.../sdk/collections/package-info.java | 4 +
.../collections/IterableAsyncQueueTest.java | 339 ++++++++++++++++++
.../sdk/collections/package-info.java | 4 +
5 files changed, 417 insertions(+), 2 deletions(-)
create mode 100644 lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java
create mode 100644 lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java
create mode 100644 lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java
create mode 100644 lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java
diff --git a/lib/shared/common/build.gradle.kts b/lib/shared/common/build.gradle.kts
index 82a4b5b7..47d3464e 100644
--- a/lib/shared/common/build.gradle.kts
+++ b/lib/shared/common/build.gradle.kts
@@ -33,8 +33,8 @@ base {
java {
withJavadocJar()
withSourcesJar()
- sourceCompatibility = JavaVersion.VERSION_1_7
- targetCompatibility = JavaVersion.VERSION_1_7
+ sourceCompatibility = JavaVersion.VERSION_1_8
+ targetCompatibility = JavaVersion.VERSION_1_8
}
// See Dependencies.kt in buildSrc for the purpose of "privateImplementation"
diff --git a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java
new file mode 100644
index 00000000..6c55858e
--- /dev/null
+++ b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java
@@ -0,0 +1,68 @@
+package com.launchdarkly.sdk.collections;
+
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A thread-safe unbounded queue that provides asynchronous consumption via {@link CompletableFuture}.
+ *
+ * This queue supports multiple concurrent producers and consumers. Items are delivered in FIFO order.
+ * The {@link #take()} method returns a {@link CompletableFuture} that either completes immediately
+ * if an item is available, or completes later when an item is added via {@link #put(Object)}.
+ *
+ * When multiple consumers are waiting (i.e., multiple pending {@link #take()} calls), they are
+ * satisfied in FIFO order as items become available.
+ *
+ * Null values are supported.
+ *
+ * @param the type of elements held in this queue
+ */
+class IterableAsyncQueue {
+ private final Object lock = new Object();
+ private final LinkedList queue = new LinkedList<>();
+
+ private final LinkedList> pendingFutures = new LinkedList<>();
+
+ /**
+ * Adds an item to the queue.
+ *
+ * If there is a consumer is waiting (a pending {@link #take()} call), the item is delivered
+ * directly to the oldest waiting consumer's future. Otherwise, the item is added to the
+ * queue for later consumption.
+ *
+ * If a future returned by this method is completed or canceled by the caller, then the item associated
+ * with that call will not be delivered. It is recommended not to complete or cancel the future
+ * returned by {@link #take()} unless you are finished using the queue.
+ *
+ * @param item the item to add (maybe null)
+ */
+ public void put(T item) {
+ synchronized (lock) {
+ CompletableFuture nextFuture = pendingFutures.pollFirst();
+ if(nextFuture != null) {
+ nextFuture.complete(item);
+ return;
+ }
+ queue.addLast(item);
+ }
+ }
+ /**
+ * Retrieves and removes an item from the queue, returning a future that completes with the item.
+ *
+ * If the queue contains items, returns an already-completed future with the oldest item.
+ * If the queue is empty, returns a future that will complete when an item becomes available
+ * via {@link #put(Object)}.
+ *
+ * @return a {@link CompletableFuture} that completes with the next item
+ */
+ public CompletableFuture take() {
+ synchronized (lock) {
+ if(!queue.isEmpty()) {
+ return CompletableFuture.completedFuture(queue.removeFirst());
+ }
+ CompletableFuture takeFuture = new CompletableFuture<>();
+ pendingFutures.addLast(takeFuture);
+ return takeFuture;
+ }
+ }
+}
diff --git a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java
new file mode 100644
index 00000000..f556d829
--- /dev/null
+++ b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Collections for use in LaunchDarkly SDKs and components.
+ */
+package com.launchdarkly.sdk.collections;
diff --git a/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java
new file mode 100644
index 00000000..59642fb1
--- /dev/null
+++ b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java
@@ -0,0 +1,339 @@
+package com.launchdarkly.sdk.collections;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("javadoc")
+public class IterableAsyncQueueTest {
+
+ @Test
+ public void putThenTakeReturnsImmediately() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ queue.put("item1");
+
+ CompletableFuture future = queue.take();
+ assertTrue("Future should be completed immediately", future.isDone());
+ assertEquals("item1", future.get());
+ }
+
+ @Test
+ public void takeThenPutCompletesWaitingFuture() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ CompletableFuture future = queue.take();
+ assertFalse("Future should not be completed yet", future.isDone());
+
+ queue.put("item1");
+
+ assertTrue("Future should be completed after put", future.isDone());
+ assertEquals("item1", future.get());
+ }
+
+ @Test
+ public void multiplePutsThenMultipleTakesPreservesOrder() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ // Put multiple items
+ queue.put(1);
+ queue.put(2);
+ queue.put(3);
+
+ // Take them in order
+ assertEquals(Integer.valueOf(1), queue.take().get());
+ assertEquals(Integer.valueOf(2), queue.take().get());
+ assertEquals(Integer.valueOf(3), queue.take().get());
+ }
+
+ @Test
+ public void multipleTakesThenMultiplePutsCompletesInOrder() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ // Multiple takes when queue is empty
+ CompletableFuture future1 = queue.take();
+ CompletableFuture future2 = queue.take();
+ CompletableFuture future3 = queue.take();
+
+ assertFalse(future1.isDone());
+ assertFalse(future2.isDone());
+ assertFalse(future3.isDone());
+
+ // Put items - should complete futures in FIFO order
+ queue.put(1);
+ assertTrue("First future should be completed", future1.isDone());
+ assertFalse("Second future should not be completed yet", future2.isDone());
+ assertFalse("Third future should not be completed yet", future3.isDone());
+ assertEquals(Integer.valueOf(1), future1.get());
+
+ queue.put(2);
+ assertTrue("Second future should be completed", future2.isDone());
+ assertFalse("Third future should not be completed yet", future3.isDone());
+ assertEquals(Integer.valueOf(2), future2.get());
+
+ queue.put(3);
+ assertTrue("Third future should be completed", future3.isDone());
+ assertEquals(Integer.valueOf(3), future3.get());
+ }
+
+ @Test
+ public void interleavedPutAndTakeOperations() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ // Put one
+ queue.put("a");
+ assertEquals("a", queue.take().get());
+
+ // Take when empty, then put
+ CompletableFuture future = queue.take();
+ assertFalse(future.isDone());
+ queue.put("b");
+ assertEquals("b", future.get());
+
+ // Put multiple, take one, put one more, take remaining
+ queue.put("c");
+ queue.put("d");
+ assertEquals("c", queue.take().get());
+ queue.put("e");
+ assertEquals("d", queue.take().get());
+ assertEquals("e", queue.take().get());
+ }
+
+ @Test
+ public void concurrentProducersAndConsumers() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+ int itemCount = 1000;
+ int producerThreads = 5;
+ int consumerThreads = 5;
+
+ ExecutorService executor = Executors.newFixedThreadPool(producerThreads + consumerThreads);
+ CountDownLatch producerLatch = new CountDownLatch(producerThreads);
+ CountDownLatch consumerLatch = new CountDownLatch(consumerThreads);
+
+ List consumedItems = new ArrayList<>();
+ Object consumedLock = new Object();
+
+ // Start producers
+ for (int t = 0; t < producerThreads; t++) {
+ final int threadId = t;
+ executor.submit(() -> {
+ try {
+ for (int i = 0; i < itemCount / producerThreads; i++) {
+ queue.put(threadId * 1000 + i);
+ Thread.sleep(1); // Small delay to encourage interleaving
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ producerLatch.countDown();
+ }
+ });
+ }
+
+ // Start consumers
+ for (int t = 0; t < consumerThreads; t++) {
+ executor.submit(() -> {
+ try {
+ for (int i = 0; i < itemCount / consumerThreads; i++) {
+ Integer item = queue.take().get(5, TimeUnit.SECONDS);
+ synchronized (consumedLock) {
+ consumedItems.add(item);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ consumerLatch.countDown();
+ }
+ });
+ }
+
+ // Wait for completion
+ assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS));
+ assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS));
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+ // Verify all items were consumed
+ assertEquals("All items should be consumed", itemCount, consumedItems.size());
+ }
+
+ @Test
+ public void singleProducerAndConsumer() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+ int itemCount = 10000;
+
+ AtomicInteger producedCount = new AtomicInteger(0);
+ AtomicInteger consumedCount = new AtomicInteger(0);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ // Producer
+ CompletableFuture producer = CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < itemCount; i++) {
+ queue.put(i);
+ producedCount.incrementAndGet();
+ }
+ }, executor);
+
+ // Consumer
+ CompletableFuture consumer = CompletableFuture.runAsync(() -> {
+ try {
+ for (int i = 0; i < itemCount; i++) {
+ Integer item = queue.take().get(5, TimeUnit.SECONDS);
+ assertEquals(Integer.valueOf(i), item);
+ consumedCount.incrementAndGet();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, executor);
+
+ // Wait for both to complete
+ CompletableFuture.allOf(producer, consumer).get(10, TimeUnit.SECONDS);
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+ assertEquals("All items should be produced", itemCount, producedCount.get());
+ assertEquals("All items should be consumed", itemCount, consumedCount.get());
+ }
+
+ @Test
+ public void multipleProducersSingleConsumer() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+ int producersCount = 10;
+ int itemsPerProducer = 100;
+ int totalItems = producersCount * itemsPerProducer;
+
+ ExecutorService executor = Executors.newFixedThreadPool(producersCount + 1);
+ CountDownLatch producerLatch = new CountDownLatch(producersCount);
+
+ // Start multiple producers
+ for (int p = 0; p < producersCount; p++) {
+ final int producerId = p;
+ executor.submit(() -> {
+ try {
+ for (int i = 0; i < itemsPerProducer; i++) {
+ queue.put("producer-" + producerId + "-item-" + i);
+ }
+ } finally {
+ producerLatch.countDown();
+ }
+ });
+ }
+
+ // Single consumer
+ List consumed = new ArrayList<>();
+ CompletableFuture consumer = CompletableFuture.runAsync(() -> {
+ try {
+ for (int i = 0; i < totalItems; i++) {
+ String item = queue.take().get(5, TimeUnit.SECONDS);
+ consumed.add(item);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, executor);
+
+ assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS));
+ consumer.get(10, TimeUnit.SECONDS);
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+ assertEquals("Consumer should receive all items", totalItems, consumed.size());
+ }
+
+ @Test
+ public void singleProducerMultipleConsumers() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+ int consumersCount = 10;
+ int totalItems = 1000;
+ int itemsPerConsumer = totalItems / consumersCount;
+
+ ExecutorService executor = Executors.newFixedThreadPool(consumersCount + 1);
+ CountDownLatch consumerLatch = new CountDownLatch(consumersCount);
+
+ List allConsumed = new ArrayList<>();
+ Object consumedLock = new Object();
+
+ // Start multiple consumers
+ for (int c = 0; c < consumersCount; c++) {
+ executor.submit(() -> {
+ try {
+ for (int i = 0; i < itemsPerConsumer; i++) {
+ Integer item = queue.take().get(5, TimeUnit.SECONDS);
+ synchronized (consumedLock) {
+ allConsumed.add(item);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ consumerLatch.countDown();
+ }
+ });
+ }
+
+ // Single producer
+ CompletableFuture producer = CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < totalItems; i++) {
+ queue.put(i);
+ }
+ }, executor);
+
+ producer.get(5, TimeUnit.SECONDS);
+ assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS));
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+ assertEquals("All items should be consumed", totalItems, allConsumed.size());
+ }
+
+ @Test
+ public void nullValuesAreSupported() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ queue.put(null);
+ queue.put("not-null");
+ queue.put(null);
+
+ assertNull(queue.take().get());
+ assertEquals("not-null", queue.take().get());
+ assertNull(queue.take().get());
+ }
+
+ @Test
+ public void takeCompletesAsynchronously() throws Exception {
+ IterableAsyncQueue queue = new IterableAsyncQueue<>();
+
+ CompletableFuture future = queue.take();
+ AtomicInteger callbackInvoked = new AtomicInteger(0);
+
+ // Attach callback
+ future.thenAccept(item -> {
+ assertEquals("async-item", item);
+ callbackInvoked.incrementAndGet();
+ });
+
+ assertFalse("Future should not be completed yet", future.isDone());
+ assertEquals(0, callbackInvoked.get());
+
+ // Put item should trigger callback
+ queue.put("async-item");
+
+ // Give callback time to execute
+ Thread.sleep(50);
+
+ assertTrue("Future should be completed", future.isDone());
+ assertEquals("Callback should have been invoked", 1, callbackInvoked.get());
+ }
+}
diff --git a/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java
new file mode 100644
index 00000000..5fef164e
--- /dev/null
+++ b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Tests for collections.
+ */
+package com.launchdarkly.sdk.collections;
From c8eb15c9fa08fb8d451199eb11a6aeede71bdd42 Mon Sep 17 00:00:00 2001
From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com>
Date: Thu, 29 Jan 2026 14:59:24 -0800
Subject: [PATCH 2/2] Complete promise outside lock.
---
.../launchdarkly/sdk/collections/IterableAsyncQueue.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java
index 6c55858e..baaa4399 100644
--- a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java
+++ b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java
@@ -37,14 +37,18 @@ class IterableAsyncQueue {
* @param item the item to add (maybe null)
*/
public void put(T item) {
+ CompletableFuture pendingFuture = null;
synchronized (lock) {
CompletableFuture nextFuture = pendingFutures.pollFirst();
if(nextFuture != null) {
- nextFuture.complete(item);
+ pendingFuture = nextFuture;
+ } else {
+ queue.addLast(item);
return;
}
- queue.addLast(item);
}
+ // Execute callback outside the lock.
+ pendingFuture.complete(item);
}
/**
* Retrieves and removes an item from the queue, returning a future that completes with the item.