diff --git a/src/MADE.Threading/AdaptiveSemaphore.cs b/src/MADE.Threading/AdaptiveSemaphore.cs new file mode 100644 index 00000000..42ba8386 --- /dev/null +++ b/src/MADE.Threading/AdaptiveSemaphore.cs @@ -0,0 +1,208 @@ +// MADE Apps licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MADE.Threading; + +/// +/// Defines a semaphore that allows adjusting the concurrency limit at runtime to respond to backpressure. +/// +/// +/// This is useful for scenarios where the permitted concurrency should change dynamically, +/// such as reducing parallelism when a downstream service returns rate-limit (429) responses +/// or increasing it when conditions improve. +/// +/// var semaphore = new AdaptiveSemaphore(initial: 10, minimum: 1, maximum: 20); +/// +/// // Normal usage - acquire and release a permit. +/// using (await semaphore.WaitAsync()) +/// { +/// await httpClient.SendAsync(request); +/// } +/// +/// // Reduce concurrency on backpressure. +/// await semaphore.TryShrinkAsync(); +/// +/// +public sealed class AdaptiveSemaphore : IDisposable +{ + private readonly int minimum; + private readonly int? maximum; + private readonly SemaphoreSlim semaphore; + private readonly object adjustLock = new(); + + private int limit; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The initial number of concurrent permits. + /// The minimum concurrency limit. Defaults to 1. + /// The optional maximum concurrency limit. + /// Thrown when is less than 1, or is less than . + public AdaptiveSemaphore(int initial, int minimum = 1, int? maximum = null) + { + ArgumentOutOfRangeException.ThrowIfLessThan(minimum, 1); + + if (maximum.HasValue) + { + ArgumentOutOfRangeException.ThrowIfLessThan(maximum.Value, minimum, nameof(maximum)); + } + + this.minimum = minimum; + this.maximum = maximum; + + var bounded = Math.Max(minimum, initial); + if (maximum.HasValue) + { + bounded = Math.Min(bounded, maximum.Value); + } + + this.limit = bounded; + this.semaphore = new SemaphoreSlim(bounded); + } + + /// + /// Gets the current concurrency limit. + /// + public int Limit => this.limit; + + /// + /// Gets the number of permits currently available. + /// + public int Available => this.semaphore.CurrentCount; + + /// + /// Reduces the concurrency limit by one by permanently acquiring a permit. + /// + /// + /// If the limit is already at the , this method does nothing. + /// The acquired permit is not released, effectively reducing the pool of available permits. + /// + /// The cancellation token. + /// The new concurrency limit. + /// Thrown if the semaphore has been disposed. + public async Task TryShrinkAsync(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(this.disposed, this); + + lock (this.adjustLock) + { + if (this.limit <= this.minimum) + { + return this.limit; + } + + this.limit--; + } + + try + { + await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch + { + lock (this.adjustLock) + { + this.limit++; + } + + throw; + } + + return this.limit; + } + + /// + /// Increases the concurrency limit by one by releasing an additional permit. + /// + /// + /// If the limit is already at the , this method does nothing. + /// This is the inverse of and should be called when + /// conditions improve and more concurrency is desirable. + /// + /// The new concurrency limit. + /// Thrown if the semaphore has been disposed. + public int TryGrow() + { + ObjectDisposedException.ThrowIf(this.disposed, this); + + lock (this.adjustLock) + { + if (this.maximum.HasValue && this.limit >= this.maximum.Value) + { + return this.limit; + } + + this.limit++; + + try + { + this.semaphore.Release(); + } + catch + { + this.limit--; + throw; + } + + return this.limit; + } + } + + /// + /// Asynchronously waits to acquire a permit from the semaphore. + /// + /// The cancellation token. + /// An that releases the permit when disposed. + /// Thrown if the semaphore has been disposed. + public async Task WaitAsync(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(this.disposed, this); + await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + return new SemaphoreReleaser(this.semaphore); + } + + /// + /// Synchronously waits to acquire a permit from the semaphore. + /// + /// The cancellation token. + /// An that releases the permit when disposed. + /// Thrown if the semaphore has been disposed. + public IDisposable Wait(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(this.disposed, this); + this.semaphore.Wait(cancellationToken); + return new SemaphoreReleaser(this.semaphore); + } + + /// + public void Dispose() + { + lock (this.adjustLock) + { + if (this.disposed) + { + return; + } + + this.semaphore.Dispose(); + this.disposed = true; + } + } + + private sealed class SemaphoreReleaser : IDisposable + { + private SemaphoreSlim? semaphore; + + public SemaphoreReleaser(SemaphoreSlim semaphore) + { + this.semaphore = semaphore; + } + + public void Dispose() + { + Interlocked.Exchange(ref this.semaphore, null)?.Release(); + } + } +} diff --git a/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs b/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs new file mode 100644 index 00000000..462ce99b --- /dev/null +++ b/tests/MADE.Threading.Tests/Tests/AdaptiveSemaphoreTests.cs @@ -0,0 +1,335 @@ +using System.Diagnostics.CodeAnalysis; +using NUnit.Framework; +using Shouldly; + +namespace MADE.Threading.Tests.Tests; + +[ExcludeFromCodeCoverage] +[TestFixture] +public class AdaptiveSemaphoreTests +{ + public class WhenConstructing + { + [Test] + public void ShouldClampInitialToMinimum() + { + // Arrange & Act + using var semaphore = new AdaptiveSemaphore(initial: 0, minimum: 3); + + // Assert + semaphore.Limit.ShouldBe(3); + semaphore.Available.ShouldBe(3); + } + + [Test] + public void ShouldClampInitialToMaximum() + { + // Arrange & Act + using var semaphore = new AdaptiveSemaphore(initial: 20, minimum: 1, maximum: 5); + + // Assert + semaphore.Limit.ShouldBe(5); + semaphore.Available.ShouldBe(5); + } + + [Test] + public void ShouldThrowWhenMinimumIsLessThanOne() + { + Should.Throw(() => new AdaptiveSemaphore(initial: 1, minimum: 0)); + } + + [Test] + public void ShouldThrowWhenMaximumIsLessThanMinimum() + { + Should.Throw(() => new AdaptiveSemaphore(initial: 5, minimum: 3, maximum: 2)); + } + } + + public class WhenAcquiringAndReleasing + { + [Test] + public async Task ShouldAcquireAndReleasePermit() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2); + + // Act + using (await semaphore.WaitAsync()) + { + semaphore.Available.ShouldBe(1); + } + + // Assert + semaphore.Available.ShouldBe(2); + } + + [Test] + public async Task ShouldLimitConcurrency() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2); + int concurrent = 0; + int maxConcurrent = 0; + + // Act + var tasks = Enumerable.Range(0, 10).Select(async _ => + { + using (await semaphore.WaitAsync()) + { + var current = Interlocked.Increment(ref concurrent); + InterlockedMax(ref maxConcurrent, current); + await Task.Delay(50); + Interlocked.Decrement(ref concurrent); + } + }); + + await Task.WhenAll(tasks); + + // Assert + maxConcurrent.ShouldBeLessThanOrEqualTo(2); + } + + [Test] + public void ShouldAcquireAndReleaseSynchronously() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2); + + // Act + using (semaphore.Wait()) + { + semaphore.Available.ShouldBe(1); + } + + // Assert + semaphore.Available.ShouldBe(2); + } + } + + public class WhenShrinking + { + [Test] + public async Task ShouldReduceLimitByOne() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1); + + // Act + int newLimit = await semaphore.TryShrinkAsync(); + + // Assert + newLimit.ShouldBe(4); + semaphore.Limit.ShouldBe(4); + semaphore.Available.ShouldBe(4); + } + + [Test] + public async Task ShouldNotShrinkBelowMinimum() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 2, minimum: 2); + + // Act + int newLimit = await semaphore.TryShrinkAsync(); + + // Assert + newLimit.ShouldBe(2); + semaphore.Limit.ShouldBe(2); + } + + [Test] + public async Task ShouldReduceAvailablePermits() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 3, minimum: 1); + + // Act + await semaphore.TryShrinkAsync(); + await semaphore.TryShrinkAsync(); + + // Assert + semaphore.Limit.ShouldBe(1); + semaphore.Available.ShouldBe(1); + } + + [Test] + public async Task ShouldRollBackLimitWhenCancelled() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 1, minimum: 1, maximum: 2); + semaphore.TryGrow(); // limit = 2, available = 2 + + // Exhaust both permits so the next WaitAsync will block. + using var hold1 = await semaphore.WaitAsync(); + using var hold2 = await semaphore.WaitAsync(); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // Act - shrink should decrement limit then fail to acquire, rolling back. + await Should.ThrowAsync( + async () => await semaphore.TryShrinkAsync(cts.Token)); + + // Assert - limit should be restored to 2. + semaphore.Limit.ShouldBe(2); + } + } + + public class WhenGrowing + { + [Test] + public void ShouldIncreaseLimitByOne() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 3, minimum: 1, maximum: 10); + + // Act + int newLimit = semaphore.TryGrow(); + + // Assert + newLimit.ShouldBe(4); + semaphore.Limit.ShouldBe(4); + semaphore.Available.ShouldBe(4); + } + + [Test] + public void ShouldNotGrowAboveMaximum() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1, maximum: 5); + + // Act + int newLimit = semaphore.TryGrow(); + + // Assert + newLimit.ShouldBe(5); + semaphore.Limit.ShouldBe(5); + } + + [Test] + public void ShouldGrowWithoutMaximum() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 3, minimum: 1); + + // Act + int newLimit = semaphore.TryGrow(); + + // Assert + newLimit.ShouldBe(4); + semaphore.Available.ShouldBe(4); + } + } + + public class WhenShrinkingAndGrowing + { + [Test] + public async Task ShouldRestoreLimitAfterShrinkAndGrow() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1, maximum: 10); + + // Act + await semaphore.TryShrinkAsync(); + await semaphore.TryShrinkAsync(); + semaphore.Limit.ShouldBe(3); + + semaphore.TryGrow(); + semaphore.TryGrow(); + + // Assert + semaphore.Limit.ShouldBe(5); + semaphore.Available.ShouldBe(5); + } + + [Test] + public async Task ShouldReduceEffectiveConcurrency() + { + // Arrange + using var semaphore = new AdaptiveSemaphore(initial: 4, minimum: 1); + int concurrent = 0; + int maxConcurrent = 0; + + // Act - shrink to 2 + await semaphore.TryShrinkAsync(); + await semaphore.TryShrinkAsync(); + + var tasks = Enumerable.Range(0, 10).Select(async _ => + { + using (await semaphore.WaitAsync()) + { + var current = Interlocked.Increment(ref concurrent); + InterlockedMax(ref maxConcurrent, current); + await Task.Delay(50); + Interlocked.Decrement(ref concurrent); + } + }); + + await Task.WhenAll(tasks); + + // Assert + maxConcurrent.ShouldBeLessThanOrEqualTo(2); + } + } + + public class WhenDisposed + { + [Test] + public void ShouldThrowOnWaitAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 1); + semaphore.Dispose(); + + // Act & Assert + Should.Throw(() => semaphore.Wait()); + } + + [Test] + public async Task ShouldThrowOnWaitAsyncAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 1); + semaphore.Dispose(); + + // Act & Assert + await Should.ThrowAsync(async () => await semaphore.WaitAsync()); + } + + [Test] + public async Task ShouldThrowOnShrinkAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1); + semaphore.Dispose(); + + // Act & Assert + await Should.ThrowAsync(async () => await semaphore.TryShrinkAsync()); + } + + [Test] + public void ShouldThrowOnGrowAfterDispose() + { + // Arrange + var semaphore = new AdaptiveSemaphore(initial: 5, minimum: 1, maximum: 10); + semaphore.Dispose(); + + // Act & Assert + Should.Throw(() => semaphore.TryGrow()); + } + } + + private static void InterlockedMax(ref int location, int value) + { + int current; + do + { + current = location; + if (value <= current) + { + return; + } + } + while (Interlocked.CompareExchange(ref location, value, current) != current); + } +}