diff --git a/.autover/changes/45c6fa93-45cc-402a-869c-41dbcec2c799.json b/.autover/changes/45c6fa93-45cc-402a-869c-41dbcec2c799.json new file mode 100644 index 000000000..ce6c89fb6 --- /dev/null +++ b/.autover/changes/45c6fa93-45cc-402a-869c-41dbcec2c799.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.RuntimeSupport", + "Type": "Patch", + "ChangelogMessages": [ + "Fix thread pool starvation under multi-concurrency" + ] + } + ] +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs index 130ddf1d4..5f32158d6 100644 --- a/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs @@ -218,6 +218,7 @@ internal LambdaBootstrap(HttpClient httpClient, LambdaBootstrapHandler handler, public async Task RunAsync(CancellationToken cancellationToken = default(CancellationToken)) { AdjustMemorySettings(); + AdjustThreadPoolSettings(); if (_configuration.IsCallPreJit) { @@ -629,6 +630,48 @@ private void AdjustMemorySettings() } #region IDisposable Support + + /// + /// When running in multi-concurrency mode, pre-size the .NET ThreadPool to ensure there are enough + /// threads available for both handler execution and polling task continuations. Without this, + /// blocking handlers (Thread.Sleep, .Result, .Wait()) can exhaust the ThreadPool, preventing + /// polling tasks from cycling back to /next and causing Runtime.Unavailable errors from RAPID. + /// + private void AdjustThreadPoolSettings() + { + try + { + var maxConcurrency = Utils.GetMaxConcurrency(_environmentVariables); + if (maxConcurrency <= 0) + return; + + // Compute the minimum threads needed: enough for all concurrent handlers plus + // overhead for polling task continuations and other runtime work. + var desiredMinThreads = maxConcurrency + Environment.ProcessorCount; + + ThreadPool.GetMinThreads(out int currentMinWorker, out int currentMinIO); + + // Only increase, never decrease — respect any higher value already set + // (e.g., by the customer in their code). + if (currentMinWorker >= desiredMinThreads) + return; + + var success = ThreadPool.SetMinThreads(desiredMinThreads, currentMinIO); + if (success) + { + _logger.LogInformation($"Adjusted ThreadPool minimum worker threads from {currentMinWorker} to {desiredMinThreads} for multi-concurrency mode (max concurrency: {maxConcurrency})."); + } + else + { + _logger.LogError(null, $"Failed to set ThreadPool minimum worker threads to {desiredMinThreads}."); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to adjust ThreadPool settings for multi-concurrency mode."); + } + } + private bool disposedValue = false; // To detect redundant calls /// diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Helpers/Utils.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Helpers/Utils.cs index c0472d065..1b99448b8 100644 --- a/Libraries/src/Amazon.Lambda.RuntimeSupport/Helpers/Utils.cs +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Helpers/Utils.cs @@ -56,13 +56,33 @@ internal static int DetermineProcessingTaskCount(IEnvironmentVariables environme } else { - processingTaskCount = Math.Max(2, processorCount); + // Use the max concurrency value as the default polling task count so there are + // enough polling tasks to fill all available concurrency slots. Fall back to + // the processor-based heuristic if the value cannot be parsed. + var maxConcurrency = GetMaxConcurrency(environmentVariables); + processingTaskCount = maxConcurrency > 0 + ? maxConcurrency + : Math.Max(2, processorCount); } } return processingTaskCount; } + /// + /// Parses the AWS_LAMBDA_MAX_CONCURRENCY environment variable as an integer. + /// Returns the parsed value if valid and greater than 0, otherwise returns 0. + /// + internal static int GetMaxConcurrency(IEnvironmentVariables environmentVariables) + { + var value = environmentVariables.GetEnvironmentVariable(Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY); + if (!string.IsNullOrEmpty(value) && int.TryParse(value, out var maxConcurrency) && maxConcurrency > 0) + { + return maxConcurrency; + } + return 0; + } + /// /// Create an Action callback that can be used for setting the trace id on the AWS SDK for .NET if the SDK is present. /// If the AWS .NET SDK is not found then null is returned. diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyTests.cs index 2bee73286..3eb4bbe6e 100644 --- a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyTests.cs +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyTests.cs @@ -112,6 +112,93 @@ public async Task ConfirmConcurrentInvocations(bool useAsyncHandler) Assert.Equal("end-request1,traceId-trace1", handlerEvents[3]); } + /// + /// Bug condition exploration test: Demonstrates thread pool starvation when MC is enabled + /// with blocking handlers. This test encodes the EXPECTED behavior after the fix is applied. + /// On unfixed code, this test is EXPECTED TO FAIL with a timeout because: + /// - Only 2 polling tasks are created (Math.Max(2, processorCount)) + /// - ThreadPool.MinThreads is constrained to 2 worker threads + /// - 10 blocking handlers (Thread.Sleep) exhaust the thread pool + /// - Polling task continuations cannot resume to call GetNextInvocationAsync + /// - Not all 10 invocations get dequeued within the timeout + /// + /// Validates: Requirements 1.1, 1.2, 1.3, 1.4 + /// + [Fact] + public async Task ThreadPoolStarvation_BlockingHandlers_AllInvocationsDequeued() + { + // Save original ThreadPool settings to restore after test + ThreadPool.GetMinThreads(out int originalMinWorker, out int originalMinIO); + + try + { + // Constrain ThreadPool to simulate Lambda's default environment (low thread count) + ThreadPool.SetMinThreads(2, 2); + + TestEnvironmentVariables environmentVariables = new TestEnvironmentVariables(); + environmentVariables.SetEnvironmentVariable( + Amazon.Lambda.RuntimeSupport.Bootstrap.Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY, "10"); + + // Create 10 invocation events with blocking handlers + var invocationEvents = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent[10]; + for (int i = 0; i < 10; i++) + { + invocationEvents[i] = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent + { + Headers = CreateDefaultHeaders($"request{i}", $"trace{i}"), + FunctionInput = CreateFunctionInput(new SleepTimeEvent(3000, 0)) + }; + } + + var testRuntimeApiClient = new TestMultiConcurrencyRuntimeApiClient(environmentVariables, invocationEvents); + + // Use a thread-safe counter to track dequeued invocations + int dequeuedCount = 0; + var allDequeuedEvent = new ManualResetEventSlim(false); + + // Wrap the test client to track dequeue operations in a thread-safe manner + var originalGetNext = testRuntimeApiClient; + + // Handler that performs blocking work (Thread.Sleep) to exhaust the thread pool + var handler = HandlerWrapper.GetHandlerWrapper((SleepTimeEvent sleepTime, ILambdaContext context) => + { + // Blocking sleep to simulate CPU-bound or synchronous I/O work + Thread.Sleep(sleepTime.StartSleep); + }, _serializer).Handler; + + var lambdaBootstrap = new LambdaBootstrap( + httpClient: null, + handler: handler, + initializer: null, + ownsHttpClient: true, + environmentVariables: environmentVariables); + lambdaBootstrap.Client = testRuntimeApiClient; + + // Run with a 10-second timeout - if all 10 invocations are dequeued, the test passes + CancellationTokenSource cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + + try + { + await lambdaBootstrap.RunAsync(cts.Token); + } + catch (OperationCanceledException) + { + // Expected when the cancellation token is triggered (timeout) + } + + // Assert that all 10 invocations were dequeued from the test client within the timeout. + // On unfixed code, this will fail because thread pool starvation prevents polling tasks + // from cycling back to GetNextInvocationAsync. + Assert.Equal(10, testRuntimeApiClient.ProcessInvocationEvents.Count); + } + finally + { + // Restore original ThreadPool settings + ThreadPool.SetMinThreads(originalMinWorker, originalMinIO); + } + } + private Dictionary> CreateDefaultHeaders(string requestId, string traceId) { return new Dictionary> diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyThreadPoolStarvationTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyThreadPoolStarvationTests.cs new file mode 100644 index 000000000..1e64c75c8 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapMultiConcurrencyThreadPoolStarvationTests.cs @@ -0,0 +1,249 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Amazon.Lambda.RuntimeSupport.UnitTests.TestHelpers; +using Amazon.Lambda.Serialization.Json; +using Xunit; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + /// + /// Tests for multi-concurrency thread pool starvation fix. + /// + /// The fix ensures that when AWS_LAMBDA_MAX_CONCURRENCY is set: + /// 1. The polling task count matches the MC value (not just Math.Max(2, processorCount)) + /// 2. ThreadPool.SetMinThreads is called to pre-size the pool for handler threads + polling continuations + /// + /// Without both changes, blocking handlers (Thread.Sleep, .Result, .Wait()) exhaust the + /// ThreadPool, preventing polling tasks from cycling back to /next. + /// + public class LambdaBootstrapMultiConcurrencyThreadPoolStarvationTests + { + private readonly JsonSerializer _serializer = new JsonSerializer(); + + /// + /// Verifies the fix works: with AdjustThreadPoolSettings pre-sizing the pool + /// and DetermineProcessingTaskCount using the MC value, all invocations are + /// dequeued promptly even with blocking handlers. + /// + /// This simulates the real Lambda environment where MaxThreads is not capped + /// but MinThreads starts low. The fix raises MinThreads so threads are + /// immediately available for both handlers and polling continuations. + /// + [Fact] + public async Task MultiConcurrency_BlockingHandlers_AllInvocationsDequeued() + { + const int mcCount = 10; + const int invocationCount = 10; + const int handlerBlockTimeMs = 3000; + var timeout = TimeSpan.FromSeconds(5); + + var environmentVariables = new TestEnvironmentVariables(); + environmentVariables.SetEnvironmentVariable( + Amazon.Lambda.RuntimeSupport.Bootstrap.Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY, + mcCount.ToString()); + + var invocationEvents = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent[invocationCount]; + for (int i = 0; i < invocationCount; i++) + { + invocationEvents[i] = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent + { + Headers = CreateDefaultHeaders($"request{i + 1}", $"trace{i + 1}"), + FunctionInput = CreateFunctionInput(new BlockingEvent { BlockTimeMs = handlerBlockTimeMs }) + }; + } + + var testRuntimeApiClient = new TestMultiConcurrencyRuntimeApiClient(environmentVariables, invocationEvents); + var startedInvocations = new ConcurrentBag(); + + var handler = HandlerWrapper.GetHandlerWrapper((BlockingEvent input, ILambdaContext context) => + { + startedInvocations.Add(context.AwsRequestId); + Thread.Sleep(input.BlockTimeMs); + }, _serializer).Handler; + + var lambdaBootstrap = new LambdaBootstrap( + httpClient: null, + handler: handler, + initializer: null, + ownsHttpClient: true, + environmentVariables: environmentVariables); + lambdaBootstrap.Client = testRuntimeApiClient; + + var cts = new CancellationTokenSource(); + cts.CancelAfter(timeout); + + try + { + await lambdaBootstrap.RunAsync(cts.Token); + } + catch (OperationCanceledException) + { + // Expected — the bootstrap runs until cancelled + } + + // With the fix: 10 polling tasks + pre-sized ThreadPool = all invocations dequeued + Assert.Equal(invocationCount, testRuntimeApiClient.ProcessInvocationEvents.Count); + Assert.Equal(invocationCount, startedInvocations.Count); + } + + /// + /// Verifies the fix works at higher concurrency (MC=20). + /// + [Fact] + public async Task MultiConcurrency_HigherConcurrency_AllInvocationsDequeued() + { + const int mcCount = 20; + const int invocationCount = 20; + const int handlerBlockTimeMs = 2000; + var timeout = TimeSpan.FromSeconds(5); + + var environmentVariables = new TestEnvironmentVariables(); + environmentVariables.SetEnvironmentVariable( + Amazon.Lambda.RuntimeSupport.Bootstrap.Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY, + mcCount.ToString()); + + var invocationEvents = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent[invocationCount]; + for (int i = 0; i < invocationCount; i++) + { + invocationEvents[i] = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent + { + Headers = CreateDefaultHeaders($"request{i + 1}", $"trace{i + 1}"), + FunctionInput = CreateFunctionInput(new BlockingEvent { BlockTimeMs = handlerBlockTimeMs }) + }; + } + + var testRuntimeApiClient = new TestMultiConcurrencyRuntimeApiClient(environmentVariables, invocationEvents); + var startedInvocations = new ConcurrentBag(); + + var handler = HandlerWrapper.GetHandlerWrapper((BlockingEvent input, ILambdaContext context) => + { + startedInvocations.Add(context.AwsRequestId); + Thread.Sleep(input.BlockTimeMs); + }, _serializer).Handler; + + var lambdaBootstrap = new LambdaBootstrap( + httpClient: null, + handler: handler, + initializer: null, + ownsHttpClient: true, + environmentVariables: environmentVariables); + lambdaBootstrap.Client = testRuntimeApiClient; + + var cts = new CancellationTokenSource(); + cts.CancelAfter(timeout); + + try + { + await lambdaBootstrap.RunAsync(cts.Token); + } + catch (OperationCanceledException) + { + // Expected + } + + Assert.Equal(invocationCount, testRuntimeApiClient.ProcessInvocationEvents.Count); + Assert.Equal(invocationCount, startedInvocations.Count); + } + + /// + /// Verifies that non-numeric AWS_LAMBDA_MAX_CONCURRENCY still works + /// (falls back to Math.Max(2, processorCount) for polling tasks, no ThreadPool adjustment). + /// + [Fact] + public async Task MultiConcurrency_NonNumericMcValue_FallsBackToProcessorCount() + { + const int invocationCount = 2; + const int handlerBlockTimeMs = 500; + var timeout = TimeSpan.FromSeconds(3); + + var environmentVariables = new TestEnvironmentVariables(); + // Non-numeric value — MC is enabled but value can't be parsed + environmentVariables.SetEnvironmentVariable( + Amazon.Lambda.RuntimeSupport.Bootstrap.Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY, + "enabled"); + + var invocationEvents = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent[invocationCount]; + for (int i = 0; i < invocationCount; i++) + { + invocationEvents[i] = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent + { + Headers = CreateDefaultHeaders($"request{i + 1}", $"trace{i + 1}"), + FunctionInput = CreateFunctionInput(new BlockingEvent { BlockTimeMs = handlerBlockTimeMs }) + }; + } + + var testRuntimeApiClient = new TestMultiConcurrencyRuntimeApiClient(environmentVariables, invocationEvents); + var startedInvocations = new ConcurrentBag(); + + var handler = HandlerWrapper.GetHandlerWrapper((BlockingEvent input, ILambdaContext context) => + { + startedInvocations.Add(context.AwsRequestId); + Thread.Sleep(input.BlockTimeMs); + }, _serializer).Handler; + + var lambdaBootstrap = new LambdaBootstrap( + httpClient: null, + handler: handler, + initializer: null, + ownsHttpClient: true, + environmentVariables: environmentVariables); + lambdaBootstrap.Client = testRuntimeApiClient; + + var cts = new CancellationTokenSource(); + cts.CancelAfter(timeout); + + try + { + await lambdaBootstrap.RunAsync(cts.Token); + } + catch (OperationCanceledException) + { + // Expected + } + + // Should still process both invocations (fallback behavior works) + Assert.Equal(invocationCount, testRuntimeApiClient.ProcessInvocationEvents.Count); + Assert.Equal(invocationCount, startedInvocations.Count); + } + + #region Helper Methods + + private Dictionary> CreateDefaultHeaders(string requestId, string traceId) + { + return new Dictionary> + { + { RuntimeApiHeaders.HeaderAwsRequestId, new List { requestId } }, + { RuntimeApiHeaders.HeaderInvokedFunctionArn, new List { "invoked_function_arn" } }, + { RuntimeApiHeaders.HeaderTraceId, new List { traceId } } + }; + } + + private byte[] CreateFunctionInput(object input) + { + using (var ms = new System.IO.MemoryStream()) + { + _serializer.Serialize(input, ms); + return ms.ToArray(); + } + } + + #endregion + + #region Test Models + + public class BlockingEvent + { + public int BlockTimeMs { get; set; } + } + + #endregion + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestMultiConcurrencyRuntimeApiClient.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestMultiConcurrencyRuntimeApiClient.cs index 198c83170..754bc1536 100644 --- a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestMultiConcurrencyRuntimeApiClient.cs +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestMultiConcurrencyRuntimeApiClient.cs @@ -3,6 +3,7 @@ using Amazon.Lambda.RuntimeSupport.Helpers; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -18,7 +19,7 @@ internal class TestMultiConcurrencyRuntimeApiClient : IRuntimeApiClient private readonly IEnvironmentVariables _environmentVariables; public Queue InvocationEvents { get; } = new Queue(); - public Dictionary ProcessInvocationEvents { get; } = new Dictionary(); + public ConcurrentDictionary ProcessInvocationEvents { get; } = new ConcurrentDictionary(); public TestMultiConcurrencyRuntimeApiClient(IEnvironmentVariables environmentVariables, params InvocationEvent[] invocationEvents) { @@ -58,15 +59,30 @@ public string AwsRequestId public async Task GetNextInvocationAsync(CancellationToken cancellationToken = default) { - // If InvocationEvents is empty then all of the test events have been processed. - // At this point we just need to wait for the test verification to run and then the - // cancellationToken will be triggered to end delay. - if (InvocationEvents.Count == 0) + InvocationEvent data; + lock (InvocationEvents) + { + // If InvocationEvents is empty then all of the test events have been processed. + // At this point we just need to wait for the test verification to run and then the + // cancellationToken will be triggered to end delay. + if (InvocationEvents.Count == 0) + { + // Release the lock before awaiting + data = null; + } + else + { + data = InvocationEvents.Dequeue(); + } + } + + if (data == null) { await Task.Delay(TimeSpan.FromMinutes(10), cancellationToken); + // This line won't be reached in normal test flow since cancellation will throw + return null; } - var data = InvocationEvents.Dequeue(); ProcessInvocationEvents[data.AwsRequestId] = data; var inputStream = new MemoryStream(data.FunctionInput == null ? new byte[0] : data.FunctionInput); @@ -84,14 +100,16 @@ public async Task GetNextInvocationAsync(CancellationToken ca public Task SendResponseAsync(string awsRequestId, Stream outputStream, CancellationToken cancellationToken = default) { - var data = ProcessInvocationEvents[awsRequestId]; - data.Complete = true; - if (outputStream != null) + if (ProcessInvocationEvents.TryGetValue(awsRequestId, out var data)) { - // copy the stream because it gets disposed by the bootstrap - data.OutputStream = new MemoryStream((int)outputStream.Length); - outputStream.CopyTo(data.OutputStream); - data.OutputStream.Position = 0; + data.Complete = true; + if (outputStream != null) + { + // copy the stream because it gets disposed by the bootstrap + data.OutputStream = new MemoryStream((int)outputStream.Length); + outputStream.CopyTo(data.OutputStream); + data.OutputStream.Position = 0; + } } return Task.Run(() => { }); diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/UtilsTest.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/UtilsTest.cs index 8068920be..a41e809ac 100644 --- a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/UtilsTest.cs +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/UtilsTest.cs @@ -29,8 +29,11 @@ public void IsUsingMultiConcurrency(string concurrency, bool isMultiConcurrency) [Theory] [InlineData(null, 4, 1)] - [InlineData("5", 4, 4)] - [InlineData("5", 1, 2)] + [InlineData("5", 4, 5)] + [InlineData("5", 1, 5)] + [InlineData("10", 2, 10)] + [InlineData("enabled", 4, 4)] + [InlineData("enabled", 1, 2)] public void DetermineProcessingTaskCount(string concurrency, int processCount, int expected) { var envVars = new TestEnvironmentVariables();