From 1b197d48bf884f0273a3bd6f374aeaa9f22ac9fb Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 7 Jan 2026 13:14:49 +0100 Subject: [PATCH 01/14] Rename `succeedRequest` -> `forwardResponseEnd` in `HTTPRequestStateMachine.Action` --- .../ConnectionPool/HTTPRequestStateMachine.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index e06389360..7e579a34b 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -111,9 +111,9 @@ struct HTTPRequestStateMachine { case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool) case forwardResponseBodyParts(CircularBuffer) + case forwardResponseEnd(FinalSuccessfulRequestAction, CircularBuffer) case failRequest(Error, FinalFailedRequestAction) - case succeedRequest(FinalSuccessfulRequestAction, CircularBuffer) case read case wait @@ -395,7 +395,7 @@ struct HTTPRequestStateMachine { } self.state = .finished - return .succeedRequest(.sendRequestEnd(promise), .init()) + return .forwardResponseEnd(.sendRequestEnd(promise), .init()) case .failed(let error): return .failSendStreamFinished(error, promise) @@ -671,7 +671,7 @@ struct HTTPRequestStateMachine { // connection should be closed anyway. let (remainingBuffer, _) = responseStreamState.end() state = .finished - return .succeedRequest(.close, remainingBuffer) + return .forwardResponseEnd(.close, remainingBuffer) } case .running(.endSent, .receivingBody(_, var responseStreamState)): @@ -680,9 +680,9 @@ struct HTTPRequestStateMachine { state = .finished switch action { case .none: - return .succeedRequest(.none, remainingBuffer) + return .forwardResponseEnd(.none, remainingBuffer) case .close: - return .succeedRequest(.close, remainingBuffer) + return .forwardResponseEnd(.close, remainingBuffer) } } From b325c1681c8b827f9e5f55468298b8d16b479b6f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 7 Jan 2026 15:56:13 +0100 Subject: [PATCH 02/14] Lots of changes --- .../AsyncAwait/Transaction+StateMachine.swift | 53 +++++++++----- .../AsyncAwait/Transaction.swift | 6 ++ .../HTTP1/HTTP1ClientChannelHandler.swift | 63 ++++++++++------- .../HTTP1/HTTP1ConnectionStateMachine.swift | 60 ++++++++++------ .../HTTP2/HTTP2ClientRequestHandler.swift | 10 ++- .../HTTPExecutableRequest.swift | 5 ++ .../HTTPRequestStateMachine.swift | 15 ++-- Sources/AsyncHTTPClient/RequestBag.swift | 14 ++++ .../HTTP1ConnectionStateMachineTests.swift | 38 +++------- ...HTTPConnectionPool+RequestQueueTests.swift | 4 ++ .../HTTPRequestStateMachineTests.swift | 69 +++++++------------ .../Mocks/MockConnectionPool.swift | 4 ++ .../Mocks/MockHTTPExecutableRequest.swift | 26 +++++-- 13 files changed, 212 insertions(+), 155 deletions(-) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index 49d734df0..a33844c28 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -38,6 +38,7 @@ extension Transaction { case requestHeadSent case producing case paused(continuation: CheckedContinuation?) + case endForwarded case finished } @@ -135,7 +136,7 @@ extension Transaction { bodyStreamContinuation: continuation ) - case .requestHeadSent, .finished, .producing, .paused(continuation: .none): + case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none): self.state = .finished(error: error) return .failResponseHead( context.continuation, @@ -156,7 +157,7 @@ extension Transaction { context.executor, bodyStreamContinuation: bodyStreamContinuation ) - case .finished, .producing, .requestHeadSent: + case .endForwarded, .finished, .producing, .requestHeadSent: return .failResponseStream(source, error, context.executor, bodyStreamContinuation: nil) } @@ -232,7 +233,7 @@ extension Transaction { self.state = .executing(context, .producing, responseState) return .resumeStream(continuation) - case .executing(_, .finished, _): + case .executing(_, .endForwarded, _), .executing(_, .finished, _): // the channels writability changed to writable after we have forwarded all the // request bytes. Can be ignored. return .none @@ -254,6 +255,7 @@ extension Transaction { self.state = .executing(context, .paused(continuation: nil), responseSteam) case .executing(_, .paused, _), + .executing(_, .endForwarded, _), .executing(_, .finished, _), .finished: // the channels writability changed to paused after we have already forwarded all @@ -298,7 +300,7 @@ extension Transaction { "A write continuation already exists, but we tried to set another one. Invalid state: \(self.state)" ) - case .finished, .executing(_, .finished, _): + case .finished, .executing(_, .endForwarded, _), .executing(_, .finished, _): return .fail } } @@ -309,6 +311,7 @@ extension Transaction { .queued, .deadlineExceededWhileQueued, .executing(_, .requestHeadSent, _), + .executing(_, .endForwarded, _), .executing(_, .finished, _): preconditionFailure( "A request stream can only produce, if the request was started. Invalid state: \(self.state)" @@ -343,6 +346,7 @@ extension Transaction { case .initialized, .queued, .deadlineExceededWhileQueued, + .executing(_, .endForwarded, _), .executing(_, .finished, _): preconditionFailure("Invalid state: \(self.state)") @@ -355,23 +359,38 @@ extension Transaction { .executing(let context, .paused(continuation: .none), let responseState), .executing(let context, .requestHeadSent, let responseState): - switch responseState { - case .finished: - // if the response stream has already finished before the request, we must succeed - // the final continuation. - self.state = .finished(error: nil) - return .forwardStreamFinished(context.executor) - - case .waitingForResponseHead, .streamingBody: - self.state = .executing(context, .finished, responseState) - return .forwardStreamFinished(context.executor) - } + self.state = .executing(context, .endForwarded, responseState) + return .forwardStreamFinished(context.executor) case .finished: return .none } } + mutating func requestBodyStreamSent() { + switch self.state { + case .initialized, + .queued, + .deadlineExceededWhileQueued, + .executing(_, .requestHeadSent, _), + .executing(_, .finished, _), + .executing(_, .producing, _), + .executing(_, .paused, _): + preconditionFailure("Invalid state: \(self.state)") + + case .executing(_, .endForwarded, .finished): + self.state = .finished(error: nil) +// return . + + case .executing(let context, .endForwarded, let responseState): + self.state = .executing(context, .finished, responseState) +// return . + + case .finished: + break + } + } + // MARK: - Response - enum ReceiveResponseHeadAction { @@ -482,7 +501,7 @@ extension Transaction { switch requestState { case .finished: self.state = .finished(error: nil) - case .paused, .producing, .requestHeadSent: + case .paused, .producing, .requestHeadSent, .endForwarded: self.state = .executing(context, requestState, .finished) } return .finishResponseStream(source, finalBody: newChunks) @@ -538,7 +557,7 @@ extension Transaction { executor: context.executor, bodyStreamContinuation: continuation ) - case .requestHeadSent, .finished, .producing, .paused(continuation: .none): + case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none): self.state = .finished(error: error) return .cancel( requestContinuation: context.continuation, diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index c8bf54b09..e64400e88 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -245,6 +245,12 @@ extension Transaction: HTTPExecutableRequest { } } + func requestBodyStreamSent() { + self.state.withLockedValue { state in + state.requestBodyStreamSent() + } + } + // MARK: Response func receiveResponseHead(_ head: HTTPResponseHead) { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 02ebab916..0105237a4 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -242,7 +242,40 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { case .sendBodyPart(let part, let writePromise): context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise) - case .sendRequestEnd(let writePromise): + case .sendRequestEnd(let writePromise, let finalAction): + + let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self) + // We need to defer succeeding the old request to avoid ordering issues + + writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in + let oldRequest = self.request! + oldRequest.requestBodyStreamSent() + switch result { + case .success: + // If our final action is not `none`, that means we've already received + // the complete response. As a result, once we've uploaded all the body parts + // we need to tell the pool that the connection is idle or, if we were asked to + // close when we're done, send the close. Either way, we then succeed the request + + switch finalAction { + case .none: + break + + case .informConnectionIsIdle: + self.request = nil + self.onConnectionIdle() + + case .close: + self.request = nil + context.close(promise: nil) + } + + case .failure(let error): + context.close(promise: nil) + oldRequest.fail(error) + } + } + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise) if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() { @@ -300,7 +333,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { // that the request is neither failed nor finished yet self.request!.receiveResponseBodyParts(buffer) - case .succeedRequest(let finalAction, let buffer): + case .forwardResponseEnd(let finalAction, let buffer): // We can force unwrap the request here, as we have just validated in the state machine, // that the request is neither failed nor finished yet @@ -320,30 +353,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { case .close: context.close(promise: nil) oldRequest.receiveResponseEnd(buffer, trailers: nil) - case .sendRequestEnd(let writePromise, let shouldClose): - let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self) - // We need to defer succeeding the old request to avoid ordering issues - writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in - switch result { - case .success: - // If our final action was `sendRequestEnd`, that means we've already received - // the complete response. As a result, once we've uploaded all the body parts - // we need to tell the pool that the connection is idle or, if we were asked to - // close when we're done, send the close. Either way, we then succeed the request - if shouldClose { - context.close(promise: nil) - } else { - self.onConnectionIdle() - } - - oldRequest.receiveResponseEnd(buffer, trailers: nil) - case .failure(let error): - context.close(promise: nil) - oldRequest.fail(error) - } - } - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise) + case .none: + oldRequest.receiveResponseEnd(buffer, trailers: nil) + case .informConnectionIsIdle: self.onConnectionIdle() oldRequest.receiveResponseEnd(buffer, trailers: nil) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift index 2cde1df3f..3b2da8bb6 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift @@ -27,18 +27,12 @@ struct HTTP1ConnectionStateMachine { } enum Action { - /// A action to execute, when we consider a request "done". + /// An additional action to execute, when either the response or request stream has finished. enum FinalSuccessfulStreamAction { + /// Nothing todo + case none /// Close the connection case close - /// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded, - /// as soon as we wrote the request end onto the wire. - /// - /// The promise is an optional write promise. - /// - /// `shouldClose` records whether we have attached a Connection: close header to this request, and so the connection should - /// be terminated - case sendRequestEnd(EventLoopPromise?, shouldClose: Bool) /// Inform an observer that the connection has become idle case informConnectionIsIdle } @@ -63,7 +57,7 @@ struct HTTP1ConnectionStateMachine { startIdleTimer: Bool ) case sendBodyPart(IOData, EventLoopPromise?) - case sendRequestEnd(EventLoopPromise?) + case sendRequestEnd(EventLoopPromise?, FinalSuccessfulStreamAction) case failSendBodyPart(Error, EventLoopPromise?) case failSendStreamFinished(Error, EventLoopPromise?) @@ -72,9 +66,9 @@ struct HTTP1ConnectionStateMachine { case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool) case forwardResponseBodyParts(CircularBuffer) + case forwardResponseEnd(FinalSuccessfulStreamAction, CircularBuffer) case failRequest(Error, FinalFailedStreamAction) - case succeedRequest(FinalSuccessfulStreamAction, CircularBuffer) case read case close @@ -433,13 +427,34 @@ extension HTTP1ConnectionStateMachine.State { return .resumeRequestBodyStream case .sendBodyPart(let part, let writePromise): return .sendBodyPart(part, writePromise) - case .sendRequestEnd(let writePromise): - return .sendRequestEnd(writePromise) + case .sendRequestEnd(let writePromise, let finalAction): + guard case .inRequest(_, close: let close) = self else { + fatalError("Invalid state: \(self)") + } + + let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction + switch finalAction { + case .close: + self = .closing + newFinalAction = .close + case .requestDone: + if close { + self = .closing + newFinalAction = .close + } else { + self = .idle + newFinalAction = .informConnectionIsIdle + } + case .none: + newFinalAction = .none + } + return .sendRequestEnd(writePromise, newFinalAction) + case .forwardResponseHead(let head, let pauseRequestBodyStream): return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream) case .forwardResponseBodyParts(let parts): return .forwardResponseBodyParts(parts) - case .succeedRequest(let finalAction, let finalParts): + case .forwardResponseEnd(let finalAction, let finalParts): guard case .inRequest(_, close: let close) = self else { fatalError("Invalid state: \(self)") } @@ -449,14 +464,19 @@ extension HTTP1ConnectionStateMachine.State { case .close: self = .closing newFinalAction = .close - case .sendRequestEnd(let writePromise): - self = .idle - newFinalAction = .sendRequestEnd(writePromise, shouldClose: close) + case .requestDone: + if close { + self = .closing + newFinalAction = .close + } else { + self = .idle + newFinalAction = .informConnectionIsIdle + } case .none: - self = .idle - newFinalAction = close ? .close : .informConnectionIsIdle + // request is ongoing. request stream is still alive + newFinalAction = .none } - return .succeedRequest(newFinalAction, finalParts) + return .forwardResponseEnd(newFinalAction, finalParts) case .failRequest(let error, let finalAction): switch self { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index b010b672e..244d2e8f7 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -196,7 +196,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { case .sendBodyPart(let data, let writePromise): context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: writePromise) - case .sendRequestEnd(let writePromise): + case .sendRequestEnd(let writePromise, let finalAction): context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise) if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() { @@ -206,6 +206,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { if let writeTimeoutAction = self.idleWriteTimeoutStateMachine?.requestEndSent() { self.runTimeoutAction(writeTimeoutAction, context: context) } + self.runSuccessfulFinalAction(finalAction, context: context) case .read: context.read() @@ -247,7 +248,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { // the right result for HTTP/1). In the h2 case we MUST always close. self.runFailedFinalAction(finalAction, context: context, error: error) - case .succeedRequest(let finalAction, let finalParts): + case .forwardResponseEnd(let finalAction, let finalParts): // We can force unwrap the request here, as we have just validated in the state machine, // that the request object is still present. self.request!.receiveResponseEnd(finalParts, trailers: nil) @@ -277,14 +278,11 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { context: ChannelHandlerContext ) { switch action { - case .close, .none: + case .close, .none, .requestDone: // The actions returned here come from an `HTTPRequestStateMachine` that assumes http/1.1 // semantics. For this reason we can ignore the close here, since an h2 stream is closed // after every request anyway. break - - case .sendRequestEnd(let writePromise): - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise) } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift index 0635c7978..b502ad034 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift @@ -244,6 +244,11 @@ protocol HTTPExecutableRequest: AnyObject, Sendable { /// This will be called on the Channel's EventLoop. Do **not block** during your execution! func pauseRequestBodyStream() + /// Will be called by the ChannelHandler to indicate that the request body stream has been sent. + /// + /// This will be called on the Channel's EventLoop. Do **not block** during your execution! + func requestBodyStreamSent() + /// Receive a response head. /// /// Please note that `receiveResponseHead` and `receiveResponseBodyPart` may diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 7e579a34b..58b8d7e37 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -73,7 +73,7 @@ struct HTTPRequestStateMachine { } enum Action { - /// A action to execute, when we consider a successful request "done". + /// A action to execute, when we consider a request or response stream "done". enum FinalSuccessfulRequestAction { /// Close the connection case close @@ -81,7 +81,7 @@ struct HTTPRequestStateMachine { /// as soon as we wrote the request end onto the wire. /// /// The promise is an optional write promise. - case sendRequestEnd(EventLoopPromise?) + case requestDone /// Do nothing. This is action is used, if the request failed, before we the request head was written onto the wire. /// This might happen if the request is cancelled, or the request failed the soundness check. case none @@ -102,7 +102,7 @@ struct HTTPRequestStateMachine { startIdleTimer: Bool ) case sendBodyPart(IOData, EventLoopPromise?) - case sendRequestEnd(EventLoopPromise?) + case sendRequestEnd(EventLoopPromise?, FinalSuccessfulRequestAction) case failSendBodyPart(Error, EventLoopPromise?) case failSendStreamFinished(Error, EventLoopPromise?) @@ -370,7 +370,7 @@ struct HTTPRequestStateMachine { } self.state = .running(.endSent, .waitingForHead) - return .sendRequestEnd(promise) + return .sendRequestEnd(promise, .none) case .running( .streaming(let expectedBodyLength, let sentBodyBytes, _), @@ -385,7 +385,7 @@ struct HTTPRequestStateMachine { } self.state = .running(.endSent, .receivingBody(head, streamState)) - return .sendRequestEnd(promise) + return .sendRequestEnd(promise, .none) case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .endReceived): if let expected = expectedBodyLength, expected != sentBodyBytes { @@ -395,7 +395,7 @@ struct HTTPRequestStateMachine { } self.state = .finished - return .forwardResponseEnd(.sendRequestEnd(promise), .init()) + return .sendRequestEnd(promise, .none) case .failed(let error): return .failSendStreamFinished(error, promise) @@ -648,7 +648,8 @@ struct HTTPRequestStateMachine { ), .endReceived ) - return .forwardResponseBodyParts(remainingBuffer) + return .forwardResponseEnd(.none, remainingBuffer) + case .close: // If we receive a `.close` as a connectionAction from the responseStreamState // this means, that the response end was signaled by a connection close. Since diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index a743f0814..132d01798 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -187,6 +187,10 @@ final class RequestBag: Sendabl self.loopBoundState.value.state.pauseRequestBodyStream() } + private func requestBodyStreamSent0() { + + } + private func writeNextRequestPart(_ part: IOData) -> EventLoopFuture { if self.eventLoop.inEventLoop { return self.writeNextRequestPart0(part) @@ -518,6 +522,16 @@ extension RequestBag: HTTPExecutableRequest { } } + func requestBodyStreamSent() { + if self.task.eventLoop.inEventLoop { + self.requestBodyStreamSent0() + } else { + self.task.eventLoop.execute { + self.requestBodyStreamSent0() + } + } + } + func receiveResponseHead(_ head: HTTPResponseHead) { if self.task.eventLoop.inEventLoop { self.receiveResponseHead0(head) diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift index 1c6e9659f..c1f4ba0a1 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift @@ -52,7 +52,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { // once we receive a writable event again, we can allow the producer to produce more data XCTAssertEqual(state.writabilityChanged(writable: true), .resumeRequestBodyStream) XCTAssertEqual(state.requestStreamPartReceived(part3, promise: nil), .sendBodyPart(part3, nil)) - XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil)) + XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .none)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) XCTAssertEqual( @@ -61,7 +61,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.informConnectionIsIdle, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.informConnectionIsIdle, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -96,7 +96,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.informConnectionIsIdle, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.informConnectionIsIdle, .init())) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) } @@ -140,7 +140,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .fireChannelInactive) } @@ -163,7 +163,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .fireChannelInactive) } @@ -190,7 +190,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.informConnectionIsIdle, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.informConnectionIsIdle, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .fireChannelInactive) } @@ -214,7 +214,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, .init([responseBody]))) } func testNIOTriggersChannelActiveTwice() { @@ -367,7 +367,7 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [])) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, [])) } func testWeDontCrashAfterEarlyHintsAndConnectionClose() { @@ -445,8 +445,8 @@ extension HTTP1ConnectionStateMachine.Action: Equatable { return lhsData == rhsData case ( - .succeedRequest(let lhsFinalAction, let lhsFinalBuffer), - .succeedRequest(let rhsFinalAction, let rhsFinalBuffer) + .forwardResponseEnd(let lhsFinalAction, let lhsFinalBuffer), + .forwardResponseEnd(let rhsFinalAction, let rhsFinalBuffer) ): return lhsFinalAction == rhsFinalAction && lhsFinalBuffer == rhsFinalBuffer @@ -468,24 +468,6 @@ extension HTTP1ConnectionStateMachine.Action: Equatable { } } -extension HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction: Equatable { - public static func == ( - lhs: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction, - rhs: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction - ) -> Bool { - switch (lhs, rhs) { - case (.close, .close): - return true - case (sendRequestEnd(let lhsPromise, let lhsShouldClose), sendRequestEnd(let rhsPromise, let rhsShouldClose)): - return lhsPromise?.futureResult == rhsPromise?.futureResult && lhsShouldClose == rhsShouldClose - case (informConnectionIsIdle, informConnectionIsIdle): - return true - default: - return false - } - } -} - extension HTTP1ConnectionStateMachine.Action.FinalFailedStreamAction: Equatable { public static func == ( lhs: HTTP1ConnectionStateMachine.Action.FinalFailedStreamAction, diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift index 99a61fe47..6fbdda385 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -126,6 +126,10 @@ final private class MockScheduledRequest: HTTPSchedulableRequest { preconditionFailure("Unimplemented") } + func requestBodyStreamSent() { + preconditionFailure("Unimplemented") + } + func receiveResponseHead(_: HTTPResponseHead) { preconditionFailure("Unimplemented") } diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift index 8fe879745..b5394ce8e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift @@ -37,7 +37,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -77,7 +77,7 @@ class HTTPRequestStateMachineTests: XCTestCase { // once we receive a writable event again, we can allow the producer to produce more data XCTAssertEqual(state.writabilityChanged(writable: true), .resumeRequestBodyStream) XCTAssertEqual(state.requestStreamPartReceived(part3, promise: nil), .sendBodyPart(part3, nil)) - XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil)) + XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .none)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) XCTAssertEqual( @@ -86,7 +86,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -169,7 +169,7 @@ class HTTPRequestStateMachineTests: XCTestCase { "Expected to drop all stream data after having received a response head, with status >= 300" ) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, .init())) XCTAssertEqual( state.requestStreamPartReceived(part, promise: nil), @@ -230,7 +230,7 @@ class HTTPRequestStateMachineTests: XCTestCase { "Expected to drop all stream data after having received a response head, with status >= 300" ) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, .init())) XCTAssertEqual( state.requestStreamPartReceived(part, promise: nil), @@ -267,13 +267,13 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseBodyParts(.init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1, promise: nil), .sendBodyPart(part1, nil)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2, promise: nil), .sendBodyPart(part2, nil)) - XCTAssertEqual(state.requestStreamFinished(promise: nil), .succeedRequest(.sendRequestEnd(nil), .init())) + XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .none)) XCTAssertEqual( state.requestStreamPartReceived(part2, promise: nil), @@ -308,9 +308,9 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(part1, promise: nil), .sendBodyPart(part1, nil)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2, promise: nil), .sendBodyPart(part2, nil)) - XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil)) + XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .none)) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) } func testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200() { @@ -387,7 +387,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .wait) } @@ -430,7 +430,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) @@ -467,7 +467,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.channelRead(.body(part2)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([part2]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([part2]))) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) @@ -513,7 +513,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) } @@ -551,7 +551,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -630,7 +630,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) } @@ -649,7 +649,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) XCTAssertEqual(state.idleReadTimeoutTriggered(), .wait, "A read timeout that fires to late must be ignored") } @@ -667,7 +667,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored") } @@ -705,7 +705,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.channelReadComplete(), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [])) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, [])) XCTAssertEqual(state.channelInactive(), .wait) } @@ -729,7 +729,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.channelRead(.body(body)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [body])) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, [body])) XCTAssertEqual(state.channelInactive(), .wait) } @@ -951,7 +951,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelRead(.body(part3)), .wait) XCTAssertEqual(state.channelReadComplete(), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [part1, part2, part3])) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.close, [part1, part2, part3])) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.channelInactive(), .wait) @@ -973,8 +973,8 @@ extension HTTPRequestStateMachine.Action: Equatable { case (.sendBodyPart(let lhsData, let lhsPromise), .sendBodyPart(let rhsData, let rhsPromise)): return lhsData == rhsData && lhsPromise?.futureResult == rhsPromise?.futureResult - case (.sendRequestEnd(let lhsPromise), .sendRequestEnd(let rhsPromise)): - return lhsPromise?.futureResult == rhsPromise?.futureResult + case (.sendRequestEnd(let lhsPromise, let lhsAction), .sendRequestEnd(let rhsPromise, let rhsAction)): + return lhsPromise?.futureResult == rhsPromise?.futureResult && lhsAction == rhsAction case (.pauseRequestBodyStream, .pauseRequestBodyStream): return true @@ -991,8 +991,8 @@ extension HTTPRequestStateMachine.Action: Equatable { return lhsData == rhsData case ( - .succeedRequest(let lhsFinalAction, let lhsFinalBuffer), - .succeedRequest(let rhsFinalAction, let rhsFinalBuffer) + .forwardResponseEnd(let lhsFinalAction, let lhsFinalBuffer), + .forwardResponseEnd(let rhsFinalAction, let rhsFinalBuffer) ): return lhsFinalAction == rhsFinalAction && lhsFinalBuffer == rhsFinalBuffer @@ -1023,27 +1023,6 @@ extension HTTPRequestStateMachine.Action: Equatable { } } -extension HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction: Equatable { - public static func == ( - lhs: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, - rhs: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction - ) -> Bool { - switch (lhs, rhs) { - case (.close, close): - return true - - case (.sendRequestEnd(let lhsPromise), .sendRequestEnd(let rhsPromise)): - return lhsPromise?.futureResult == rhsPromise?.futureResult - - case (.none, .none): - return true - - default: - return false - } - } -} - extension HTTPRequestStateMachine.Action.FinalFailedRequestAction: Equatable { public static func == ( lhs: HTTPRequestStateMachine.Action.FinalFailedRequestAction, diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index 756758131..bd8f0736a 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -749,6 +749,10 @@ final class MockHTTPScheduableRequest: HTTPSchedulableRequest { preconditionFailure("Unimplemented") } + func requestBodyStreamSent() { + preconditionFailure("Unimplemented") + } + func receiveResponseHead(_: HTTPResponseHead) { preconditionFailure("Unimplemented") } diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift b/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift index 61b59b7b7..3347cac2e 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift @@ -28,9 +28,10 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { case requestHeadSent case resumeRequestBodyStream case pauseRequestBodyStream + case requestBodySent case receiveResponseHead case receiveResponseBodyParts - case succeedRequest + case receiveResponseEnd case fail } @@ -38,9 +39,10 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { case requestHeadSent case resumeRequestBodyStream case pauseRequestBodyStream + case requestBodySent case receiveResponseHead(HTTPResponseHead) case receiveResponseBodyParts(CircularBuffer) - case succeedRequest(CircularBuffer?) + case receiveResponseEnd(CircularBuffer?, HTTPHeaders?) case fail(Error) var kind: Kind { @@ -49,9 +51,10 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { case .requestHeadSent: return .requestHeadSent case .resumeRequestBodyStream: return .resumeRequestBodyStream case .pauseRequestBodyStream: return .pauseRequestBodyStream + case .requestBodySent: return .requestBodySent case .receiveResponseHead: return .receiveResponseHead case .receiveResponseBodyParts: return .receiveResponseBodyParts - case .succeedRequest: return .succeedRequest + case .receiveResponseEnd: return .receiveResponseEnd case .fail: return .fail } } @@ -73,9 +76,10 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { let requestHeadSentCallback: (@Sendable () -> Void)? = nil let resumeRequestBodyStreamCallback: (@Sendable () -> Void)? = nil let pauseRequestBodyStreamCallback: (@Sendable () -> Void)? = nil + let requestBodyStreamSentCallback: (@Sendable () -> Void)? = nil let receiveResponseHeadCallback: (@Sendable (HTTPResponseHead) -> Void)? = nil let receiveResponseBodyPartsCallback: (@Sendable (CircularBuffer) -> Void)? = nil - let succeedRequestCallback: (@Sendable (CircularBuffer?) -> Void)? = nil + let receiveResponseEndCallback: (@Sendable (CircularBuffer?, HTTPHeaders?) -> Void)? = nil let failCallback: (@Sendable (Error) -> Void)? = nil /// captures all ``HTTPExecutableRequest`` method calls in the order of occurrence, including arguments. @@ -141,6 +145,14 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { pauseRequestBodyStreamCallback() } + func requestBodyStreamSent() { + self.events.append(.requestBodySent) + guard let requestBodyStreamSentCallback = self.requestBodyStreamSentCallback else { + return self.calledUnimplementedMethod(#function) + } + requestBodyStreamSentCallback() + } + func receiveResponseHead(_ head: HTTPResponseHead) { self.events.append(.receiveResponseHead(head)) guard let receiveResponseHeadCallback = receiveResponseHeadCallback else { @@ -158,11 +170,11 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { } func receiveResponseEnd(_ buffer: CircularBuffer?, trailers: HTTPHeaders?) { - self.events.append(.succeedRequest(buffer)) - guard let succeedRequestCallback = succeedRequestCallback else { + self.events.append(.receiveResponseEnd(buffer, nil)) + guard let receiveResponseEndCallback = self.receiveResponseEndCallback else { return self.calledUnimplementedMethod(#function) } - succeedRequestCallback(buffer) + receiveResponseEndCallback(buffer, nil) } func fail(_ error: Error) { From ed72a0d4a0c5392eeefc6bb4f134a0fd05cd5313 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 8 Jan 2026 14:17:55 +0100 Subject: [PATCH 03/14] Forward progress --- .../AsyncAwait/Transaction+StateMachine.swift | 26 ++++++++++++++++--- .../AsyncAwait/Transaction.swift | 10 +++++-- .../HTTP1/HTTP1ClientChannelHandler.swift | 3 ++- .../HTTPRequestStateMachine.swift | 2 +- 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index a33844c28..5adcceae8 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -98,7 +98,8 @@ extension Transaction { bodyStreamContinuation: CheckedContinuation? ) - case failRequestStreamContinuation(CheckedContinuation, Error) + case failRequestStreamContinuation(CheckedContinuation, Error, HTTPRequestExecutor) + case cancelExecutor(HTTPRequestExecutor) } mutating func fail(_ error: Error) -> FailAction { @@ -161,8 +162,23 @@ extension Transaction { return .failResponseStream(source, error, context.executor, bodyStreamContinuation: nil) } - case .finished(error: _), - .executing(_, _, .finished): + case .executing(let context, let requestStreamState, .finished): + // an error occured after full response received, but before the full request was sent + self.state = .finished(error: error) + switch requestStreamState { + case .paused(let bodyStreamContinuation): + if let bodyStreamContinuation { + return .failRequestStreamContinuation( + bodyStreamContinuation, error, context.executor + ) + } else { + return .cancelExecutor(context.executor) + } + case .endForwarded, .finished, .producing, .requestHeadSent: + return .cancelExecutor(context.executor) + } + + case .finished(error: _): return .none } } @@ -199,6 +215,10 @@ extension Transaction { } } + mutating func requestHeadSent() { + + } + enum ResumeProducingAction { case startStream(ByteBufferAllocator) case resumeStream(CheckedContinuation) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index e64400e88..8dbdb8856 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -206,7 +206,9 @@ extension Transaction: HTTPExecutableRequest { } } - func requestHeadSent() {} + func requestHeadSent() { +// state. + } func resumeRequestBodyStream() { let action = self.state.withLockedValue { state in @@ -331,8 +333,12 @@ extension Transaction: HTTPExecutableRequest { requestBodyStreamContinuation?.resume(throwing: error) executor.cancelRequest(self) - case .failRequestStreamContinuation(let bodyStreamContinuation, let error): + case .failRequestStreamContinuation(let bodyStreamContinuation, let error, let executor): bodyStreamContinuation.resume(throwing: error) + executor.cancelRequest(self) + + case .cancelExecutor(let executor): + executor.cancelRequest(self) } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 0105237a4..e1869bc90 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -345,12 +345,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { // other way around. let oldRequest = self.request! - self.request = nil self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context) self.runTimeoutAction(.clearIdleWriteTimeoutTimer, context: context) switch finalAction { case .close: + self.request = nil context.close(promise: nil) oldRequest.receiveResponseEnd(buffer, trailers: nil) @@ -358,6 +358,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { oldRequest.receiveResponseEnd(buffer, trailers: nil) case .informConnectionIsIdle: + self.request = nil self.onConnectionIdle() oldRequest.receiveResponseEnd(buffer, trailers: nil) } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 58b8d7e37..408929478 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -681,7 +681,7 @@ struct HTTPRequestStateMachine { state = .finished switch action { case .none: - return .forwardResponseEnd(.none, remainingBuffer) + return .forwardResponseEnd(.requestDone, remainingBuffer) case .close: return .forwardResponseEnd(.close, remainingBuffer) } From 385c84283cca4a2d037319b6cc6bb14bdafe8624 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 19 Jan 2026 13:15:15 +0100 Subject: [PATCH 04/14] Tests pass --- .../HTTP1/HTTP1ClientChannelHandler.swift | 1 + .../HTTPRequestStateMachine.swift | 2 +- .../RequestBag+StateMachine.swift | 74 +++++++++++++++---- Sources/AsyncHTTPClient/RequestBag.swift | 33 +++++++-- .../HTTPRequestStateMachineTests.swift | 26 +++---- .../RequestBagTests.swift | 7 +- 6 files changed, 103 insertions(+), 40 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index e1869bc90..f487f6cd0 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -24,6 +24,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { private var state: HTTP1ConnectionStateMachine = .init() { didSet { self.eventLoop.assertInEventLoop() +// print(self.state) } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 408929478..f876a321b 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -395,7 +395,7 @@ struct HTTPRequestStateMachine { } self.state = .finished - return .sendRequestEnd(promise, .none) + return .sendRequestEnd(promise, .requestDone) case .failed(let error): return .failSendStreamFinished(error, promise) diff --git a/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift b/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift index abfad7312..7accdc51a 100644 --- a/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift +++ b/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift @@ -59,12 +59,15 @@ extension RequestBag { case initialized(RedirectHandler?) case buffering(CircularBuffer, next: Next) case waitingForRemote + case endReceived } private var state: State + private let requestFramingMetadata: RequestFramingMetadata - init(redirectHandler: RedirectHandler?) { + init(redirectHandler: RedirectHandler?, requestFramingMetadata: RequestFramingMetadata) { self.state = .initialized(redirectHandler) + self.requestFramingMetadata = requestFramingMetadata } } } @@ -100,6 +103,20 @@ extension RequestBag.StateMachine { case none } + mutating func requestHeadSent() { + switch self.state { + case .initialized: + fatalError() + case .executing(let executor, .initialized, let responseStream): + if self.requestFramingMetadata.body == .fixedSize(0) { + self.state = .executing(executor, .finished, responseStream) + } + + default: + break + } + } + mutating func willExecuteRequest(_ executor: HTTPRequestExecutor) -> WillExecuteRequestAction { switch self.state { case .initialized(let redirectHandler), .queued(_, let redirectHandler): @@ -143,7 +160,9 @@ extension RequestBag.StateMachine { // request bytes. Can be ignored. return .none - case .executing(_, .initialized, .buffering), .executing(_, .initialized, .waitingForRemote): + case .executing(_, .initialized, .buffering), + .executing(_, .initialized, .waitingForRemote), + .executing(_, .initialized, .endReceived): preconditionFailure("Invalid states: Response can not be received before request") case .redirected: @@ -239,6 +258,7 @@ extension RequestBag.StateMachine { enum FinishAction { case forwardStreamFinished(HTTPRequestExecutor, EventLoopPromise?) + case forwardStreamFinishedAndSucceedTask(HTTPRequestExecutor, EventLoopPromise?) case forwardStreamFailureAndFailTask(HTTPRequestExecutor, Error, EventLoopPromise?) case none } @@ -254,8 +274,15 @@ extension RequestBag.StateMachine { case .producing: switch result { case .success: - self.state = .executing(executor, .finished, responseState) - return .forwardStreamFinished(executor, nil) + switch responseState { + case .initialized, .buffering, .waitingForRemote: + self.state = .executing(executor, .finished, responseState) + return .forwardStreamFinished(executor, nil) + case .endReceived: + self.state = .finished(error: nil) + return .forwardStreamFinishedAndSucceedTask(executor, nil) + } + case .failure(let error): self.state = .finished(error: error) return .forwardStreamFailureAndFailTask(executor, error, nil) @@ -264,8 +291,15 @@ extension RequestBag.StateMachine { case .paused(let promise): switch result { case .success: - self.state = .executing(executor, .finished, responseState) - return .forwardStreamFinished(executor, promise) + switch responseState { + case .initialized, .buffering, .waitingForRemote: + self.state = .executing(executor, .finished, responseState) + return .forwardStreamFinished(executor, promise) + case .endReceived: + self.state = .finished(error: nil) + return .forwardStreamFinishedAndSucceedTask(executor, promise) + } + case .failure(let error): self.state = .finished(error: error) return .forwardStreamFailureAndFailTask(executor, error, promise) @@ -346,7 +380,7 @@ extension RequestBag.StateMachine { switch self.state { case .initialized, .queued, .deadlineExceededWhileQueued: preconditionFailure("How can we receive a response body part, if the request hasn't started yet.") - case .executing(_, _, .initialized): + case .executing(_, _, .initialized), .executing(_, _, .endReceived): preconditionFailure("If we receive a response body, we must have received a head before") case .executing(let executor, let requestState, .buffering(var currentBuffer, next: let next)): @@ -405,7 +439,7 @@ extension RequestBag.StateMachine { switch self.state { case .initialized, .queued, .deadlineExceededWhileQueued: preconditionFailure("How can we receive a response body part, if the request hasn't started yet.") - case .executing(_, _, .initialized): + case .executing(_, _, .initialized), .executing(_, _, .endReceived): preconditionFailure("If we receive a response body, we must have received a head before") case .executing(let executor, let requestState, .buffering(var buffer, next: let next)): @@ -426,8 +460,15 @@ extension RequestBag.StateMachine { case .executing(let executor, let requestState, .waitingForRemote): guard var newChunks = newChunks, !newChunks.isEmpty else { - self.state = .finished(error: nil) - return .succeedRequest + switch requestState { + case .initialized, .paused, .producing: + self.state = .executing(executor, requestState, .endReceived) + return .none + + case .finished: + self.state = .finished(error: nil) + return .succeedRequest + } } let first = newChunks.removeFirst() @@ -484,7 +525,7 @@ extension RequestBag.StateMachine { self.state = .finished(error: error) return .failTask(error, executorToCancel: executor) - case .executing(_, _, .waitingForRemote): + case .executing(_, _, .waitingForRemote), .executing(_, _, .endReceived): preconditionFailure( "Invalid state... We just returned from a consumption function. We can't already be waiting" ) @@ -550,6 +591,10 @@ extension RequestBag.StateMachine { "Invalid state... We just returned from a consumption function. We can't already be waiting" ) + case .executing(_, _, .endReceived): + // we can't succeed the request here, as we have not sent all request parts. + return .doNothing + case .redirected: return .doNothing @@ -614,10 +659,9 @@ extension RequestBag.StateMachine { case .executing(let executor, _, .buffering(_, next: .error(_))): // this would override another error, let's keep the first one return .cancelExecutor(executor) - case .executing(let executor, _, .initialized): - self.state = .finished(error: error) - return .failTask(error, nil, executor) - case .executing(let executor, _, .waitingForRemote): + case .executing(let executor, _, .initialized), + .executing(let executor, _, .waitingForRemote), + .executing(let executor, _, .endReceived): self.state = .finished(error: error) return .failTask(error, nil, executor) case .redirected: diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index 132d01798..852c524f5 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -100,9 +100,13 @@ final class RequestBag: Sendabl self.eventLoopPreference = eventLoopPreference self.task = task + let (head, metadata) = try request.createRequestHead() + self.requestHead = head + self.requestFramingMetadata = metadata + let loopBoundState = LoopBoundState( request: request, - state: StateMachine(redirectHandler: redirectHandler), + state: StateMachine(redirectHandler: redirectHandler, requestFramingMetadata: metadata), consumeBodyPartStackDepth: 0, tracing: task.tracing ) @@ -111,10 +115,6 @@ final class RequestBag: Sendabl self.requestOptions = requestOptions self.delegate = delegate - let (head, metadata) = try request.createRequestHead() - self.requestHead = head - self.requestFramingMetadata = metadata - self.tlsConfiguration = request.tlsConfiguration self.task.taskDelegate = self @@ -150,9 +150,11 @@ final class RequestBag: Sendabl } private func requestHeadSent0() { + self.loopBoundState.value.state.requestHeadSent() + self.delegate.didSendRequestHead(task: self.task, self.requestHead) - if self.loopBoundState.value.request.body == nil { + if self.requestFramingMetadata.body == .fixedSize(0) { self.delegate.didSendRequest(task: self.task) } } @@ -236,6 +238,25 @@ final class RequestBag: Sendabl } writer.finishRequestBodyStream(self, promise: promise) + case .forwardStreamFinishedAndSucceedTask(let writer, let writerPromise): + let promise = writerPromise ?? self.task.eventLoop.makePromise(of: Void.self) + promise.futureResult.whenComplete { result in + switch result { + case .success: + self.delegate.didSendRequest(task: self.task) + do { + let response = try self.delegate.didFinishRequest(task: self.task) + self.task.promise.succeed(response) + } catch { + self.task.promise.fail(error) + } + + case .failure(let error): + self.task.promise.fail(error) + } + } + writer.finishRequestBodyStream(self, promise: promise) + case .forwardStreamFailureAndFailTask(let writer, let error, let promise): writer.cancelRequest(self) promise?.fail(error) diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift index b5394ce8e..c5823ff52 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift @@ -37,7 +37,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -86,7 +86,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -273,7 +273,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(part1, promise: nil), .sendBodyPart(part1, nil)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2, promise: nil), .sendBodyPart(part2, nil)) - XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .none)) + XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .requestDone)) XCTAssertEqual( state.requestStreamPartReceived(part2, promise: nil), @@ -310,7 +310,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(part2, promise: nil), .sendBodyPart(part2, nil)) XCTAssertEqual(state.requestStreamFinished(promise: nil), .sendRequestEnd(nil, .none)) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init())) } func testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200() { @@ -335,7 +335,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseBodyParts(.init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, [])) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1, promise: nil), .sendBodyPart(part1, nil)) @@ -387,7 +387,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .wait) } @@ -430,7 +430,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init())) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) @@ -467,7 +467,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.channelRead(.body(part2)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([part2]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init([part2]))) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) @@ -513,7 +513,7 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init())) XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) XCTAssertEqual(state.read(), .read) } @@ -551,7 +551,7 @@ class HTTPRequestStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init([responseBody]))) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init([responseBody]))) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -630,7 +630,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init())) XCTAssertEqual(state.channelReadComplete(), .wait) XCTAssertEqual(state.read(), .read) } @@ -649,7 +649,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init())) XCTAssertEqual(state.idleReadTimeoutTriggered(), .wait, "A read timeout that fires to late must be ignored") } @@ -667,7 +667,7 @@ class HTTPRequestStateMachineTests: XCTestCase { state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false) ) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.none, .init())) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.requestDone, .init())) XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored") } diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index e68bc3f2a..6d985cbac 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -601,11 +601,6 @@ final class RequestBagTests: XCTestCase { writer.write(.byteBuffer(.init(bytes: 4...7))) }.always { result in XCTAssertTrue(firstWriteSuccess.withLockedValue { $0 }) - - guard case .failure(let error) = result else { - return XCTFail("Expected the second write to fail") - } - XCTAssertEqual(error as? HTTPClientError, .requestStreamCancelled) } } ) @@ -641,9 +636,11 @@ final class RequestBagTests: XCTestCase { bag.receiveResponseHead(.init(version: .http1_1, status: .movedPermanently)) XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(())) bag.receiveResponseEnd([], trailers: nil) + XCTAssertEqual(delegate.hitDidReceiveResponse, 0) // if we now write our second part of the response this should fail the backpressure promise writeSecondPartPromise.succeed(()) + XCTAssertEqual(delegate.hitDidReceiveResponse, 1) XCTAssertEqual(delegate.receivedHead?.status, .movedPermanently) XCTAssertNoThrow(try bag.task.futureResult.wait()) From 7a4c4b280721609902ebce976ce15b8832c04134 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 19 Jan 2026 15:21:19 +0100 Subject: [PATCH 05/14] Added a new integration test! --- .../AsyncAwait/Transaction+StateMachine.swift | 6 -- .../AsyncAwait/Transaction.swift | 2 +- .../BidirectionalStreamingTests.swift | 96 +++++++++++++++++++ .../RequestBagTests.swift | 8 ++ 4 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index 5adcceae8..40d30b58f 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -215,10 +215,6 @@ extension Transaction { } } - mutating func requestHeadSent() { - - } - enum ResumeProducingAction { case startStream(ByteBufferAllocator) case resumeStream(CheckedContinuation) @@ -400,11 +396,9 @@ extension Transaction { case .executing(_, .endForwarded, .finished): self.state = .finished(error: nil) -// return . case .executing(let context, .endForwarded, let responseState): self.state = .executing(context, .finished, responseState) -// return . case .finished: break diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index 8dbdb8856..d550b5296 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -207,7 +207,7 @@ extension Transaction: HTTPExecutableRequest { } func requestHeadSent() { -// state. + // protocol requirement. Intentionally not needed. } func resumeRequestBodyStream() { diff --git a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift new file mode 100644 index 000000000..b62386abe --- /dev/null +++ b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift @@ -0,0 +1,96 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2026 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Testing +import AsyncHTTPClient +import NIOCore +import NIOHTTP1 + +@Suite("Request and response streaming") +struct BidirectionalStreamingTests { + @Test func requestStreamCanOutliveResponse() async throws { + let (bodyStream, bodyWriteContinuation) = AsyncStream.makeStream(of: AsyncStream.self) + let httpBin = HTTPBin { _ in + let (stream, continuation) = AsyncStream.makeStream(of: ByteBuffer.self) + bodyWriteContinuation.yield(stream) + return HTTPRequestStreamingChannel(bodyStreamContinuation: continuation) + } + + defer { #expect(throws: Never.self) { try httpBin.shutdown() } } + + let httpClient = HTTPClient(eventLoopGroupProvider: .singleton) + + var request = HTTPClientRequest(url: "http://localhost:\(httpBin.port)") + let (stream, continuation) = AsyncStream.makeStream(of: ByteBuffer.self) + request.body = .stream(stream, length: .unknown) + + await #expect(throws: Never.self) { + let response = try await httpClient.execute(request, timeout: .seconds(60), logger: nil) + var iterator = response.body.makeAsyncIterator() + #expect(try await iterator.next() == nil) + } + + var bodyStreamIterator = bodyStream.makeAsyncIterator() + + let serverRequestStream = await bodyStreamIterator.next() + guard let serverRequestStream else { + Issue.record("Could not get the server request stream") + return + } + var receivedWritesIterator = serverRequestStream.makeAsyncIterator() + + let payload1 = ByteBuffer(string: "Hello World! 1") + continuation.yield(payload1) + #expect(await receivedWritesIterator.next() == payload1) + let payload2 = ByteBuffer(string: "Hello World! 2") + continuation.yield(payload2) + #expect(await receivedWritesIterator.next() == payload2) + let payload3 = ByteBuffer(string: "Hello World! 3") + continuation.yield(payload3) + #expect(await receivedWritesIterator.next() == payload3) + let payload4 = ByteBuffer(string: "Hello World! 4") + continuation.yield(payload4) + #expect(await receivedWritesIterator.next() == payload4) + continuation.finish() + #expect(await receivedWritesIterator.next() == nil) + } +} + +final class HTTPRequestStreamingChannel: ChannelInboundHandler & AHCTestSendableMetatype { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + let bodyStreamContinuation: AsyncStream.Continuation + + init(bodyStreamContinuation: AsyncStream.Continuation) { + self.bodyStreamContinuation = bodyStreamContinuation + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let reqPart = self.unwrapInboundIn(data) + switch reqPart { + case .head: + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) + context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + case .body(let body): + self.bodyStreamContinuation.yield(body) + case .end: + self.bodyStreamContinuation.finish() + @unknown default: + Issue.record("Unhandled case: \(reqPart)") + } + } +} diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index 6d985cbac..51621c7a6 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -601,6 +601,14 @@ final class RequestBagTests: XCTestCase { writer.write(.byteBuffer(.init(bytes: 4...7))) }.always { result in XCTAssertTrue(firstWriteSuccess.withLockedValue { $0 }) + + switch result { + case .success: + // upload can now continue even after we have received the response end. + break + case .failure(let failure): + XCTFail("Unexpected error: \(failure)") + } } } ) From c05dbc22c99f3224001ef3504eda79c6ca61ceab Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 19 Jan 2026 15:33:13 +0100 Subject: [PATCH 06/14] Remove debug line --- .../ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index f487f6cd0..e1869bc90 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -24,7 +24,6 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { private var state: HTTP1ConnectionStateMachine = .init() { didSet { self.eventLoop.assertInEventLoop() -// print(self.state) } } From a733912dda1ae46e248ac3c483dc786641298367 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 19 Jan 2026 16:50:21 +0100 Subject: [PATCH 07/14] swift format --- .../AsyncAwait/Transaction+StateMachine.swift | 4 +++- .../HTTP1/HTTP1ClientChannelHandler.swift | 6 +++++- .../BidirectionalStreamingTests.swift | 2 +- .../HTTP1ConnectionStateMachineTests.swift | 10 ++++++++-- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index 40d30b58f..bb9d2807c 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -169,7 +169,9 @@ extension Transaction { case .paused(let bodyStreamContinuation): if let bodyStreamContinuation { return .failRequestStreamContinuation( - bodyStreamContinuation, error, context.executor + bodyStreamContinuation, + error, + context.executor ) } else { return .cancelExecutor(context.executor) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index e1869bc90..529d71ec8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -248,7 +248,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { // We need to defer succeeding the old request to avoid ordering issues writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in - let oldRequest = self.request! + guard let oldRequest = self.request else { + // in the meantime an error might have happened, which is why this request is + // not reference anymore. + return + } oldRequest.requestBodyStreamSent() switch result { case .success: diff --git a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift index b62386abe..b07782c07 100644 --- a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift +++ b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift @@ -12,10 +12,10 @@ // //===----------------------------------------------------------------------===// -import Testing import AsyncHTTPClient import NIOCore import NIOHTTP1 +import Testing @Suite("Request and response streaming") struct BidirectionalStreamingTests { diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift index c1f4ba0a1..a2fdfc51c 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift @@ -61,7 +61,10 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.informConnectionIsIdle, .init([responseBody]))) + XCTAssertEqual( + state.channelRead(.end(nil)), + .forwardResponseEnd(.informConnectionIsIdle, .init([responseBody])) + ) XCTAssertEqual(state.channelReadComplete(), .wait) } @@ -190,7 +193,10 @@ class HTTP1ConnectionStateMachineTests: XCTestCase { ) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) - XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseEnd(.informConnectionIsIdle, .init([responseBody]))) + XCTAssertEqual( + state.channelRead(.end(nil)), + .forwardResponseEnd(.informConnectionIsIdle, .init([responseBody])) + ) XCTAssertEqual(state.channelInactive(), .fireChannelInactive) } From 15c2ee5d79bb3fa423c493f625dc191073b3879f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 20 Jan 2026 12:49:14 +0100 Subject: [PATCH 08/14] Fix hanging test --- .../AsyncAwait/Transaction+StateMachine.swift | 9 +++++++++ Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift | 9 ++++++++- .../BidirectionalStreamingTests.swift | 6 ++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index bb9d2807c..53c3c3bf6 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -532,6 +532,15 @@ extension Transaction { } } + mutating func httpResponseStreamTerminated() -> FailAction { + switch self.state { + case .executing(_, _, .finished), .finished: + return .none + default: + return self.fail(HTTPClientError.cancelled) + } + } + enum DeadlineExceededAction { case none case cancelSchedulerOnly(scheduler: HTTPRequestScheduler) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index d550b5296..5bbcfdf98 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -310,6 +310,13 @@ extension Transaction: HTTPExecutableRequest { } } + func httpResponseStreamTerminated() { + let action = self.state.withLockedValue { state in + state.httpResponseStreamTerminated() + } + self.performFailAction(action) + } + func fail(_ error: Error) { let action = self.state.withLockedValue { state in state.fail(error) @@ -381,6 +388,6 @@ extension Transaction: NIOAsyncSequenceProducerDelegate { @usableFromInline func didTerminate() { - self.fail(HTTPClientError.cancelled) + self.httpResponseStreamTerminated() } } diff --git a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift index b07782c07..9a8c01410 100644 --- a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift +++ b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift @@ -21,7 +21,7 @@ import Testing struct BidirectionalStreamingTests { @Test func requestStreamCanOutliveResponse() async throws { let (bodyStream, bodyWriteContinuation) = AsyncStream.makeStream(of: AsyncStream.self) - let httpBin = HTTPBin { _ in + let httpBin = HTTPBin(.http1_1(ssl: false)) { _ in let (stream, continuation) = AsyncStream.makeStream(of: ByteBuffer.self) bodyWriteContinuation.yield(stream) return HTTPRequestStreamingChannel(bodyStreamContinuation: continuation) @@ -31,6 +31,8 @@ struct BidirectionalStreamingTests { let httpClient = HTTPClient(eventLoopGroupProvider: .singleton) + defer { #expect(throws: Never.self) { try httpClient.shutdown() } } + var request = HTTPClientRequest(url: "http://localhost:\(httpBin.port)") let (stream, continuation) = AsyncStream.makeStream(of: ByteBuffer.self) request.body = .stream(stream, length: .unknown) @@ -38,7 +40,7 @@ struct BidirectionalStreamingTests { await #expect(throws: Never.self) { let response = try await httpClient.execute(request, timeout: .seconds(60), logger: nil) var iterator = response.body.makeAsyncIterator() - #expect(try await iterator.next() == nil) + #expect(try await iterator.next() == nil) // response is finished. } var bodyStreamIterator = bodyStream.makeAsyncIterator() From d1acadaf54a7ad19ef7dfdc4e48c3d277bc4f721 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 20 Jan 2026 13:19:05 +0100 Subject: [PATCH 09/14] fix tests --- Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift index 9a8c01410..68adda983 100644 --- a/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift +++ b/Tests/AsyncHTTPClientTests/BidirectionalStreamingTests.swift @@ -31,7 +31,7 @@ struct BidirectionalStreamingTests { let httpClient = HTTPClient(eventLoopGroupProvider: .singleton) - defer { #expect(throws: Never.self) { try httpClient.shutdown() } } + defer { #expect(throws: Never.self) { try httpClient.syncShutdown() } } var request = HTTPClientRequest(url: "http://localhost:\(httpBin.port)") let (stream, continuation) = AsyncStream.makeStream(of: ByteBuffer.self) From 1cf8786735beb445fd8c1b1ea616c10ff181e58b Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 30 Jan 2026 12:47:07 +0100 Subject: [PATCH 10/14] PR comments addressed --- .../AsyncAwait/Transaction+StateMachine.swift | 14 +++++++++++--- .../AsyncHTTPClient/AsyncAwait/Transaction.swift | 9 ++++++++- .../HTTP1/HTTP1ConnectionStateMachine.swift | 8 ++++++-- Sources/AsyncHTTPClient/HTTPClient.swift | 8 ++++++++ Sources/AsyncHTTPClient/RequestBag.swift | 3 ++- 5 files changed, 35 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index 53c3c3bf6..4128998b9 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -385,7 +385,12 @@ extension Transaction { } } - mutating func requestBodyStreamSent() { + enum RequestBodyStreamSentAction { + case none + case failure(Error) + } + + mutating func requestBodyStreamSent() -> RequestBodyStreamSentAction { switch self.state { case .initialized, .queued, @@ -394,16 +399,19 @@ extension Transaction { .executing(_, .finished, _), .executing(_, .producing, _), .executing(_, .paused, _): - preconditionFailure("Invalid state: \(self.state)") + assertionFailure("Invalid state: \(self.state)") + return .failure(HTTPClientError.internalStateFailure()) case .executing(_, .endForwarded, .finished): self.state = .finished(error: nil) + return .none case .executing(let context, .endForwarded, let responseState): self.state = .executing(context, .finished, responseState) + return .none case .finished: - break + return .none } } diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index 5bbcfdf98..49104863c 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -248,9 +248,16 @@ extension Transaction: HTTPExecutableRequest { } func requestBodyStreamSent() { - self.state.withLockedValue { state in + let action = self.state.withLockedValue { state in state.requestBodyStreamSent() } + + switch action { + case .none: + break + case .failure(let error): + self.fail(error) + } } // MARK: Response diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift index 3b2da8bb6..60a98b333 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift @@ -429,7 +429,9 @@ extension HTTP1ConnectionStateMachine.State { return .sendBodyPart(part, writePromise) case .sendRequestEnd(let writePromise, let finalAction): guard case .inRequest(_, close: let close) = self else { - fatalError("Invalid state: \(self)") + assertionFailure("Invalid state: \(self)") + self = .closing + return .failRequest(HTTPClientError.internalStateFailure(), .close(writePromise)) } let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction @@ -456,7 +458,9 @@ extension HTTP1ConnectionStateMachine.State { return .forwardResponseBodyParts(parts) case .forwardResponseEnd(let finalAction, let finalParts): guard case .inRequest(_, close: let close) = self else { - fatalError("Invalid state: \(self)") + assertionFailure("Invalid state: \(self)") + self = .closing + return .failRequest(HTTPClientError.internalStateFailure(), .close(nil)) } let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 80df3b946..583f3ef7b 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -1394,6 +1394,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { case deadlineExceeded case httpEndReceivedAfterHeadWith1xx case shutdownUnsupported + case internalStateFailure(file: String, line: UInt) } private var code: Code @@ -1479,6 +1480,8 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { return "HTTP end received after head with 1xx" case .shutdownUnsupported: return "The global singleton HTTP client cannot be shut down" + case .internalStateFailure(let file, let line): + return "An internal state failure has occurred (File: \(file), line: \(line)). Please open an issue with a reproducer if possible" } } @@ -1570,6 +1573,11 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { /// - Tasks are not processed fast enough on the existing connections, to process all waiters in time public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout) + /// A state machine has reached an unsupported state, that wasn't considered when implementing. + public static func internalStateFailure(file: String = #fileID, line: UInt = #line) -> HTTPClientError { + HTTPClientError(code: .internalStateFailure(file: file, line: line)) + } + @available( *, deprecated, diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index 852c524f5..6bfbf09f4 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -190,7 +190,8 @@ final class RequestBag: Sendabl } private func requestBodyStreamSent0() { - + // Intentionally empty: This hook is provided for consistency with the protocol + // but requires no action in this implementation. } private func writeNextRequestPart(_ part: IOData) -> EventLoopFuture { From 47b628f5fbd847bca2abc88a4a3d6425828e307a Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 30 Jan 2026 12:53:02 +0100 Subject: [PATCH 11/14] Ensure `requestBodyStreamSent` is sent in H2 use-cases --- .../ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index 244d2e8f7..f75d4691f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -197,7 +197,13 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: writePromise) case .sendRequestEnd(let writePromise, let finalAction): - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise) + let promise = writePromise ?? context.eventLoop.makePromise(of: Void.self) + let request = self.request! + promise.futureResult.whenSuccess { + request.requestBodyStreamSent() + } + + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise) if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() { self.runTimeoutAction(readTimeoutAction, context: context) From da9cc05b786e60f106af01f12c4e911b3b764eb5 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 2 Feb 2026 10:54:39 +0100 Subject: [PATCH 12/14] swift format fix --- Sources/AsyncHTTPClient/HTTPClient.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 583f3ef7b..d7a8b7f18 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -1481,7 +1481,8 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { case .shutdownUnsupported: return "The global singleton HTTP client cannot be shut down" case .internalStateFailure(let file, let line): - return "An internal state failure has occurred (File: \(file), line: \(line)). Please open an issue with a reproducer if possible" + return + "An internal state failure has occurred (File: \(file), line: \(line)). Please open an issue with a reproducer if possible" } } From 7ecbd4ab15a648d5aa0f869a43a6467935f63158 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Feb 2026 12:03:25 +0100 Subject: [PATCH 13/14] Code review --- .../ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift | 2 ++ .../ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 529d71ec8..0801e2bdb 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -263,6 +263,8 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { switch finalAction { case .none: + // we must nil out the request here, as we are still uploading the request + // and therefore still need the reference to it. break case .informConnectionIsIdle: diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index f75d4691f..3daa95289 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -198,6 +198,8 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { case .sendRequestEnd(let writePromise, let finalAction): let promise = writePromise ?? context.eventLoop.makePromise(of: Void.self) + // We can force unwrap the request here, as we have just validated in the state machine, + // that the request is neither failed nor finished yet let request = self.request! promise.futureResult.whenSuccess { request.requestBodyStreamSent() From 8d8d441b93bd7485de33a3b63e0ab77856814ae2 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Feb 2026 12:11:05 +0100 Subject: [PATCH 14/14] Update Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift Co-authored-by: George Barnett --- .../ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 0801e2bdb..fc7e0af49 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -263,7 +263,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { switch finalAction { case .none: - // we must nil out the request here, as we are still uploading the request + // we must not nil out the request here, as we are still uploading the request // and therefore still need the reference to it. break