Skip to content

Commit

Permalink
Merge branch 'main' into fs-task-logger-access-level
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett authored Jul 20, 2022
2 parents 1e901b9 + 2adca4b commit c15b42a
Show file tree
Hide file tree
Showing 35 changed files with 993 additions and 133 deletions.
7 changes: 7 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.10.0"),
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.11.4"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
],
targets: [
.target(name: "CAsyncHTTPClient"),
Expand All @@ -46,6 +47,7 @@ let package = Package(
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Atomics", package: "swift-atomics"),
]
),
.testTarget(
Expand All @@ -61,6 +63,11 @@ let package = Package(
.product(name: "NIOHTTP2", package: "swift-nio-http2"),
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Atomics", package: "swift-atomics"),
],
resources: [
.copy("Resources/self_signed_cert.pem"),
.copy("Resources/self_signed_key.pem"),
]
),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,25 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .close:
context.close(promise: nil)
oldRequest.succeedRequest(buffer)
case .sendRequestEnd(let writePromise):
case .sendRequestEnd(let writePromise, let shouldClose):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
writePromise.futureResult.whenComplete { result in
writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in
switch result {
case .success:
// If our final action was `sendRequestEnd`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle or, if we were asked to
// close when we're done, send the close. Either way, we then succeed the request
if shouldClose {
context.close(promise: nil)
} else {
self.connection.taskCompleted()
}

oldRequest.succeedRequest(buffer)
case .failure(let error):
context.close(promise: nil)
oldRequest.fail(error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ struct HTTP1ConnectionStateMachine {
/// as soon as we wrote the request end onto the wire.
///
/// The promise is an optional write promise.
case sendRequestEnd(EventLoopPromise<Void>?)
///
/// `shouldClose` records whether we have attached a Connection: close header to this request, and so the connection should
/// be terminated
case sendRequestEnd(EventLoopPromise<Void>?, shouldClose: Bool)
/// Inform an observer that the connection has become idle
case informConnectionIsIdle
}
Expand Down Expand Up @@ -412,7 +415,8 @@ extension HTTP1ConnectionStateMachine.State {
self = .closing
newFinalAction = .close
case .sendRequestEnd(let writePromise):
newFinalAction = .sendRequestEnd(writePromise)
self = .idle
newFinalAction = .sendRequestEnd(writePromise, shouldClose: close)
case .none:
self = .idle
newFinalAction = close ? .close : .informConnectionIsIdle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ protocol HTTPConnectionRequester {
func http1ConnectionCreated(_: HTTP1Connection)
func http2ConnectionCreated(_: HTTP2Connection, maximumStreams: Int)
func failedToCreateHTTPConnection(_: HTTPConnectionPool.Connection.ID, error: Error)
func waitingForConnectivity(_: HTTPConnectionPool.Connection.ID, error: Error)
}

extension HTTPConnectionPool.ConnectionFactory {
Expand All @@ -62,7 +63,7 @@ extension HTTPConnectionPool.ConnectionFactory {
var logger = logger
logger[metadataKey: "ahc-connection-id"] = "\(connectionID)"

self.makeChannel(connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger).whenComplete { result in
self.makeChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger).whenComplete { result in
switch result {
case .success(.http1_1(let channel)):
do {
Expand Down Expand Up @@ -104,13 +105,15 @@ extension HTTPConnectionPool.ConnectionFactory {
case http2(Channel)
}

func makeHTTP1Channel(
func makeHTTP1Channel<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
logger: Logger
) -> EventLoopFuture<Channel> {
self.makeChannel(
requester: requester,
connectionID: connectionID,
deadline: deadline,
eventLoop: eventLoop,
Expand All @@ -137,7 +140,8 @@ extension HTTPConnectionPool.ConnectionFactory {
}
}

func makeChannel(
func makeChannel<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
Expand All @@ -150,6 +154,7 @@ extension HTTPConnectionPool.ConnectionFactory {
case .socks:
channelFuture = self.makeSOCKSProxyChannel(
proxy,
requester: requester,
connectionID: connectionID,
deadline: deadline,
eventLoop: eventLoop,
Expand All @@ -158,14 +163,15 @@ extension HTTPConnectionPool.ConnectionFactory {
case .http:
channelFuture = self.makeHTTPProxyChannel(
proxy,
requester: requester,
connectionID: connectionID,
deadline: deadline,
eventLoop: eventLoop,
logger: logger
)
}
} else {
channelFuture = self.makeNonProxiedChannel(deadline: deadline, eventLoop: eventLoop, logger: logger)
channelFuture = self.makeNonProxiedChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger)
}

// let's map `ChannelError.connectTimeout` into a `HTTPClientError.connectTimeout`
Expand All @@ -179,30 +185,38 @@ extension HTTPConnectionPool.ConnectionFactory {
}
}

private func makeNonProxiedChannel(
private func makeNonProxiedChannel<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
logger: Logger
) -> EventLoopFuture<NegotiatedProtocol> {
switch self.key.scheme {
case .http, .httpUnix, .unix:
return self.makePlainChannel(deadline: deadline, eventLoop: eventLoop).map { .http1_1($0) }
return self.makePlainChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop).map { .http1_1($0) }
case .https, .httpsUnix:
return self.makeTLSChannel(deadline: deadline, eventLoop: eventLoop, logger: logger).flatMapThrowing {
return self.makeTLSChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger).flatMapThrowing {
channel, negotiated in

try self.matchALPNToHTTPVersion(negotiated, channel: channel)
}
}
}

private func makePlainChannel(deadline: NIODeadline, eventLoop: EventLoop) -> EventLoopFuture<Channel> {
private func makePlainChannel<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop
) -> EventLoopFuture<Channel> {
precondition(!self.key.scheme.usesTLS, "Unexpected scheme")
return self.makePlainBootstrap(deadline: deadline, eventLoop: eventLoop).connect(target: self.key.connectionTarget)
return self.makePlainBootstrap(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop).connect(target: self.key.connectionTarget)
}

private func makeHTTPProxyChannel(
private func makeHTTPProxyChannel<Requester: HTTPConnectionRequester>(
_ proxy: HTTPClient.Configuration.Proxy,
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
Expand All @@ -211,7 +225,7 @@ extension HTTPConnectionPool.ConnectionFactory {
// A proxy connection starts with a plain text connection to the proxy server. After
// the connection has been established with the proxy server, the connection might be
// upgraded to TLS before we send our first request.
let bootstrap = self.makePlainBootstrap(deadline: deadline, eventLoop: eventLoop)
let bootstrap = self.makePlainBootstrap(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop)
return bootstrap.connect(host: proxy.host, port: proxy.port).flatMap { channel in
let encoder = HTTPRequestEncoder()
let decoder = ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .dropBytes))
Expand Down Expand Up @@ -243,8 +257,9 @@ extension HTTPConnectionPool.ConnectionFactory {
}
}

private func makeSOCKSProxyChannel(
private func makeSOCKSProxyChannel<Requester: HTTPConnectionRequester>(
_ proxy: HTTPClient.Configuration.Proxy,
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
Expand All @@ -253,7 +268,7 @@ extension HTTPConnectionPool.ConnectionFactory {
// A proxy connection starts with a plain text connection to the proxy server. After
// the connection has been established with the proxy server, the connection might be
// upgraded to TLS before we send our first request.
let bootstrap = self.makePlainBootstrap(deadline: deadline, eventLoop: eventLoop)
let bootstrap = self.makePlainBootstrap(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop)
return bootstrap.connect(host: proxy.host, port: proxy.port).flatMap { channel in
let socksConnectHandler = SOCKSClientHandler(targetAddress: SOCKSAddress(self.key.connectionTarget))
let socksEventHandler = SOCKSEventsHandler(deadline: deadline)
Expand Down Expand Up @@ -331,14 +346,21 @@ extension HTTPConnectionPool.ConnectionFactory {
}
}

private func makePlainBootstrap(deadline: NIODeadline, eventLoop: EventLoop) -> NIOClientTCPBootstrapProtocol {
private func makePlainBootstrap<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop
) -> NIOClientTCPBootstrapProtocol {
#if canImport(Network)
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), let tsBootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoop) {
return tsBootstrap
.channelOption(NIOTSChannelOptions.waitForActivity, value: self.clientConfiguration.networkFrameworkWaitForConnectivity)
.connectTimeout(deadline - NIODeadline.now())
.channelInitializer { channel in
do {
try channel.pipeline.syncOperations.addHandler(HTTPClient.NWErrorHandler())
try channel.pipeline.syncOperations.addHandler(NWWaitingHandler(requester: requester, connectionID: connectionID))
return channel.eventLoop.makeSucceededVoidFuture()
} catch {
return channel.eventLoop.makeFailedFuture(error)
Expand All @@ -355,9 +377,17 @@ extension HTTPConnectionPool.ConnectionFactory {
preconditionFailure("No matching bootstrap found")
}

private func makeTLSChannel(deadline: NIODeadline, eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<(Channel, String?)> {
private func makeTLSChannel<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
logger: Logger
) -> EventLoopFuture<(Channel, String?)> {
precondition(self.key.scheme.usesTLS, "Unexpected scheme")
let bootstrapFuture = self.makeTLSBootstrap(
requester: requester,
connectionID: connectionID,
deadline: deadline,
eventLoop: eventLoop,
logger: logger
Expand Down Expand Up @@ -387,8 +417,13 @@ extension HTTPConnectionPool.ConnectionFactory {
return channelFuture
}

private func makeTLSBootstrap(deadline: NIODeadline, eventLoop: EventLoop, logger: Logger)
-> EventLoopFuture<NIOClientTCPBootstrapProtocol> {
private func makeTLSBootstrap<Requester: HTTPConnectionRequester>(
requester: Requester,
connectionID: HTTPConnectionPool.Connection.ID,
deadline: NIODeadline,
eventLoop: EventLoop,
logger: Logger
) -> EventLoopFuture<NIOClientTCPBootstrapProtocol> {
var tlsConfig = self.tlsConfiguration
switch self.clientConfiguration.httpVersion.configuration {
case .automatic:
Expand All @@ -408,11 +443,13 @@ extension HTTPConnectionPool.ConnectionFactory {
options -> NIOClientTCPBootstrapProtocol in

tsBootstrap
.channelOption(NIOTSChannelOptions.waitForActivity, value: self.clientConfiguration.networkFrameworkWaitForConnectivity)
.connectTimeout(deadline - NIODeadline.now())
.tlsOptions(options)
.channelInitializer { channel in
do {
try channel.pipeline.syncOperations.addHandler(HTTPClient.NWErrorHandler())
try channel.pipeline.syncOperations.addHandler(NWWaitingHandler(requester: requester, connectionID: connectionID))
// we don't need to set a TLS deadline for NIOTS connections, since the
// TLS handshake is part of the TS connection bootstrap. If the TLS
// handshake times out the complete connection creation will be failed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Atomics
import Logging
import NIOConcurrencyHelpers
import NIOCore
Expand Down Expand Up @@ -165,14 +166,14 @@ extension HTTPConnectionPool.Connection.ID {
static var globalGenerator = Generator()

struct Generator {
private let atomic: NIOAtomic<Int>
private let atomic: ManagedAtomic<Int>

init() {
self.atomic = .makeAtomic(value: 0)
self.atomic = .init(0)
}

func next() -> Int {
return self.atomic.add(1)
return self.atomic.loadThenWrappingIncrement(ordering: .relaxed)
}
}
}
12 changes: 10 additions & 2 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ final class HTTPConnectionPool {
self.unlocked = Unlocked(connection: .none, request: .none)

switch stateMachineAction.request {
case .cancelRequestTimeout(let requestID):
self.locked.request = .cancelRequestTimeout(requestID)
case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout):
if cancelTimeout {
self.locked.request = .cancelRequestTimeout(request.id)
Expand Down Expand Up @@ -467,6 +465,16 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
$0.failedToCreateNewConnection(error, connectionID: connectionID)
}
}

func waitingForConnectivity(_ connectionID: HTTPConnectionPool.Connection.ID, error: Error) {
self.logger.debug("waiting for connectivity", metadata: [
"ahc-error": "\(error)",
"ahc-connection-id": "\(connectionID)",
])
self.modifyStateAndRunActions {
$0.waitingForConnectivity(error, connectionID: connectionID)
}
}
}

extension HTTPConnectionPool: HTTP1ConnectionDelegate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ extension HTTPConnectionPool {
}
}

mutating func waitingForConnectivity(_ error: Error, connectionID: Connection.ID) -> Action {
self.lastConnectFailure = error

return .init(request: .none, connection: .none)
}

mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
switch self.lifecycleState {
case .running:
Expand Down Expand Up @@ -317,9 +323,11 @@ extension HTTPConnectionPool {

mutating func cancelRequest(_ requestID: Request.ID) -> Action {
// 1. check requests in queue
if self.requests.remove(requestID) != nil {
if let request = self.requests.remove(requestID) {
// Use the last connection error to let the user know why the request was never scheduled
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
return .init(
request: .cancelRequestTimeout(requestID),
request: .failRequest(request, error, cancelTimeout: true),
connection: .none
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ extension HTTPConnectionPool {
return .init(request: .none, connection: .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop))
}

mutating func waitingForConnectivity(_ error: Error, connectionID: Connection.ID) -> Action {
self.lastConnectFailure = error
return .init(request: .none, connection: .none)
}

mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
// The naming of `failConnection` is a little confusing here. All it does is moving the
// connection state from `.backingOff` to `.closed` here. It also returns the
Expand Down Expand Up @@ -439,9 +444,11 @@ extension HTTPConnectionPool {

mutating func cancelRequest(_ requestID: Request.ID) -> Action {
// 1. check requests in queue
if self.requests.remove(requestID) != nil {
if let request = self.requests.remove(requestID) {
// Use the last connection error to let the user know why the request was never scheduled
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
return .init(
request: .cancelRequestTimeout(requestID),
request: .failRequest(request, error, cancelTimeout: true),
connection: .none
)
}
Expand Down
Loading

0 comments on commit c15b42a

Please sign in to comment.