From be14a38c529028d0cba804d7b8fc1a762000e846 Mon Sep 17 00:00:00 2001 From: James Croft Date: Fri, 15 May 2026 16:15:37 +0100 Subject: [PATCH 1/2] feat(threading): add AdaptiveSemaphore for dynamic concurrency control Introduces AdaptiveSemaphore, an async-compatible semaphore that allows adjusting the concurrency limit at runtime to respond to backpressure. - WaitAsync/Wait with IDisposable releaser for using pattern - TryShrinkAsync to reduce concurrency by permanently acquiring a permit - TryGrow to increase concurrency by releasing an additional permit - Configurable minimum and optional maximum bounds - Constructor validation and clamping of initial value - Full test coverage across construction, acquire/release, shrink, grow, and disposal scenarios --- src/MADE.Threading/AdaptiveSemaphore.cs | 182 ++++++++++ .../Tests/AdaptiveSemaphoreTests.cs | 313 ++++++++++++++++++ 2 files changed, 495 insertions(+) create mode 100644 src/MADE.Threading/AdaptiveSemaphore.cs create mode 100644 tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs diff --git a/src/MADE.Threading/AdaptiveSemaphore.cs b/src/MADE.Threading/AdaptiveSemaphore.cs new file mode 100644 index 00000000..d0e3d000 --- /dev/null +++ b/src/MADE.Threading/AdaptiveSemaphore.cs @@ -0,0 +1,182 @@ +// MADE Apps licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MADE.Threading; + +/// +/// Defines a semaphore that allows adjusting the concurrency limit at runtime to respond to backpressure. +/// +/// +/// This is useful for scenarios where the permitted concurrency should change dynamically, +/// such as reducing parallelism when a downstream service returns rate-limit (429) responses +/// or increasing it when conditions improve. +/// +/// var semaphore = new AdaptiveSemaphore(initial: 10, minimum: 1, maximum: 20); +/// +/// // Normal usage - acquire and release a permit. +/// using (await semaphore.WaitAsync()) +/// { +/// await httpClient.SendAsync(request); +/// } +/// +/// // Reduce concurrency on backpressure. +/// semaphore.TryShrink(); +/// +/// +public sealed class AdaptiveSemaphore : IDisposable +{ + private readonly int minimum; + private readonly int? maximum; + private readonly SemaphoreSlim semaphore; + private readonly object adjustLock = new(); + + private int limit; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The initial number of concurrent permits. + /// The minimum concurrency limit. Defaults to 1. + /// The optional maximum concurrency limit. + /// Thrown when is less than 1, or is less than . + public AdaptiveSemaphore(int initial, int minimum = 1, int? maximum = null) + { + ArgumentOutOfRangeException.ThrowIfLessThan(minimum, 1); + + if (maximum.HasValue) + { + ArgumentOutOfRangeException.ThrowIfLessThan(maximum.Value, minimum, nameof(maximum)); + } + + this.minimum = minimum; + this.maximum = maximum; + + var bounded = Math.Max(minimum, initial); + if (maximum.HasValue) + { + bounded = Math.Min(bounded, maximum.Value); + } + + this.limit = bounded; + this.semaphore = new SemaphoreSlim(bounded); + } + + /// + /// Gets the current concurrency limit. + /// + public int Limit => this.limit; + + /// + /// Gets the number of permits currently available. + /// + public int Available => this.semaphore.CurrentCount; + + /// + /// Reduces the concurrency limit by one by permanently acquiring a permit. + /// + /// + /// If the limit is already at the , this method does nothing. + /// The acquired permit is not released, effectively reducing the pool of available permits. + /// + /// The cancellation token. + /// The new concurrency limit. + /// Thrown if the semaphore has been disposed. + public async Task TryShrinkAsync(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(this.disposed, this); + + lock (this.adjustLock) + { + if (this.limit <= this.minimum) + { + return this.limit; + } + + this.limit--; + } + + await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + return this.limit; + } + + /// + /// Increases the concurrency limit by one by releasing an additional permit. + /// + /// + /// If the limit is already at the , this method does nothing. + /// This is the inverse of and should be called when + /// conditions improve and more concurrency is desirable. + /// + /// The new concurrency limit. + /// Thrown if the semaphore has been disposed. + public int TryGrow() + { + ObjectDisposedException.ThrowIf(this.disposed, this); + + lock (this.adjustLock) + { + if (this.maximum.HasValue && this.limit >= this.maximum.Value) + { + return this.limit; + } + + this.limit++; + this.semaphore.Release(); + return this.limit; + } + } + + /// + /// Asynchronously waits to acquire a permit from the semaphore. + /// + /// The cancellation token. + /// An that releases the permit when disposed. + /// Thrown if the semaphore has been disposed. + public async Task WaitAsync(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(this.disposed, this); + await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + return new SemaphoreReleaser(this.semaphore); + } + + /// + /// Synchronously waits to acquire a permit from the semaphore. + /// + /// The cancellation token. + /// An that releases the permit when disposed. + /// Thrown if the semaphore has been disposed. + public IDisposable Wait(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(this.disposed, this); + this.semaphore.Wait(cancellationToken); + return new SemaphoreReleaser(this.semaphore); + } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.semaphore.Dispose(); + this.disposed = true; + } + + private sealed class SemaphoreReleaser : IDisposable + { + private SemaphoreSlim? semaphore; + + public SemaphoreReleaser(SemaphoreSlim semaphore) + { + this.semaphore = semaphore; + } + + public void Dispose() + { + Interlocked.Exchange(ref this.semaphore, null)?.Release(); + } + } +} diff --git a/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs b/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs new file mode 100644 index 00000000..a7095e63 --- /dev/null +++ b/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs @@ -0,0 +1,313 @@ +using System.Diagnostics.CodeAnalysis; +using NUnit.Framework; +using Shouldly; + +namespace MADE.Threading.Tests.Tests; + +[ExcludeFromCodeCoverage] +[TestFixture] +public class AdaptiveSemaphoreTests +{ + public class WhenConstructing + { + [Test] + public void ShouldClampInitialToMinimum() + { + // Arrange & Act + using var semaphore = new AdaptiveSemaphore(initial: 0, minimum: 3); + + // Assert + semaphore.Limit.ShouldBe(3); + semaphore.Available.ShouldBe(3); + } + + [Test] + public void ShouldClampInitialToMaximum() + { + // Arrange & Act + using var semaphore = new AdaptiveSemaphore(initial: 20, minimum: 1, maximum: 5); + + // Assert + semaphore.Limit.ShouldBe(5); + semaphore.Available.ShouldBe(5); + } + + [Test] + public void ShouldThrowWhenMinimumIsLessThanOne() + { + Should.Throw(() => new AdaptiveSemaphore(initial: 1, minimum: 0)); + } + + [Test] + public void ShouldThrowWhenMaximumIsLessThanMinimum() + { + Should.Throw(() => new AdaptiveSemaphore(initial: 5, minimum: 3, maximum: 2)); + } + } + + public class WhenAcquiringAndReleasing + { + [Test] + public async Task ShouldAcquireAndReleasePermit() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2); + + // Act + using (await semaphore.WaitAsync()) + { + semaphore.Available.ShouldBe(1); + } + + // Assert + semaphore.Available.ShouldBe(2); + } + + [Test] + public async Task ShouldLimitConcurrency() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2); + int concurrent = 0; + int maxConcurrent = 0; + + // Act + var tasks = Enumerable.Range(0, 10).Select(async _ => + { + using (await semaphore.WaitAsync()) + { + var current = Interlocked.Increment(ref concurrent); + InterlockedMax(ref maxConcurrent, current); + await Task.Delay(50); + Interlocked.Decrement(ref concurrent); + } + }); + + await Task.WhenAll(tasks); + + // Assert + maxConcurrent.ShouldBeLessThanOrEqualTo(2); + } + + [Test] + public void ShouldAcquireAndReleaseSynchronously() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2); + + // Act + using (semaphore.Wait()) + { + semaphore.Available.ShouldBe(1); + } + + // Assert + semaphore.Available.ShouldBe(2); + } + } + + public class WhenShrinking + { + [Test] + public async Task ShouldReduceLimitByOne() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1); + + // Act + int newLimit = await semaphore.TryShrinkAsync(); + + // Assert + newLimit.ShouldBe(4); + semaphore.Limit.ShouldBe(4); + semaphore.Available.ShouldBe(4); + } + + [Test] + public async Task ShouldNotShrinkBelowMinimum() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2, minimum: 2); + + // Act + int newLimit = await semaphore.TryShrinkAsync(); + + // Assert + newLimit.ShouldBe(2); + semaphore.Limit.ShouldBe(2); + } + + [Test] + public async Task ShouldReduceAvailablePermits() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 3, minimum: 1); + + // Act + await semaphore.TryShrinkAsync(); + await semaphore.TryShrinkAsync(); + + // Assert + semaphore.Limit.ShouldBe(1); + semaphore.Available.ShouldBe(1); + } + } + + public class WhenGrowing + { + [Test] + public void ShouldIncreaseLimitByOne() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 3, minimum: 1, maximum: 10); + + // Act + int newLimit = semaphore.TryGrow(); + + // Assert + newLimit.ShouldBe(4); + semaphore.Limit.ShouldBe(4); + semaphore.Available.ShouldBe(4); + } + + [Test] + public void ShouldNotGrowAboveMaximum() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1, maximum: 5); + + // Act + int newLimit = semaphore.TryGrow(); + + // Assert + newLimit.ShouldBe(5); + semaphore.Limit.ShouldBe(5); + } + + [Test] + public void ShouldGrowWithoutMaximum() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 3, minimum: 1); + + // Act + int newLimit = semaphore.TryGrow(); + + // Assert + newLimit.ShouldBe(4); + semaphore.Available.ShouldBe(4); + } + } + + public class WhenShrinkingAndGrowing + { + [Test] + public async Task ShouldRestoreLimitAfterShrinkAndGrow() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1, maximum: 10); + + // Act + await semaphore.TryShrinkAsync(); + await semaphore.TryShrinkAsync(); + semaphore.Limit.ShouldBe(3); + + semaphore.TryGrow(); + semaphore.TryGrow(); + + // Assert + semaphore.Limit.ShouldBe(5); + semaphore.Available.ShouldBe(5); + } + + [Test] + public async Task ShouldReduceEffectiveConcurrency() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 4, minimum: 1); + int concurrent = 0; + int maxConcurrent = 0; + + // Act - shrink to 2 + await semaphore.TryShrinkAsync(); + await semaphore.TryShrinkAsync(); + + var tasks = Enumerable.Range(0, 10).Select(async _ => + { + using (await semaphore.WaitAsync()) + { + var current = Interlocked.Increment(ref concurrent); + InterlockedMax(ref maxConcurrent, current); + await Task.Delay(50); + Interlocked.Decrement(ref concurrent); + } + }); + + await Task.WhenAll(tasks); + + // Assert + maxConcurrent.ShouldBeLessThanOrEqualTo(2); + } + } + + public class WhenDisposed + { + [Test] + public void ShouldThrowOnWaitAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 1); + semaphore.Dispose(); + + // Act & Assert + Should.Throw(() => semaphore.Wait()); + } + + [Test] + public async Task ShouldThrowOnWaitAsyncAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 1); + semaphore.Dispose(); + + // Act & Assert + await Should.ThrowAsync(async () => await semaphore.WaitAsync()); + } + + [Test] + public async Task ShouldThrowOnShrinkAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1); + semaphore.Dispose(); + + // Act & Assert + await Should.ThrowAsync(async () => await semaphore.TryShrinkAsync()); + } + + [Test] + public void ShouldThrowOnGrowAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1, maximum: 10); + semaphore.Dispose(); + + // Act & Assert + Should.Throw(() => semaphore.TryGrow()); + } + } + + private static void InterlockedMax(ref int location, int value) + { + int current; + do + { + current = location; + if (value <= current) + { + return; + } + } + while (Interlocked.CompareExchange(ref location, value, current) != current); + } +} From d28efebf52da7d1c15f09a8aaeaf5d6fc0a845fc Mon Sep 17 00:00:00 2001 From: James Croft Date: Fri, 15 May 2026 16:27:48 +0100 Subject: [PATCH 2/2] test: add rollback limit test for AdaptiveSemaphore on cancellation --- src/MADE.Threading/AdaptiveSemaphore.cs | 42 +++++++++++++++---- .../Tests/AdaptiveSemaphoreTests.cs | 22 ++++++++++ 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/MADE.Threading/AdaptiveSemaphore.cs b/src/MADE.Threading/AdaptiveSemaphore.cs index d0e3d000..42ba8386 100644 --- a/src/MADE.Threading/AdaptiveSemaphore.cs +++ b/src/MADE.Threading/AdaptiveSemaphore.cs @@ -20,7 +20,7 @@ namespace MADE.Threading; /// } /// /// // Reduce concurrency on backpressure. -/// semaphore.TryShrink(); +/// await semaphore.TryShrinkAsync(); /// /// public sealed class AdaptiveSemaphore : IDisposable @@ -96,7 +96,20 @@ public async Task TryShrinkAsync(CancellationToken cancellationToken = defa this.limit--; } - await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch + { + lock (this.adjustLock) + { + this.limit++; + } + + throw; + } + return this.limit; } @@ -122,7 +135,17 @@ public int TryGrow() } this.limit++; - this.semaphore.Release(); + + try + { + this.semaphore.Release(); + } + catch + { + this.limit--; + throw; + } + return this.limit; } } @@ -156,13 +179,16 @@ public IDisposable Wait(CancellationToken cancellationToken = default) /// public void Dispose() { - if (this.disposed) + lock (this.adjustLock) { - return; - } + if (this.disposed) + { + return; + } - this.semaphore.Dispose(); - this.disposed = true; + this.semaphore.Dispose(); + this.disposed = true; + } } private sealed class SemaphoreReleaser : IDisposable diff --git a/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs b/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs index a7095e63..462ce99b 100644 --- a/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs +++ b/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs @@ -151,6 +151,28 @@ public async Task ShouldReduceAvailablePermits() semaphore.Limit.ShouldBe(1); semaphore.Available.ShouldBe(1); } + + [Test] + public async Task ShouldRollBackLimitWhenCancelled() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 1, minimum: 1, maximum: 2); + semaphore.TryGrow(); // limit = 2, available = 2 + + // Exhaust both permits so the next WaitAsync will block. + using var hold1 = await semaphore.WaitAsync(); + using var hold2 = await semaphore.WaitAsync(); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // Act - shrink should decrement limit then fail to acquire, rolling back. + await Should.ThrowAsync( + async () => await semaphore.TryShrinkAsync(cts.Token)); + + // Assert - limit should be restored to 2. + semaphore.Limit.ShouldBe(2); + } } public class WhenGrowing