From 65e92d566fcfe00ab8d50d1e768c3f7a0f097b47 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 29 Jan 2026 13:59:10 -0800 Subject: [PATCH 1/2] feat: Add IterableAsyncQueue. --- lib/shared/common/build.gradle.kts | 4 +- .../sdk/collections/IterableAsyncQueue.java | 68 ++++ .../sdk/collections/package-info.java | 4 + .../collections/IterableAsyncQueueTest.java | 339 ++++++++++++++++++ .../sdk/collections/package-info.java | 4 + 5 files changed, 417 insertions(+), 2 deletions(-) create mode 100644 lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java create mode 100644 lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java create mode 100644 lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java create mode 100644 lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java diff --git a/lib/shared/common/build.gradle.kts b/lib/shared/common/build.gradle.kts index 82a4b5b7..47d3464e 100644 --- a/lib/shared/common/build.gradle.kts +++ b/lib/shared/common/build.gradle.kts @@ -33,8 +33,8 @@ base { java { withJavadocJar() withSourcesJar() - sourceCompatibility = JavaVersion.VERSION_1_7 - targetCompatibility = JavaVersion.VERSION_1_7 + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 } // See Dependencies.kt in buildSrc for the purpose of "privateImplementation" diff --git a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java new file mode 100644 index 00000000..6c55858e --- /dev/null +++ b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java @@ -0,0 +1,68 @@ +package com.launchdarkly.sdk.collections; + +import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; + +/** + * A thread-safe unbounded queue that provides asynchronous consumption via {@link CompletableFuture}. + *

+ * This queue supports multiple concurrent producers and consumers. Items are delivered in FIFO order. + * The {@link #take()} method returns a {@link CompletableFuture} that either completes immediately + * if an item is available, or completes later when an item is added via {@link #put(Object)}. + *

+ * When multiple consumers are waiting (i.e., multiple pending {@link #take()} calls), they are + * satisfied in FIFO order as items become available. + *

+ * Null values are supported. + * + * @param the type of elements held in this queue + */ +class IterableAsyncQueue { + private final Object lock = new Object(); + private final LinkedList queue = new LinkedList<>(); + + private final LinkedList> pendingFutures = new LinkedList<>(); + + /** + * Adds an item to the queue. + *

+ * If there is a consumer is waiting (a pending {@link #take()} call), the item is delivered + * directly to the oldest waiting consumer's future. Otherwise, the item is added to the + * queue for later consumption. + *

+ * If a future returned by this method is completed or canceled by the caller, then the item associated + * with that call will not be delivered. It is recommended not to complete or cancel the future + * returned by {@link #take()} unless you are finished using the queue. + * + * @param item the item to add (maybe null) + */ + public void put(T item) { + synchronized (lock) { + CompletableFuture nextFuture = pendingFutures.pollFirst(); + if(nextFuture != null) { + nextFuture.complete(item); + return; + } + queue.addLast(item); + } + } + /** + * Retrieves and removes an item from the queue, returning a future that completes with the item. + *

+ * If the queue contains items, returns an already-completed future with the oldest item. + * If the queue is empty, returns a future that will complete when an item becomes available + * via {@link #put(Object)}. + * + * @return a {@link CompletableFuture} that completes with the next item + */ + public CompletableFuture take() { + synchronized (lock) { + if(!queue.isEmpty()) { + return CompletableFuture.completedFuture(queue.removeFirst()); + } + CompletableFuture takeFuture = new CompletableFuture<>(); + pendingFutures.addLast(takeFuture); + return takeFuture; + } + } +} diff --git a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java new file mode 100644 index 00000000..f556d829 --- /dev/null +++ b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/package-info.java @@ -0,0 +1,4 @@ +/** + * Collections for use in LaunchDarkly SDKs and components. + */ +package com.launchdarkly.sdk.collections; diff --git a/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java new file mode 100644 index 00000000..59642fb1 --- /dev/null +++ b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/IterableAsyncQueueTest.java @@ -0,0 +1,339 @@ +package com.launchdarkly.sdk.collections; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +@SuppressWarnings("javadoc") +public class IterableAsyncQueueTest { + + @Test + public void putThenTakeReturnsImmediately() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + queue.put("item1"); + + CompletableFuture future = queue.take(); + assertTrue("Future should be completed immediately", future.isDone()); + assertEquals("item1", future.get()); + } + + @Test + public void takeThenPutCompletesWaitingFuture() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + CompletableFuture future = queue.take(); + assertFalse("Future should not be completed yet", future.isDone()); + + queue.put("item1"); + + assertTrue("Future should be completed after put", future.isDone()); + assertEquals("item1", future.get()); + } + + @Test + public void multiplePutsThenMultipleTakesPreservesOrder() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + // Put multiple items + queue.put(1); + queue.put(2); + queue.put(3); + + // Take them in order + assertEquals(Integer.valueOf(1), queue.take().get()); + assertEquals(Integer.valueOf(2), queue.take().get()); + assertEquals(Integer.valueOf(3), queue.take().get()); + } + + @Test + public void multipleTakesThenMultiplePutsCompletesInOrder() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + // Multiple takes when queue is empty + CompletableFuture future1 = queue.take(); + CompletableFuture future2 = queue.take(); + CompletableFuture future3 = queue.take(); + + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + assertFalse(future3.isDone()); + + // Put items - should complete futures in FIFO order + queue.put(1); + assertTrue("First future should be completed", future1.isDone()); + assertFalse("Second future should not be completed yet", future2.isDone()); + assertFalse("Third future should not be completed yet", future3.isDone()); + assertEquals(Integer.valueOf(1), future1.get()); + + queue.put(2); + assertTrue("Second future should be completed", future2.isDone()); + assertFalse("Third future should not be completed yet", future3.isDone()); + assertEquals(Integer.valueOf(2), future2.get()); + + queue.put(3); + assertTrue("Third future should be completed", future3.isDone()); + assertEquals(Integer.valueOf(3), future3.get()); + } + + @Test + public void interleavedPutAndTakeOperations() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + // Put one + queue.put("a"); + assertEquals("a", queue.take().get()); + + // Take when empty, then put + CompletableFuture future = queue.take(); + assertFalse(future.isDone()); + queue.put("b"); + assertEquals("b", future.get()); + + // Put multiple, take one, put one more, take remaining + queue.put("c"); + queue.put("d"); + assertEquals("c", queue.take().get()); + queue.put("e"); + assertEquals("d", queue.take().get()); + assertEquals("e", queue.take().get()); + } + + @Test + public void concurrentProducersAndConsumers() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + int itemCount = 1000; + int producerThreads = 5; + int consumerThreads = 5; + + ExecutorService executor = Executors.newFixedThreadPool(producerThreads + consumerThreads); + CountDownLatch producerLatch = new CountDownLatch(producerThreads); + CountDownLatch consumerLatch = new CountDownLatch(consumerThreads); + + List consumedItems = new ArrayList<>(); + Object consumedLock = new Object(); + + // Start producers + for (int t = 0; t < producerThreads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < itemCount / producerThreads; i++) { + queue.put(threadId * 1000 + i); + Thread.sleep(1); // Small delay to encourage interleaving + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + producerLatch.countDown(); + } + }); + } + + // Start consumers + for (int t = 0; t < consumerThreads; t++) { + executor.submit(() -> { + try { + for (int i = 0; i < itemCount / consumerThreads; i++) { + Integer item = queue.take().get(5, TimeUnit.SECONDS); + synchronized (consumedLock) { + consumedItems.add(item); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + consumerLatch.countDown(); + } + }); + } + + // Wait for completion + assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS)); + + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + + // Verify all items were consumed + assertEquals("All items should be consumed", itemCount, consumedItems.size()); + } + + @Test + public void singleProducerAndConsumer() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + int itemCount = 10000; + + AtomicInteger producedCount = new AtomicInteger(0); + AtomicInteger consumedCount = new AtomicInteger(0); + + ExecutorService executor = Executors.newFixedThreadPool(2); + + // Producer + CompletableFuture producer = CompletableFuture.runAsync(() -> { + for (int i = 0; i < itemCount; i++) { + queue.put(i); + producedCount.incrementAndGet(); + } + }, executor); + + // Consumer + CompletableFuture consumer = CompletableFuture.runAsync(() -> { + try { + for (int i = 0; i < itemCount; i++) { + Integer item = queue.take().get(5, TimeUnit.SECONDS); + assertEquals(Integer.valueOf(i), item); + consumedCount.incrementAndGet(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }, executor); + + // Wait for both to complete + CompletableFuture.allOf(producer, consumer).get(10, TimeUnit.SECONDS); + + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + + assertEquals("All items should be produced", itemCount, producedCount.get()); + assertEquals("All items should be consumed", itemCount, consumedCount.get()); + } + + @Test + public void multipleProducersSingleConsumer() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + int producersCount = 10; + int itemsPerProducer = 100; + int totalItems = producersCount * itemsPerProducer; + + ExecutorService executor = Executors.newFixedThreadPool(producersCount + 1); + CountDownLatch producerLatch = new CountDownLatch(producersCount); + + // Start multiple producers + for (int p = 0; p < producersCount; p++) { + final int producerId = p; + executor.submit(() -> { + try { + for (int i = 0; i < itemsPerProducer; i++) { + queue.put("producer-" + producerId + "-item-" + i); + } + } finally { + producerLatch.countDown(); + } + }); + } + + // Single consumer + List consumed = new ArrayList<>(); + CompletableFuture consumer = CompletableFuture.runAsync(() -> { + try { + for (int i = 0; i < totalItems; i++) { + String item = queue.take().get(5, TimeUnit.SECONDS); + consumed.add(item); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }, executor); + + assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS)); + consumer.get(10, TimeUnit.SECONDS); + + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + + assertEquals("Consumer should receive all items", totalItems, consumed.size()); + } + + @Test + public void singleProducerMultipleConsumers() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + int consumersCount = 10; + int totalItems = 1000; + int itemsPerConsumer = totalItems / consumersCount; + + ExecutorService executor = Executors.newFixedThreadPool(consumersCount + 1); + CountDownLatch consumerLatch = new CountDownLatch(consumersCount); + + List allConsumed = new ArrayList<>(); + Object consumedLock = new Object(); + + // Start multiple consumers + for (int c = 0; c < consumersCount; c++) { + executor.submit(() -> { + try { + for (int i = 0; i < itemsPerConsumer; i++) { + Integer item = queue.take().get(5, TimeUnit.SECONDS); + synchronized (consumedLock) { + allConsumed.add(item); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + consumerLatch.countDown(); + } + }); + } + + // Single producer + CompletableFuture producer = CompletableFuture.runAsync(() -> { + for (int i = 0; i < totalItems; i++) { + queue.put(i); + } + }, executor); + + producer.get(5, TimeUnit.SECONDS); + assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS)); + + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + + assertEquals("All items should be consumed", totalItems, allConsumed.size()); + } + + @Test + public void nullValuesAreSupported() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + queue.put(null); + queue.put("not-null"); + queue.put(null); + + assertNull(queue.take().get()); + assertEquals("not-null", queue.take().get()); + assertNull(queue.take().get()); + } + + @Test + public void takeCompletesAsynchronously() throws Exception { + IterableAsyncQueue queue = new IterableAsyncQueue<>(); + + CompletableFuture future = queue.take(); + AtomicInteger callbackInvoked = new AtomicInteger(0); + + // Attach callback + future.thenAccept(item -> { + assertEquals("async-item", item); + callbackInvoked.incrementAndGet(); + }); + + assertFalse("Future should not be completed yet", future.isDone()); + assertEquals(0, callbackInvoked.get()); + + // Put item should trigger callback + queue.put("async-item"); + + // Give callback time to execute + Thread.sleep(50); + + assertTrue("Future should be completed", future.isDone()); + assertEquals("Callback should have been invoked", 1, callbackInvoked.get()); + } +} diff --git a/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java new file mode 100644 index 00000000..5fef164e --- /dev/null +++ b/lib/shared/common/src/test/java/com/launchdarkly/sdk/collections/package-info.java @@ -0,0 +1,4 @@ +/** + * Tests for collections. + */ +package com.launchdarkly.sdk.collections; From c8eb15c9fa08fb8d451199eb11a6aeede71bdd42 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 29 Jan 2026 14:59:24 -0800 Subject: [PATCH 2/2] Complete promise outside lock. --- .../launchdarkly/sdk/collections/IterableAsyncQueue.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java index 6c55858e..baaa4399 100644 --- a/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java +++ b/lib/shared/common/src/main/java/com/launchdarkly/sdk/collections/IterableAsyncQueue.java @@ -37,14 +37,18 @@ class IterableAsyncQueue { * @param item the item to add (maybe null) */ public void put(T item) { + CompletableFuture pendingFuture = null; synchronized (lock) { CompletableFuture nextFuture = pendingFutures.pollFirst(); if(nextFuture != null) { - nextFuture.complete(item); + pendingFuture = nextFuture; + } else { + queue.addLast(item); return; } - queue.addLast(item); } + // Execute callback outside the lock. + pendingFuture.complete(item); } /** * Retrieves and removes an item from the queue, returning a future that completes with the item.