Skip to content
Merged
90 changes: 71 additions & 19 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extension Transaction {
case requestHeadSent
case producing
case paused(continuation: CheckedContinuation<Void, Error>?)
case endForwarded
case finished
}

Expand Down Expand Up @@ -97,7 +98,8 @@ extension Transaction {
bodyStreamContinuation: CheckedContinuation<Void, Error>?
)

case failRequestStreamContinuation(CheckedContinuation<Void, Error>, Error)
case failRequestStreamContinuation(CheckedContinuation<Void, Error>, Error, HTTPRequestExecutor)
case cancelExecutor(HTTPRequestExecutor)
}

mutating func fail(_ error: Error) -> FailAction {
Expand Down Expand Up @@ -135,7 +137,7 @@ extension Transaction {
bodyStreamContinuation: continuation
)

case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
self.state = .finished(error: error)
return .failResponseHead(
context.continuation,
Expand All @@ -156,12 +158,29 @@ extension Transaction {
context.executor,
bodyStreamContinuation: bodyStreamContinuation
)
case .finished, .producing, .requestHeadSent:
case .endForwarded, .finished, .producing, .requestHeadSent:
return .failResponseStream(source, error, context.executor, bodyStreamContinuation: nil)
}

case .finished(error: _),
.executing(_, _, .finished):
case .executing(let context, let requestStreamState, .finished):
// an error occured after full response received, but before the full request was sent
self.state = .finished(error: error)
switch requestStreamState {
case .paused(let bodyStreamContinuation):
if let bodyStreamContinuation {
return .failRequestStreamContinuation(
bodyStreamContinuation,
error,
context.executor
)
} else {
return .cancelExecutor(context.executor)
}
case .endForwarded, .finished, .producing, .requestHeadSent:
return .cancelExecutor(context.executor)
}

case .finished(error: _):
return .none
}
}
Expand Down Expand Up @@ -232,7 +251,7 @@ extension Transaction {
self.state = .executing(context, .producing, responseState)
return .resumeStream(continuation)

case .executing(_, .finished, _):
case .executing(_, .endForwarded, _), .executing(_, .finished, _):
// the channels writability changed to writable after we have forwarded all the
// request bytes. Can be ignored.
return .none
Expand All @@ -254,6 +273,7 @@ extension Transaction {
self.state = .executing(context, .paused(continuation: nil), responseSteam)

case .executing(_, .paused, _),
.executing(_, .endForwarded, _),
.executing(_, .finished, _),
.finished:
// the channels writability changed to paused after we have already forwarded all
Expand Down Expand Up @@ -298,7 +318,7 @@ extension Transaction {
"A write continuation already exists, but we tried to set another one. Invalid state: \(self.state)"
)

case .finished, .executing(_, .finished, _):
case .finished, .executing(_, .endForwarded, _), .executing(_, .finished, _):
return .fail
}
}
Expand All @@ -309,6 +329,7 @@ extension Transaction {
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _),
.executing(_, .endForwarded, _),
.executing(_, .finished, _):
preconditionFailure(
"A request stream can only produce, if the request was started. Invalid state: \(self.state)"
Expand Down Expand Up @@ -343,6 +364,7 @@ extension Transaction {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .endForwarded, _),
.executing(_, .finished, _):
preconditionFailure("Invalid state: \(self.state)")

Expand All @@ -355,17 +377,38 @@ extension Transaction {
.executing(let context, .paused(continuation: .none), let responseState),
.executing(let context, .requestHeadSent, let responseState):

switch responseState {
case .finished:
// if the response stream has already finished before the request, we must succeed
// the final continuation.
self.state = .finished(error: nil)
return .forwardStreamFinished(context.executor)
self.state = .executing(context, .endForwarded, responseState)
return .forwardStreamFinished(context.executor)

case .waitingForResponseHead, .streamingBody:
self.state = .executing(context, .finished, responseState)
return .forwardStreamFinished(context.executor)
}
case .finished:
return .none
}
}

enum RequestBodyStreamSentAction {
case none
case failure(Error)
}

