-
Notifications
You must be signed in to change notification settings - Fork 494
Add support for Lambda Response Streaming #2288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/response-streaming
Are you sure you want to change the base?
Changes from all commits
22d1469
5e0f810
20e5ba8
0cdb159
0aa892b
5e29c21
603612d
63224bf
14c993b
0b8dd52
414a449
21d82d8
556b726
4d5dee2
d8322ff
645771e
1f17a58
99b83c2
ab93ce9
f679efa
bd0170e
d60bb93
9b308ae
d0861c6
3c86629
c637ef1
42d3212
b7b51bd
2eb2eb4
1a86609
98d2d25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| { | ||
| "Projects": [ | ||
| { | ||
| "Name": "Amazon.Lambda.RuntimeSupport", | ||
| "Type": "Minor", | ||
| "ChangelogMessages": [ | ||
| "(Preview) Add response streaming support" | ||
| ] | ||
| }, | ||
| { | ||
| "Name": "Amazon.Lambda.Core", | ||
| "Type": "Minor", | ||
| "ChangelogMessages": [ | ||
| "(Preview) Add response streaming support" | ||
| ] | ||
| } | ||
| ] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ | |
| *.suo | ||
| *.user | ||
|
|
||
| **/.kiro/ | ||
|
|
||
| #################### | ||
| # Build/Test folders | ||
| #################### | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| #if NET8_0_OR_GREATER | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Net; | ||
| using System.Runtime.Versioning; | ||
| using System.Text.Json; | ||
|
|
||
| namespace Amazon.Lambda.Core.ResponseStreaming | ||
| { | ||
| /// <summary> | ||
| /// The HTTP response prelude to be sent as the first chunk of a streaming response when using <see cref="LambdaResponseStreamFactory.CreateHttpStream"/>. | ||
| /// </summary> | ||
| [RequiresPreviewFeatures(LambdaResponseStreamFactory.PreviewMessage)] | ||
| public class HttpResponseStreamPrelude | ||
| { | ||
| /// <summary> | ||
| /// The Http status code to include in the response prelude. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: all docs for the fields of this class has "to include in the response prelude.". It feels redundant. |
||
| /// </summary> | ||
| public HttpStatusCode? StatusCode { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The response headers to include in the response prelude. This collection supports setting single value for the same headers. | ||
| /// </summary> | ||
| public IDictionary<string, string> Headers { get; set; } = new Dictionary<string, string>(); | ||
|
|
||
| /// <summary> | ||
| /// The response headers to include in the response prelude. This collection supports setting multiple values for the same headers. | ||
| /// </summary> | ||
| public IDictionary<string, IList<string>> MultiValueHeaders { get; set; } = new Dictionary<string, IList<string>>(); | ||
|
Comment on lines
+26
to
+31
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a reason why both are needed? why not just have 1 that supports multi value? |
||
|
|
||
| /// <summary> | ||
| /// The list of cookies to include in the response prelude. This is used for Lambda Function URL responses, which support a separate "cookies" field in the response JSON for setting cookies, rather than requiring cookies to be set via the "Set-Cookie" header. | ||
| /// </summary> | ||
| public IList<string> Cookies { get; set; } = new List<string>(); | ||
|
|
||
| internal byte[] ToByteArray() | ||
| { | ||
| var bufferWriter = new System.Buffers.ArrayBufferWriter<byte>(); | ||
| using (var writer = new Utf8JsonWriter(bufferWriter)) | ||
| { | ||
| writer.WriteStartObject(); | ||
|
|
||
| if (StatusCode.HasValue) | ||
| writer.WriteNumber("statusCode", (int)StatusCode); | ||
|
|
||
| if (Headers?.Count > 0) | ||
| { | ||
| writer.WriteStartObject("headers"); | ||
| foreach (var header in Headers) | ||
| { | ||
| writer.WriteString(header.Key, header.Value); | ||
| } | ||
| writer.WriteEndObject(); | ||
| } | ||
|
|
||
| if (MultiValueHeaders?.Count > 0) | ||
| { | ||
| writer.WriteStartObject("multiValueHeaders"); | ||
| foreach (var header in MultiValueHeaders) | ||
| { | ||
| writer.WriteStartArray(header.Key); | ||
| foreach (var value in header.Value) | ||
| { | ||
| writer.WriteStringValue(value); | ||
| } | ||
| writer.WriteEndArray(); | ||
| } | ||
| writer.WriteEndObject(); | ||
| } | ||
|
|
||
| if (Cookies?.Count > 0) | ||
| { | ||
| writer.WriteStartArray("cookies"); | ||
| foreach (var cookie in Cookies) | ||
| { | ||
| writer.WriteStringValue(cookie); | ||
| } | ||
| writer.WriteEndArray(); | ||
| } | ||
|
|
||
| writer.WriteEndObject(); | ||
| } | ||
|
|
||
| if (string.Equals(Environment.GetEnvironmentVariable("LAMBDA_NET_SERIALIZER_DEBUG"), "true", StringComparison.OrdinalIgnoreCase)) | ||
| { | ||
| LambdaLogger.Log(LogLevel.Information, "HTTP Response Stream Prelude JSON: {Prelude}", System.Text.Encoding.UTF8.GetString(bufferWriter.WrittenSpan)); | ||
| } | ||
|
|
||
| return bufferWriter.WrittenSpan.ToArray(); | ||
| } | ||
| } | ||
| } | ||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| #if NET8_0_OR_GREATER | ||
| using System; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Amazon.Lambda.Core.ResponseStreaming | ||
| { | ||
| /// <summary> | ||
| /// Interface for writing streaming responses in AWS Lambda functions. | ||
| /// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler. | ||
| /// </summary> | ||
| internal interface ILambdaResponseStream : IDisposable | ||
| { | ||
| /// <summary> | ||
| /// Asynchronously writes a portion of a byte array to the response stream. | ||
| /// </summary> | ||
| /// <param name="buffer">The byte array containing data to write.</param> | ||
| /// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param> | ||
| /// <param name="count">The number of bytes to write.</param> | ||
| /// <param name="cancellationToken">Optional cancellation token.</param> | ||
| /// <returns>A task representing the asynchronous operation.</returns> | ||
| /// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception> | ||
| Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default); | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove extra whitespace |
||
| /// <summary> | ||
| /// Gets the total number of bytes written to the stream so far. | ||
| /// </summary> | ||
| long BytesWritten { get; } | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove extra whitespace |
||
| /// <summary> | ||
| /// Gets whether an error has been reported. | ||
| /// </summary> | ||
| bool HasError { get; } | ||
| } | ||
| } | ||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| #if NET8_0_OR_GREATER | ||
|
|
||
| using System; | ||
| using System.IO; | ||
| using System.Runtime.Versioning; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Amazon.Lambda.Core.ResponseStreaming | ||
| { | ||
| /// <summary> | ||
| /// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data | ||
| /// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>. | ||
| /// Integrates with standard .NET stream consumers such as <see cref="System.IO.StreamWriter"/>. | ||
| /// </summary> | ||
| [RequiresPreviewFeatures(LambdaResponseStreamFactory.PreviewMessage)] | ||
| public class LambdaResponseStream : Stream | ||
| { | ||
| private readonly ILambdaResponseStream _responseStream; | ||
|
|
||
| internal LambdaResponseStream(ILambdaResponseStream responseStream) | ||
| { | ||
| _responseStream = responseStream; | ||
normj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /// <summary> | ||
| /// The number of bytes written to the Lambda response stream so far. | ||
| /// </summary> | ||
| public long BytesWritten => _responseStream.BytesWritten; | ||
|
|
||
| /// <summary> | ||
| /// Asynchronously writes a byte array to the response stream. | ||
| /// </summary> | ||
| /// <param name="buffer">The byte array to write.</param> | ||
| /// <param name="cancellationToken">Optional cancellation token.</param> | ||
| /// <returns>A task representing the asynchronous operation.</returns> | ||
| /// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception> | ||
| public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default) | ||
| { | ||
| if (buffer == null) | ||
| throw new ArgumentNullException(nameof(buffer)); | ||
|
|
||
| await WriteAsync(buffer, 0, buffer.Length, cancellationToken); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Asynchronously writes a portion of a byte array to the response stream. | ||
| /// </summary> | ||
| /// <param name="buffer">The byte array containing data to write.</param> | ||
| /// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param> | ||
| /// <param name="count">The number of bytes to write.</param> | ||
| /// <param name="cancellationToken">Optional cancellation token.</param> | ||
| /// <returns>A task representing the asynchronous operation.</returns> | ||
| /// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception> | ||
| public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) | ||
| { | ||
| await _responseStream.WriteAsync(buffer, offset, count, cancellationToken); | ||
| } | ||
|
|
||
| #region Noop Overrides | ||
|
|
||
| /// <summary>Gets a value indicating whether the stream supports reading. Always <c>false</c>.</summary> | ||
| public override bool CanRead => false; | ||
|
|
||
| /// <summary>Gets a value indicating whether the stream supports seeking. Always <c>false</c>.</summary> | ||
| public override bool CanSeek => false; | ||
|
|
||
| /// <summary>Gets a value indicating whether the stream supports writing. Always <c>true</c>.</summary> | ||
| public override bool CanWrite => true; | ||
|
|
||
| /// <summary> | ||
| /// Gets the total number of bytes written to the stream so far. | ||
| /// Equivalent to <see cref="BytesWritten"/>. | ||
| /// </summary> | ||
| public override long Length => BytesWritten; | ||
|
|
||
| /// <summary> | ||
| /// Getting or setting the position is not supported. | ||
| /// </summary> | ||
| /// <exception cref="NotSupportedException">Always thrown.</exception> | ||
| public override long Position | ||
| { | ||
| get => throw new NotSupportedException($"{nameof(LambdaResponseStream)} does not support seeking."); | ||
| set => throw new NotSupportedException($"{nameof(LambdaResponseStream)} does not support seeking."); | ||
| } | ||
|
|
||
| /// <summary>Not supported.</summary> | ||
| /// <exception cref="NotImplementedException">Always thrown.</exception> | ||
| public override long Seek(long offset, SeekOrigin origin) | ||
| => throw new NotImplementedException($"{nameof(LambdaResponseStream)} does not support seeking."); | ||
|
|
||
| /// <summary>Not supported.</summary> | ||
| /// <exception cref="NotImplementedException">Always thrown.</exception> | ||
| public override int Read(byte[] buffer, int offset, int count) | ||
| => throw new NotImplementedException($"{nameof(LambdaResponseStream)} does not support reading."); | ||
|
|
||
| /// <summary>Not supported.</summary> | ||
| /// <exception cref="NotImplementedException">Always thrown.</exception> | ||
| public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | ||
| => throw new NotImplementedException($"{nameof(LambdaResponseStream)} does not support reading."); | ||
|
|
||
| /// <summary> | ||
| /// Writes a sequence of bytes to the stream. Delegates to the async path synchronously. | ||
| /// Prefer <see cref="WriteAsync(byte[], int, int, CancellationToken)"/> to avoid blocking. | ||
| /// </summary> | ||
| public override void Write(byte[] buffer, int offset, int count) | ||
| => WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); | ||
|
|
||
| /// <summary> | ||
| /// Flush is a no-op; data is sent to the Runtime API immediately on each write. | ||
| /// </summary> | ||
| public override void Flush() { } | ||
|
|
||
| /// <summary>Not supported.</summary> | ||
| /// <exception cref="NotSupportedException">Always thrown.</exception> | ||
| public override void SetLength(long value) | ||
| => throw new NotSupportedException($"{nameof(LambdaResponseStream)} does not support SetLength."); | ||
| #endregion | ||
| } | ||
| } | ||
| #endif | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you give more context as to why the prelude is needed?