diff --git a/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift b/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift index 42dc4db..060d0e9 100644 --- a/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift +++ b/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift @@ -166,6 +166,7 @@ public final class EventLoopGroupConnectionPool where Source: Connection /// /// - Warning: This method is soft-deprecated. Use `syncShutdownGracefully()` or /// `shutdownGracefully()` instead. + @available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()") public func shutdown() { // synchronize access to closing guard self.lock.withLock({ @@ -190,6 +191,44 @@ public final class EventLoopGroupConnectionPool where Source: Connection } } + /// Closes the connection pool. + /// + /// All available connections will be closed immediately. Any connections still in use will be + /// closed as soon as they are returned to the pool. Once closed, the pool can not be used to + /// create new connections. + /// + /// Connection pools must be closed before they deinitialize. + /// + /// This method shuts down asynchronously, waiting for all connection closures to complete before + /// returning. + /// + /// - Warning: The pool is always fully shut down once this method returns, even if an error is + /// thrown. All errors are purely advisory. + public func shutdownAsync() async throws { + // synchronize access to closing + guard self.lock.withLock({ + // check to make sure we aren't double closing + guard !self.didShutdown else { + return false + } + self.didShutdown = true + self.logger.debug("Connection pool shutting down, closing each event loop's storage") + return true + }) else { + self.logger.debug("Cannot shutdown the connection pool more than once") + throw ConnectionPoolError.shutdown + } + + // shutdown all pools + for pool in self.storage.values { + do { + try await pool.close().get() + } catch { + self.logger.error("Failed shutting down event loop pool: \(error)") + } + } + } + /// Closes the connection pool. /// /// All available connections will be closed immediately. Any connections still in use will be @@ -203,6 +242,7 @@ public final class EventLoopGroupConnectionPool where Source: Connection /// /// - Warning: The pool is always fully shut down once this method returns, even if an error is /// thrown. All errors are purely advisory. + @available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()") public func syncShutdownGracefully() throws { // - TODO: Does this need to assert "not on any EventLoop", as `EventLoopGroup.syncShutdownGracefully()` does? var possibleError: Error? = nil diff --git a/Tests/AsyncKitTests/AsyncConnectionPoolTests.swift b/Tests/AsyncKitTests/AsyncConnectionPoolTests.swift new file mode 100644 index 0000000..08fa355 --- /dev/null +++ b/Tests/AsyncKitTests/AsyncConnectionPoolTests.swift @@ -0,0 +1,225 @@ +import Atomics +import AsyncKit +import XCTest +import NIOConcurrencyHelpers +import Logging +import NIOCore +import NIOEmbedded + +final class AsyncConnectionPoolTests: AsyncKitAsyncTestCase { + func testPooling() async throws { + let foo = FooDatabase() + let pool = EventLoopConnectionPool( + source: foo, + maxConnections: 2, + on: self.group.any() + ) + + // make two connections + let connA = try await pool.requestConnection().get() + XCTAssertEqual(connA.isClosed, false) + let connB = try await pool.requestConnection().get() + XCTAssertEqual(connB.isClosed, false) + XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2) + + // try to make a third, but pool only supports 2 + let futureC = pool.requestConnection() + let connC = ManagedAtomic(nil) + futureC.whenSuccess { connC.store($0, ordering: .relaxed) } + XCTAssertNil(connC.load(ordering: .relaxed)) + XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2) + + // release one of the connections, allowing the third to be made + pool.releaseConnection(connB) + let connCRet = try await futureC.get() + XCTAssertNotNil(connC.load(ordering: .relaxed)) + XCTAssert(connC.load(ordering: .relaxed) === connB) + XCTAssert(connCRet === connC.load(ordering: .relaxed)) + XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2) + + // try to make a third again, with two active + let futureD = pool.requestConnection() + let connD = ManagedAtomic(nil) + futureD.whenSuccess { connD.store($0, ordering: .relaxed) } + XCTAssertNil(connD.load(ordering: .relaxed)) + XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2) + + // this time, close the connection before releasing it + try await connCRet.close().get() + pool.releaseConnection(connC.load(ordering: .relaxed)!) + let connDRet = try await futureD.get() + XCTAssert(connD.load(ordering: .relaxed) !== connB) + XCTAssert(connDRet === connD.load(ordering: .relaxed)) + XCTAssertEqual(connD.load(ordering: .relaxed)?.isClosed, false) + XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 3) + + try! await pool.close().get() + } + + func testFIFOWaiters() async throws { + let foo = FooDatabase() + let pool = EventLoopConnectionPool( + source: foo, + maxConnections: 1, + on: self.group.any() + ) + + // * User A makes a request for a connection, gets connection number 1. + let a_1 = pool.requestConnection() + let a = try await a_1.get() + + // * User B makes a request for a connection, they are exhausted so he gets a promise. + let b_1 = pool.requestConnection() + + // * User A makes another request for a connection, they are still exhausted so he gets a promise. + let a_2 = pool.requestConnection() + + // * User A returns connection number 1. His previous request is fulfilled with connection number 1. + pool.releaseConnection(a) + + // * User B gets his connection + let b = try await b_1.get() + XCTAssert(a === b) + + // * User B releases his connection + pool.releaseConnection(b) + + // * User A's second connection request is fulfilled + let c = try await a_2.get() + XCTAssert(a === c) + + try! await pool.close().get() + } + + func testConnectError() async throws { + let db = ErrorDatabase() + let pool = EventLoopConnectionPool( + source: db, + maxConnections: 1, + on: self.group.any() + ) + + do { + _ = try await pool.requestConnection().get() + XCTFail("should not have created connection") + } catch _ as ErrorDatabase.Error { + // pass + } + + // test that we can still make another request even after a failed request + do { + _ = try await pool.requestConnection().get() + XCTFail("should not have created connection") + } catch _ as ErrorDatabase.Error { + // pass + } + + try! await pool.close().get() + } + + func testPoolClose() async throws { + let foo = FooDatabase() + let pool = EventLoopConnectionPool( + source: foo, + maxConnections: 1, + on: self.group.any() + ) + let _ = try await pool.requestConnection().get() + let b = pool.requestConnection() + try await pool.close().get() + + let c = pool.requestConnection() + + // check that waiters are failed + do { + _ = try await b.get() + XCTFail("should not have created connection") + } catch ConnectionPoolError.shutdown { + // pass + } + + // check that new requests fail + do { + _ = try await c.get() + XCTFail("should not have created connection") + } catch ConnectionPoolError.shutdown { + // pass + } + } + + func testGracefulShutdownAsync() async throws { + let foo = FooDatabase() + let pool = EventLoopGroupConnectionPool( + source: foo, + maxConnectionsPerEventLoop: 2, + on: self.group + ) + + try await pool.shutdownAsync() + var errorCaught = false + + do { + try await pool.shutdownAsync() + } catch { + errorCaught = true + XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown) + } + XCTAssertTrue(errorCaught) + } + + func testShutdownWithHeldConnection() async throws { + let foo = FooDatabase() + let pool = EventLoopGroupConnectionPool( + source: foo, + maxConnectionsPerEventLoop: 2, + on: self.group + ) + + let connection = try await pool.requestConnection().get() + + try await pool.shutdownAsync() + var errorCaught = false + + do { + try await pool.shutdownAsync() + } catch { + errorCaught = true + XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown) + } + XCTAssertTrue(errorCaught) + + let result1 = try await connection.eventLoop.submit { connection.isClosed }.get() + XCTAssertFalse(result1) + pool.releaseConnection(connection) + let result2 = try await connection.eventLoop.submit { connection.isClosed }.get() + XCTAssertTrue(result2) + } + + func testEventLoopDelegation() async throws { + let foo = FooDatabase() + let pool = EventLoopGroupConnectionPool( + source: foo, + maxConnectionsPerEventLoop: 1, + on: self.group + ) + + for _ in 0..<500 { + let eventLoop = self.group.any() + let a = pool.requestConnection( + on: eventLoop + ).map { conn in + XCTAssertTrue(eventLoop.inEventLoop) + pool.releaseConnection(conn) + } + let b = pool.requestConnection( + on: eventLoop + ).map { conn in + XCTAssertTrue(eventLoop.inEventLoop) + pool.releaseConnection(conn) + } + _ = try await a.and(b).get() + } + + try await pool.shutdownAsync() + } +} diff --git a/Tests/AsyncKitTests/AsyncKitTestsCommon.swift b/Tests/AsyncKitTests/AsyncKitTestsCommon.swift index 274ee1c..b30ad45 100644 --- a/Tests/AsyncKitTests/AsyncKitTestsCommon.swift +++ b/Tests/AsyncKitTests/AsyncKitTestsCommon.swift @@ -43,3 +43,20 @@ class AsyncKitTestCase: XCTestCase { XCTAssertTrue(isLoggingConfigured) } } + +class AsyncKitAsyncTestCase: XCTestCase { + var group: (any EventLoopGroup)! + var eventLoop: any EventLoop { self.group.any() } + + override func setUp() async throws { + try await super.setUp() + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + XCTAssertTrue(isLoggingConfigured) + } + + override func tearDown() async throws { + try await self.group.shutdownGracefully() + self.group = nil + try await super.tearDown() + } +} diff --git a/Tests/AsyncKitTests/ConnectionPoolTests.swift b/Tests/AsyncKitTests/ConnectionPoolTests.swift index 3600090..8253ed8 100644 --- a/Tests/AsyncKitTests/ConnectionPoolTests.swift +++ b/Tests/AsyncKitTests/ConnectionPoolTests.swift @@ -264,7 +264,7 @@ final class ConnectionPoolTests: AsyncKitTestCase { XCTAssertEqual($0 as? ConnectionPoolError, ConnectionPoolError.shutdown) } } - + func testGracefulShutdownWithHeldConnection() throws { let foo = FooDatabase() let pool = EventLoopGroupConnectionPool( @@ -312,7 +312,7 @@ final class ConnectionPoolTests: AsyncKitTestCase { } } -private struct ErrorDatabase: ConnectionPoolSource { +struct ErrorDatabase: ConnectionPoolSource { enum Error: Swift.Error { case test } @@ -322,7 +322,7 @@ private struct ErrorDatabase: ConnectionPoolSource { } } -private final class FooDatabase: ConnectionPoolSource { +final class FooDatabase: ConnectionPoolSource { var connectionsCreated: ManagedAtomic init() { @@ -336,7 +336,7 @@ private final class FooDatabase: ConnectionPoolSource { } } -private final class FooConnection: ConnectionPoolItem, AtomicReference { +final class FooConnection: ConnectionPoolItem, AtomicReference { var isClosed: Bool let eventLoop: EventLoop