mutating func requestBodyStreamSent() -> RequestBodyStreamSentAction {
switch self.state {
case .initialized,
.queued,
.deadlineExceededWhileQueued,
.executing(_, .requestHeadSent, _),
.executing(_, .finished, _),
.executing(_, .producing, _),
.executing(_, .paused, _):
assertionFailure("Invalid state: \(self.state)")
return .failure(HTTPClientError.internalStateFailure())

case .executing(_, .endForwarded, .finished):
self.state = .finished(error: nil)
return .none

case .executing(let context, .endForwarded, let responseState):
self.state = .executing(context, .finished, responseState)
return .none

case .finished:
return .none
Expand Down Expand Up @@ -482,7 +525,7 @@ extension Transaction {
switch requestState {
case .finished:
self.state = .finished(error: nil)
case .paused, .producing, .requestHeadSent:
case .paused, .producing, .requestHeadSent, .endForwarded:
self.state = .executing(context, requestState, .finished)
}
return .finishResponseStream(source, finalBody: newChunks)
Expand All @@ -497,6 +540,15 @@ extension Transaction {
}
}

mutating func httpResponseStreamTerminated() -> FailAction {
switch self.state {
case .executing(_, _, .finished), .finished:
return .none
default:
return self.fail(HTTPClientError.cancelled)
}
}

enum DeadlineExceededAction {
case none
case cancelSchedulerOnly(scheduler: HTTPRequestScheduler)
Expand Down Expand Up @@ -538,7 +590,7 @@ extension Transaction {
executor: context.executor,
bodyStreamContinuation: continuation
)
case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
self.state = .finished(error: error)
return .cancel(
requestContinuation: context.continuation,
Expand Down
32 changes: 29 additions & 3 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ extension Transaction: HTTPExecutableRequest {
}
}

func requestHeadSent() {}
func requestHeadSent() {
// protocol requirement. Intentionally not needed.
}

func resumeRequestBodyStream() {
let action = self.state.withLockedValue { state in
Expand Down Expand Up @@ -245,6 +247,19 @@ extension Transaction: HTTPExecutableRequest {
}
}

func requestBodyStreamSent() {
let action = self.state.withLockedValue { state in
state.requestBodyStreamSent()
}

switch action {
case .none:
break
case .failure(let error):
self.fail(error)
}
}

// MARK: Response

func receiveResponseHead(_ head: HTTPResponseHead) {
Expand Down Expand Up @@ -302,6 +317,13 @@ extension Transaction: HTTPExecutableRequest {
}
}

func httpResponseStreamTerminated() {
let action = self.state.withLockedValue { state in
state.httpResponseStreamTerminated()
}
self.performFailAction(action)
}

func fail(_ error: Error) {
let action = self.state.withLockedValue { state in
state.fail(error)
Expand All @@ -325,8 +347,12 @@ extension Transaction: HTTPExecutableRequest {
requestBodyStreamContinuation?.resume(throwing: error)
executor.cancelRequest(self)

case .failRequestStreamContinuation(let bodyStreamContinuation, let error):
case .failRequestStreamContinuation(let bodyStreamContinuation, let error, let executor):
bodyStreamContinuation.resume(throwing: error)
executor.cancelRequest(self)

case .cancelExecutor(let executor):
executor.cancelRequest(self)
}
}

Expand Down Expand Up @@ -369,6 +395,6 @@ extension Transaction: NIOAsyncSequenceProducerDelegate {

@usableFromInline
func didTerminate() {
self.fail(HTTPClientError.cancelled)
self.httpResponseStreamTerminated()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,46 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .sendBodyPart(let part, let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)

case .sendRequestEnd(let writePromise):
case .sendRequestEnd(let writePromise, let finalAction):

let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues

writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
guard let oldRequest = self.request else {
// in the meantime an error might have happened, which is why this request is
// not reference anymore.
return
}
oldRequest.requestBodyStreamSent()
switch result {
case .success:
// If our final action is not `none`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle or, if we were asked to
// close when we're done, send the close. Either way, we then succeed the request

switch finalAction {
case .none:
// we must not nil out the request here, as we are still uploading the request
// and therefore still need the reference to it.
break

case .informConnectionIsIdle:
self.request = nil
self.onConnectionIdle()

case .close:
self.request = nil
context.close(promise: nil)
}

case .failure(let error):
context.close(promise: nil)
oldRequest.fail(error)
}
}

context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)

if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
Expand Down Expand Up @@ -300,7 +339,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
// that the request is neither failed nor finished yet
self.request!.receiveResponseBodyParts(buffer)

case .succeedRequest(let finalAction, let buffer):
case .forwardResponseEnd(let finalAction, let buffer):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet

Expand All @@ -312,39 +351,20 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
// other way around.

let oldRequest = self.request!
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runTimeoutAction(.clearIdleWriteTimeoutTimer, context: context)

switch finalAction {
case .close:
self.request = nil
context.close(promise: nil)
oldRequest.receiveResponseEnd(buffer, trailers: nil)
case .sendRequestEnd(let writePromise, let shouldClose):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
switch result {
case .success:
// If our final action was `sendRequestEnd`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle or, if we were asked to
// close when we're done, send the close. Either way, we then succeed the request
if shouldClose {
context.close(promise: nil)
} else {
self.onConnectionIdle()
}

oldRequest.receiveResponseEnd(buffer, trailers: nil)
case .failure(let error):
context.close(promise: nil)
oldRequest.fail(error)
}
}

context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
case .none:
oldRequest.receiveResponseEnd(buffer, trailers: nil)

case .informConnectionIsIdle:
self.request = nil
self.onConnectionIdle()
oldRequest.receiveResponseEnd(buffer, trailers: nil)
}
Expand Down
Loading