Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shutdownAsync #109

Merged
merged 4 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
///
/// - Warning: This method is soft-deprecated. Use `syncShutdownGracefully()` or
/// `shutdownGracefully()` instead.
@available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()")
public func shutdown() {
// synchronize access to closing
guard self.lock.withLock({
Expand All @@ -190,6 +191,44 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
}
}

/// Closes the connection pool.
///
/// All available connections will be closed immediately. Any connections still in use will be
/// closed as soon as they are returned to the pool. Once closed, the pool can not be used to
/// create new connections.
///
/// Connection pools must be closed before they deinitialize.
///
/// This method shuts down asynchronously, waiting for all connection closures to complete before
/// returning.
///
/// - Warning: The pool is always fully shut down once this method returns, even if an error is
/// thrown. All errors are purely advisory.
public func shutdownAsync() async throws {
// synchronize access to closing
guard self.lock.withLock({
// check to make sure we aren't double closing
guard !self.didShutdown else {
return false
}
self.didShutdown = true
self.logger.debug("Connection pool shutting down, closing each event loop's storage")
return true
}) else {
self.logger.debug("Cannot shutdown the connection pool more than once")
throw ConnectionPoolError.shutdown
}

// shutdown all pools
for pool in self.storage.values {
do {
try await pool.close().get()
} catch {
self.logger.error("Failed shutting down event loop pool: \(error)")
}
}
}

/// Closes the connection pool.
///
/// All available connections will be closed immediately. Any connections still in use will be
Expand All @@ -203,6 +242,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
///
/// - Warning: The pool is always fully shut down once this method returns, even if an error is
/// thrown. All errors are purely advisory.
@available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()")
public func syncShutdownGracefully() throws {
// - TODO: Does this need to assert "not on any EventLoop", as `EventLoopGroup.syncShutdownGracefully()` does?
var possibleError: Error? = nil
Expand Down
225 changes: 225 additions & 0 deletions Tests/AsyncKitTests/AsyncConnectionPoolTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import Atomics
import AsyncKit
import XCTest
import NIOConcurrencyHelpers
import Logging
import NIOCore
import NIOEmbedded

final class AsyncConnectionPoolTests: AsyncKitAsyncTestCase {
func testPooling() async throws {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 2,
on: self.group.any()
)

// make two connections
let connA = try await pool.requestConnection().get()
XCTAssertEqual(connA.isClosed, false)
let connB = try await pool.requestConnection().get()
XCTAssertEqual(connB.isClosed, false)
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// try to make a third, but pool only supports 2
let futureC = pool.requestConnection()
let connC = ManagedAtomic<FooConnection?>(nil)
futureC.whenSuccess { connC.store($0, ordering: .relaxed) }
XCTAssertNil(connC.load(ordering: .relaxed))
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// release one of the connections, allowing the third to be made
pool.releaseConnection(connB)
let connCRet = try await futureC.get()
XCTAssertNotNil(connC.load(ordering: .relaxed))
XCTAssert(connC.load(ordering: .relaxed) === connB)
XCTAssert(connCRet === connC.load(ordering: .relaxed))
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// try to make a third again, with two active
let futureD = pool.requestConnection()
let connD = ManagedAtomic<FooConnection?>(nil)
futureD.whenSuccess { connD.store($0, ordering: .relaxed) }
XCTAssertNil(connD.load(ordering: .relaxed))
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// this time, close the connection before releasing it
try await connCRet.close().get()
pool.releaseConnection(connC.load(ordering: .relaxed)!)
let connDRet = try await futureD.get()
XCTAssert(connD.load(ordering: .relaxed) !== connB)
XCTAssert(connDRet === connD.load(ordering: .relaxed))
XCTAssertEqual(connD.load(ordering: .relaxed)?.isClosed, false)
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 3)

try! await pool.close().get()
}

func testFIFOWaiters() async throws {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 1,
on: self.group.any()
)

// * User A makes a request for a connection, gets connection number 1.
let a_1 = pool.requestConnection()
let a = try await a_1.get()

// * User B makes a request for a connection, they are exhausted so he gets a promise.
let b_1 = pool.requestConnection()

// * User A makes another request for a connection, they are still exhausted so he gets a promise.
let a_2 = pool.requestConnection()

// * User A returns connection number 1. His previous request is fulfilled with connection number 1.
pool.releaseConnection(a)

// * User B gets his connection
let b = try await b_1.get()
XCTAssert(a === b)

// * User B releases his connection
pool.releaseConnection(b)

// * User A's second connection request is fulfilled
let c = try await a_2.get()
XCTAssert(a === c)

try! await pool.close().get()
}

