Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,43 @@ import NIOPosix

@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension NIOHTTPServer {
/// Serves incoming plaintext HTTP/1.1 connections.
///
/// Each connection is handled concurrently in its own child task. Individual connection errors are handled within
/// the child tasks and do not affect other connections.
///
/// - Parameters:
/// - serverChannel: The async channel that produces incoming HTTP/1.1 connections.
/// - handler: The request handler.
///
/// - Throws: If an error occurs while iterating the incoming connection stream.
func serveInsecureHTTP1_1(
serverChannel: NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async throws {
try await withThrowingDiscardingTaskGroup { group in
try await serverChannel.executeThenClose { inbound in
for try await http1Channel in inbound {
group.addTask {
try await self.handleRequestChannel(
channel: http1Channel,
handler: handler
)
try await serverChannel.executeThenClose { inbound in
// We don't use a `withThrowingDiscardingTaskGroup` here because an error thrown from the body or a child
// task would immediately propagate upwards, cancelling all child tasks and bringing down the entire server.
// We instead use a non-throwing discarding task group so that errors in the body (e.g. from iterating
// `inbound`) must be caught and handled directly.
let inboundConnectionIterationError = await withDiscardingTaskGroup { group -> (any Error)? in
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just use withThrowingDiscardingTaskGroup here? The only error we're throwing is the one coming from iterating inbound.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is primarily to make it explicit that if you call a throwing function from within the body of the task group, the error must be handled directly rather than letting it propagate upwards and resulting in active tasks in the group being cancelled.

Currently iterating on inbound is the only source of a potential error, but if this was a withThrowingDiscardingTaskGroup then in the future we could inadvertently introduce a call to a throwing function without the compiler warning us.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some comment here (and in the other func) explaining this rationale? I am not convinced just doing things this way would prevent a future change from changing the behaviour unless it's more documented.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, addressed in commit cb84c52.

do {
for try await http1Channel in inbound {
group.addTask {
await self.handleRequestChannel(channel: http1Channel, handler: handler)
}
}

return nil
} catch {
return error
}
}

if let inboundConnectionIterationError {
// The error occurred while iterating the inbound connection stream
throw inboundConnectionIterationError
}
}
}

Expand Down
142 changes: 106 additions & 36 deletions Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,50 +35,120 @@ extension NIOHTTPServer {
(any Channel, NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>>)
>

/// Serves incoming connections. Each connection undergoes ALPN negotiation to determine whether to use HTTP/1.1 or
/// HTTP/2, and requests are then handled over the negotiated protocol.
///
/// Each accepted connection is handled concurrently in its own child task. Individual negotiation errors and
/// connection errors are handled within the child tasks and do not affect other connections.
///
/// - Parameters:
/// - serverChannel: The async channel that produces incoming connections.
/// - handler: The request handler.
///
/// - Throws: If an error occurs while iterating the incoming connection stream.
func serveSecureUpgrade(
serverChannel: NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async throws {
try await withThrowingDiscardingTaskGroup { group in
try await serverChannel.executeThenClose { inbound in
for try await upgradeResult in inbound {
group.addTask {
do {
try await withThrowingDiscardingTaskGroup { connectionGroup in
switch try await upgradeResult.get() {
case .http1_1(let http1Channel):
let chainFuture = http1Channel.channel.nioSSL_peerValidatedCertificateChain()
Self.$connectionContext.withValue(ConnectionContext(chainFuture)) {
connectionGroup.addTask {
try await self.handleRequestChannel(
channel: http1Channel,
handler: handler
)
}
}
case .http2((let http2Connection, let http2Multiplexer)):
do {
let chainFuture = http2Connection.nioSSL_peerValidatedCertificateChain()
try await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) {
for try await http2StreamChannel in http2Multiplexer.inbound {
connectionGroup.addTask {
try await self.handleRequestChannel(
channel: http2StreamChannel,
handler: handler
)
}
}
}
} catch {
self.logger.debug("HTTP2 connection closed: \(error)")
}
}
try await serverChannel.executeThenClose { inbound in
// We don't use a `withThrowingDiscardingTaskGroup` here because an error thrown from the body or a child
// task would immediately propagate upwards, cancelling all child tasks and bringing down the entire server.
// We instead use a non-throwing discarding task group so that errors in the body (e.g. from iterating
// `inbound`) must be caught and handled directly.
let inboundConnectionIterationError = await withDiscardingTaskGroup { connectionGroup -> (any Error)? in
do {
for try await upgradeResult in inbound {
connectionGroup.addTask {
let negotiatedChannel: NegotiatedChannel

do {
negotiatedChannel = try await upgradeResult.get()
} catch {
self.logger.debug("Negotiating ALPN failed", metadata: ["error": "\(error)"])
return
}
} catch {
self.logger.debug("Negotiating ALPN failed: \(error)")

switch negotiatedChannel {
case .http1_1(let requestChannel):
await self.serveHTTP1Connection(
requestChannel: requestChannel,
handler: handler
)

case .http2((let connectionChannel, let multiplexer)):
await self.serveHTTP2Connection(
connectionChannel: connectionChannel,
multiplexer: multiplexer,
handler: handler
)
}
}
}

return nil
} catch {
return error
}
}

if let inboundConnectionIterationError {
// The error occurred while iterating the inbound connection stream
throw inboundConnectionIterationError
}
}
}

/// Serves a HTTP/1.1 connection.
///
/// - Parameters:
/// - requestChannel: The HTTP/1.1 request channel.
/// - handler: The request handler.
private func serveHTTP1Connection(
requestChannel: NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async {
let chainFuture = requestChannel.channel.nioSSL_peerValidatedCertificateChain()

await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) {
await self.handleRequestChannel(
channel: requestChannel,
handler: handler
)
}
}

/// Serves a HTTP/2 connection by iterating the stream channels and handling each stream concurrently.
///
/// - Note: Stream iteration errors are logged but do not propagate to the caller.
///
/// - Parameters:
/// - connectionChannel: The underlying NIO channel for the HTTP/2 connection.
/// - multiplexer: The HTTP/2 stream multiplexer.
/// - handler: The request handler.
private func serveHTTP2Connection(
connectionChannel: any Channel,
multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async {
await withDiscardingTaskGroup { streamGroup in
do {
let chainFuture = connectionChannel.nioSSL_peerValidatedCertificateChain()

try await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) {
for try await streamChannel in multiplexer.inbound {
streamGroup.addTask {
await self.handleRequestChannel(
channel: streamChannel,
handler: handler
)
}
}
}
} catch {
self.logger.error(
"Error thrown while iterating over incoming HTTP/2 streams",
metadata: ["error": "\(error)"]
)
}
}
}
Expand Down
147 changes: 83 additions & 64 deletions Sources/NIOHTTPServer/NIOHTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,82 +213,101 @@ public struct NIOHTTPServer: HTTPServer {
}
}

/// Handles a single HTTP request.
///
/// - Note: Errors do not propagate to the caller. When an error occurs, it is logged and the channel is closed.
///
/// - Parameters:
/// - channel: The async channel to read the request from and write the response to.
/// - handler: The request handler.
func handleRequestChannel(
channel: NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async throws {
) async {
do {
try await channel
.executeThenClose { inbound, outbound in
var iterator = inbound.makeAsyncIterator()

let httpRequest: HTTPRequest
switch try await iterator.next() {
case .head(let request):
httpRequest = request
case .body:
self.logger.debug("Unexpectedly received body on connection. Closing now")
outbound.finish()
return
case .end:
self.logger.debug("Unexpectedly received end on connection. Closing now")
outbound.finish()
return
case .none:
self.logger.trace("No more requests parts on connection")
return
}
try await channel.executeThenClose { inbound, outbound in
var iterator = inbound.makeAsyncIterator()

let nextPart: HTTPRequestPart?
do {
nextPart = try await iterator.next()
} catch {
self.logger.error(
"Error thrown while advancing the request iterator",
metadata: ["error": "\(error)"]
)
throw error
}

let readerState = HTTPRequestConcludingAsyncReader.ReaderState()
let writerState = HTTPResponseConcludingAsyncWriter.WriterState()

do {
try await handler.handle(
request: httpRequest,
requestContext: HTTPRequestContext(),
requestBodyAndTrailers: HTTPRequestConcludingAsyncReader(
iterator: iterator,
readerState: readerState
),
responseSender: HTTPResponseSender { response in
try await outbound.write(.head(response))
return HTTPResponseConcludingAsyncWriter(
writer: outbound,
writerState: writerState
)
} sendInformational: { response in
try await outbound.write(.head(response))
}
)
} catch {
logger.error("Error thrown while handling connection: \(error)")
if !readerState.wrapped.withLock({ $0.finishedReading }) {
logger.error("Did not finish reading but error thrown.")
// TODO: if h2 reset stream; if h1 try draining request?
}
if !writerState.wrapped.withLock({ $0.finishedWriting }) {
logger.error("Did not write response but error thrown.")
// TODO: we need to do something, possibly just close the connection or
// reset the stream with the appropriate error.
let httpRequest: HTTPRequest
switch nextPart {
case .head(let request):
httpRequest = request
case .body:
self.logger.debug("Unexpectedly received body on connection. Closing now")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have something like a connectionID here? Can easily be done in a follow up...

outbound.finish()
return
case .end:
self.logger.debug("Unexpectedly received end on connection. Closing now")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have something like a connectionID here? Can easily be done in a follow up...

outbound.finish()
return
case .none:
self.logger.trace("No more requests parts on connection")
return
}

let readerState = HTTPRequestConcludingAsyncReader.ReaderState()
let writerState = HTTPResponseConcludingAsyncWriter.WriterState()

do {
try await handler.handle(
request: httpRequest,
requestContext: HTTPRequestContext(),
requestBodyAndTrailers: HTTPRequestConcludingAsyncReader(
iterator: iterator,
readerState: readerState
),
responseSender: HTTPResponseSender { response in
try await outbound.write(.head(response))
return HTTPResponseConcludingAsyncWriter(
writer: outbound,
writerState: writerState
)
} sendInformational: { response in
try await outbound.write(.head(response))
}
throw error
)
} catch {
if !readerState.wrapped.withLock({ $0.finishedReading }) {
self.logger.error("Did not finish reading but error thrown.")
// TODO: if h2 reset stream; if h1 try draining request?
}

// TODO: handle other state scenarios.
// For example, if we're using h2 and we didn't finish reading but we wrote back
// a response, we should send a RST_STREAM with NO_ERROR set.
// If we finished reading but we didn't write back a response, then RST_STREAM
// is also likely appropriate but unclear about the error.
// For h1, we should close the connection.
if !writerState.wrapped.withLock({ $0.finishedWriting }) {
self.logger.error("Did not write response but error thrown.")
// TODO: we need to do something, possibly just close the connection or
// reset the stream with the appropriate error.
}

// Finish the outbound and wait on the close future to make sure all pending
// writes are actually written.
outbound.finish()
try await channel.channel.closeFuture.get()
throw error
}

// TODO: handle other state scenarios.
// For example, if we're using h2 and we didn't finish reading but we wrote back
// a response, we should send a RST_STREAM with NO_ERROR set.
// If we finished reading but we didn't write back a response, then RST_STREAM
// is also likely appropriate but unclear about the error.
// For h1, we should close the connection.

// Finish the outbound and wait on the close future to make sure all pending
// writes are actually written.
outbound.finish()
try await channel.channel.closeFuture.get()
}
} catch {
self.logger.debug("Error thrown while handling connection: \(error)")
// TODO: We need to send a response head here potentially
self.logger.error("Error thrown while handling connection", metadata: ["error": "\(error)"])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connectionID?


try? await channel.channel.close()
}
}
Expand Down
Loading