func testConnectError() async throws {
let db = ErrorDatabase()
let pool = EventLoopConnectionPool(
source: db,
maxConnections: 1,
on: self.group.any()
)

do {
_ = try await pool.requestConnection().get()
XCTFail("should not have created connection")
} catch _ as ErrorDatabase.Error {
// pass
}

// test that we can still make another request even after a failed request
do {
_ = try await pool.requestConnection().get()
XCTFail("should not have created connection")
} catch _ as ErrorDatabase.Error {
// pass
}

try! await pool.close().get()
}

func testPoolClose() async throws {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 1,
on: self.group.any()
)
let _ = try await pool.requestConnection().get()
let b = pool.requestConnection()
try await pool.close().get()

let c = pool.requestConnection()

// check that waiters are failed
do {
_ = try await b.get()
XCTFail("should not have created connection")
} catch ConnectionPoolError.shutdown {
// pass
}

// check that new requests fail
do {
_ = try await c.get()
XCTFail("should not have created connection")
} catch ConnectionPoolError.shutdown {
// pass
}
}

func testGracefulShutdownAsync() async throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
source: foo,
maxConnectionsPerEventLoop: 2,
on: self.group
)

try await pool.shutdownAsync()
var errorCaught = false

do {
try await pool.shutdownAsync()
} catch {
errorCaught = true
XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown)
}
XCTAssertTrue(errorCaught)
}

func testShutdownWithHeldConnection() async throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
source: foo,
maxConnectionsPerEventLoop: 2,
on: self.group
)

let connection = try await pool.requestConnection().get()

try await pool.shutdownAsync()
var errorCaught = false

do {
try await pool.shutdownAsync()
} catch {
errorCaught = true
XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown)
}
XCTAssertTrue(errorCaught)

let result1 = try await connection.eventLoop.submit { connection.isClosed }.get()
XCTAssertFalse(result1)
pool.releaseConnection(connection)
let result2 = try await connection.eventLoop.submit { connection.isClosed }.get()
XCTAssertTrue(result2)
}

func testEventLoopDelegation() async throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
source: foo,
maxConnectionsPerEventLoop: 1,
on: self.group
)

for _ in 0..<500 {
let eventLoop = self.group.any()
let a = pool.requestConnection(
on: eventLoop
).map { conn in
XCTAssertTrue(eventLoop.inEventLoop)
pool.releaseConnection(conn)
}
let b = pool.requestConnection(
on: eventLoop
).map { conn in
XCTAssertTrue(eventLoop.inEventLoop)
pool.releaseConnection(conn)
}
_ = try await a.and(b).get()
}

try await pool.shutdownAsync()
}
}
17 changes: 17 additions & 0 deletions Tests/AsyncKitTests/AsyncKitTestsCommon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,20 @@ class AsyncKitTestCase: XCTestCase {
XCTAssertTrue(isLoggingConfigured)
}
}

class AsyncKitAsyncTestCase: XCTestCase {
var group: (any EventLoopGroup)!
var eventLoop: any EventLoop { self.group.any() }

override func setUp() async throws {
try await super.setUp()
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
XCTAssertTrue(isLoggingConfigured)
}

override func tearDown() async throws {
try await self.group.shutdownGracefully()
self.group = nil
try await super.tearDown()
}
}
8 changes: 4 additions & 4 deletions Tests/AsyncKitTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ final class ConnectionPoolTests: AsyncKitTestCase {
XCTAssertEqual($0 as? ConnectionPoolError, ConnectionPoolError.shutdown)
}
}

func testGracefulShutdownWithHeldConnection() throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
Expand Down Expand Up @@ -312,7 +312,7 @@ final class ConnectionPoolTests: AsyncKitTestCase {
}
}

private struct ErrorDatabase: ConnectionPoolSource {
struct ErrorDatabase: ConnectionPoolSource {
enum Error: Swift.Error {
case test
}
Expand All @@ -322,7 +322,7 @@ private struct ErrorDatabase: ConnectionPoolSource {
}
}

private final class FooDatabase: ConnectionPoolSource {
final class FooDatabase: ConnectionPoolSource {
var connectionsCreated: ManagedAtomic<Int>

init() {
Expand All @@ -336,7 +336,7 @@ private final class FooDatabase: ConnectionPoolSource {
}
}

private final class FooConnection: ConnectionPoolItem, AtomicReference {
final class FooConnection: ConnectionPoolItem, AtomicReference {
var isClosed: Bool
let eventLoop: EventLoop

Expand Down
Loading