diff --git a/Sources/ContainerClient/ContainerEvents.swift b/Sources/ContainerClient/ContainerEvents.swift deleted file mode 100644 index 83d06f50..00000000 --- a/Sources/ContainerClient/ContainerEvents.swift +++ /dev/null @@ -1,20 +0,0 @@ -//===----------------------------------------------------------------------===// -// Copyright © 2025 Apple Inc. and the container project authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//===----------------------------------------------------------------------===// - -public enum ContainerEvent: Sendable, Codable { - case containerStart(id: String) - case containerExit(id: String, exitCode: Int64) -} diff --git a/Sources/ContainerClient/Core/ClientContainer.swift b/Sources/ContainerClient/Core/ClientContainer.swift index 696b0dce..a110fe50 100644 --- a/Sources/ContainerClient/Core/ClientContainer.swift +++ b/Sources/ContainerClient/Core/ClientContainer.swift @@ -25,10 +25,6 @@ import TerminalProgress public struct ClientContainer: Sendable, Codable { static let serviceIdentifier = "com.apple.container.apiserver" - private var sandboxClient: SandboxClient { - SandboxClient(id: configuration.id, runtime: configuration.runtimeHandler) - } - /// Identifier of the container. public var id: String { configuration.id @@ -58,14 +54,10 @@ public struct ClientContainer: Sendable, Codable { self.status = snapshot.status self.networks = snapshot.networks } - - public var initProcess: ClientProcess { - ClientProcessImpl(containerId: self.id, client: self.sandboxClient) - } } extension ClientContainer { - private static func newClient() -> XPCClient { + private static func newXPCClient() -> XPCClient { XPCClient(service: serviceIdentifier) } @@ -84,8 +76,8 @@ extension ClientContainer { kernel: Kernel ) async throws -> ClientContainer { do { - let client = Self.newClient() - let request = XPCMessage(route: .createContainer) + let client = Self.newXPCClient() + let request = XPCMessage(route: .containerCreate) let data = try JSONEncoder().encode(configuration) let kdata = try JSONEncoder().encode(kernel) @@ -107,8 +99,8 @@ extension ClientContainer { public static func list() async throws -> [ClientContainer] { do { - let client = Self.newClient() - let request = XPCMessage(route: .listContainer) + let client = Self.newXPCClient() + let request = XPCMessage(route: .containerList) let response = try await xpcSend( client: client, @@ -145,16 +137,72 @@ extension ClientContainer { extension ClientContainer { public func bootstrap(stdio: [FileHandle?]) async throws -> ClientProcess { - let client = self.sandboxClient - try await client.bootstrap(stdio: stdio) - return ClientProcessImpl(containerId: self.id, client: self.sandboxClient) + let request = XPCMessage(route: .containerBootstrap) + let client = Self.newXPCClient() + + for (i, h) in stdio.enumerated() { + let key: XPCKeys = try { + switch i { + case 0: .stdin + case 1: .stdout + case 2: .stderr + default: + throw ContainerizationError(.invalidArgument, message: "invalid fd \(i)") + } + }() + + if let h { + request.set(key: key, value: h) + } + } + + do { + request.set(key: .id, value: self.id) + try await client.send(request) + return ClientProcessImpl(containerId: self.id, xpcClient: client) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to bootstrap container", + cause: error + ) + } + } + + public func kill(_ signal: Int32) async throws { + do { + let request = XPCMessage(route: .containerKill) + request.set(key: .id, value: self.id) + request.set(key: .processIdentifier, value: self.id) + request.set(key: .signal, value: Int64(signal)) + + let client = Self.newXPCClient() + try await client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to kill container", + cause: error + ) + } } /// Stop the container and all processes currently executing inside. public func stop(opts: ContainerStopOptions = ContainerStopOptions.default) async throws { do { - let client = self.sandboxClient - try await client.stop(options: opts) + let client = Self.newXPCClient() + let request = XPCMessage(route: .containerStop) + let data = try JSONEncoder().encode(opts) + request.set(key: .id, value: self.id) + request.set(key: .stopOptions, value: data) + + // Stop is somewhat more prone to hanging than other commands given it + // has quite a bit of `wait()`'s down the chain to make sure the container actually + // exited. To combat a potential hang, lets timeout if we don't return in a small + // time period after the actual stop timeout sent via .stopOptions (the time + // until we send SIGKILL after SIGTERM if the container still hasn't exited). + let responseTimeout = Duration(.seconds(Int64(opts.timeoutInSeconds + 3))) + try await client.send(request, responseTimeout: responseTimeout) } catch { throw ContainerizationError( .internalError, @@ -167,8 +215,8 @@ extension ClientContainer { /// Delete the container along with any resources. public func delete(force: Bool = false) async throws { do { - let client = XPCClient(service: Self.serviceIdentifier) - let request = XPCMessage(route: .deleteContainer) + let client = Self.newXPCClient() + let request = XPCMessage(route: .containerDelete) request.set(key: .id, value: self.id) request.set(key: .forceDelete, value: force) try await client.send(request) @@ -180,38 +228,45 @@ extension ClientContainer { ) } } -} -extension ClientContainer { - /// Execute a new process inside a running container. + /// Create a new process inside a running container. The process is in a + /// created state and must still be started. public func createProcess( id: String, configuration: ProcessConfiguration, stdio: [FileHandle?] ) async throws -> ClientProcess { do { - let client = self.sandboxClient - try await client.createProcess(id, config: configuration, stdio: stdio) - return ClientProcessImpl(containerId: self.id, processId: id, client: client) - } catch { - throw ContainerizationError( - .internalError, - message: "failed to exec in container", - cause: error - ) - } - } + let request = XPCMessage(route: .containerCreateProcess) + request.set(key: .id, value: self.id) + request.set(key: .processIdentifier, value: id) - /// Send or "kill" a signal to the initial process of the container. - /// Kill does not wait for the process to exit, it only delivers the signal. - public func kill(_ signal: Int32) async throws { - do { - let client = self.sandboxClient - try await client.kill(self.id, signal: Int64(signal)) + let data = try JSONEncoder().encode(configuration) + request.set(key: .processConfig, value: data) + + for (i, h) in stdio.enumerated() { + let key: XPCKeys = try { + switch i { + case 0: .stdin + case 1: .stdout + case 2: .stderr + default: + throw ContainerizationError(.invalidArgument, message: "invalid fd \(i)") + } + }() + + if let h { + request.set(key: key, value: h) + } + } + + let client = Self.newXPCClient() + try await client.send(request) + return ClientProcessImpl(containerId: self.id, processId: id, xpcClient: client) } catch { throw ContainerizationError( .internalError, - message: "failed to kill container \(self.id)", + message: "failed to create process in container", cause: error ) } @@ -219,7 +274,7 @@ extension ClientContainer { public func logs() async throws -> [FileHandle] { do { - let client = XPCClient(service: Self.serviceIdentifier) + let client = Self.newXPCClient() let request = XPCMessage(route: .containerLogs) request.set(key: .id, value: self.id) @@ -242,15 +297,27 @@ extension ClientContainer { } public func dial(_ port: UInt32) async throws -> FileHandle { + let request = XPCMessage(route: .containerDial) + request.set(key: .id, value: self.id) + request.set(key: .port, value: UInt64(port)) + + let client = Self.newXPCClient() + let response: XPCMessage do { - let client = self.sandboxClient - return try await client.dial(port) + response = try await client.send(request) } catch { throw ContainerizationError( .internalError, - message: "failed to dial \(port) in container \(self.id)", + message: "failed to dial port \(port) on container", cause: error ) } + guard let fh = response.fileHandle(key: .fd) else { + throw ContainerizationError( + .internalError, + message: "failed to get fd for vsock port \(port)" + ) + } + return fh } } diff --git a/Sources/ContainerClient/Core/ClientProcess.swift b/Sources/ContainerClient/Core/ClientProcess.swift index b3907d91..cb391ed1 100644 --- a/Sources/ContainerClient/Core/ClientProcess.swift +++ b/Sources/ContainerClient/Core/ClientProcess.swift @@ -35,7 +35,7 @@ public protocol ClientProcess: Sendable { func start() async throws /// Send a terminal resize request to the process `id`. func resize(_ size: Terminal.Size) async throws - /// Send or "kill" a signal to the process `id`. + /// Send a signal to the process `id`. /// Kill does not wait for the process to exit, it only delivers the signal. func kill(_ signal: Int32) async throws /// Wait for the process `id` to complete and return its exit code. @@ -45,79 +45,66 @@ public protocol ClientProcess: Sendable { struct ClientProcessImpl: ClientProcess, Sendable { static let serviceIdentifier = "com.apple.container.apiserver" + + /// ID of the process. + public var id: String { + processId ?? containerId + } + /// Identifier of the container. public let containerId: String - private let client: SandboxClient - /// Identifier of a process. That is running inside of a container. /// This field is nil if the process this objects refers to is the /// init process of the container. public let processId: String? - public var id: String { - processId ?? containerId - } + private let xpcClient: XPCClient - init(containerId: String, processId: String? = nil, client: SandboxClient) { + init(containerId: String, processId: String? = nil, xpcClient: XPCClient) { self.containerId = containerId self.processId = processId - self.client = client + self.xpcClient = xpcClient } - /// Start the container and return the initial process. + /// Start the process. public func start() async throws { - do { - let client = self.client - try await client.startProcess(self.id) - } catch { - throw ContainerizationError( - .internalError, - message: "failed to start container", - cause: error - ) - } + let request = XPCMessage(route: .containerStartProcess) + request.set(key: .id, value: containerId) + request.set(key: .processIdentifier, value: id) + + try await xpcClient.send(request) } + /// Send a signal to the process. public func kill(_ signal: Int32) async throws { - do { - - let client = self.client - try await client.kill(self.id, signal: Int64(signal)) - } catch { - throw ContainerizationError( - .internalError, - message: "failed to kill process", - cause: error - ) - } - } + let request = XPCMessage(route: .containerKill) + request.set(key: .id, value: containerId) + request.set(key: .processIdentifier, value: id) + request.set(key: .signal, value: Int64(signal)) - public func resize(_ size: ContainerizationOS.Terminal.Size) async throws { - do { + try await xpcClient.send(request) + } - let client = self.client - try await client.resize(self.id, size: size) + /// Resize the processes PTY if it has one. + public func resize(_ size: Terminal.Size) async throws { + let request = XPCMessage(route: .containerResize) + request.set(key: .id, value: containerId) + request.set(key: .processIdentifier, value: id) + request.set(key: .width, value: UInt64(size.width)) + request.set(key: .height, value: UInt64(size.height)) - } catch { - throw ContainerizationError( - .internalError, - message: "failed to resize process", - cause: error - ) - } + try await xpcClient.send(request) } + /// Wait for the process to exit. public func wait() async throws -> Int32 { - do { - let client = self.client - return try await client.wait(self.id) - } catch { - throw ContainerizationError( - .internalError, - message: "failed to wait on process", - cause: error - ) - } + let request = XPCMessage(route: .containerWait) + request.set(key: .id, value: containerId) + request.set(key: .processIdentifier, value: id) + + let response = try await xpcClient.send(request) + let code = response.int64(key: .exitCode) + return Int32(code) } } diff --git a/Sources/ContainerClient/Core/ContainerSnapshot.swift b/Sources/ContainerClient/Core/ContainerSnapshot.swift index ebd6f1ca..b0fcb943 100644 --- a/Sources/ContainerClient/Core/ContainerSnapshot.swift +++ b/Sources/ContainerClient/Core/ContainerSnapshot.swift @@ -20,11 +20,11 @@ import ContainerNetworkService /// and any runtime state information. public struct ContainerSnapshot: Codable, Sendable { /// The configuration of the container. - public let configuration: ContainerConfiguration + public var configuration: ContainerConfiguration /// The runtime status of the container. - public let status: RuntimeStatus + public var status: RuntimeStatus /// Network interfaces attached to the sandbox that are provided to the container. - public let networks: [Attachment] + public var networks: [Attachment] public init( configuration: ContainerConfiguration, diff --git a/Sources/ContainerClient/XPC+.swift b/Sources/ContainerClient/Core/XPC+.swift similarity index 93% rename from Sources/ContainerClient/XPC+.swift rename to Sources/ContainerClient/Core/XPC+.swift index af028137..db90eb64 100644 --- a/Sources/ContainerClient/XPC+.swift +++ b/Sources/ContainerClient/Core/XPC+.swift @@ -48,6 +48,8 @@ public enum XPCKeys: String { case stopOptions /// Whether to force stop a container when deleting. case forceDelete + /// An endpoint to talk to a sandbox service. + case sandboxServiceEndpoint /// Plugins case pluginName case plugins @@ -114,9 +116,18 @@ public enum XPCKeys: String { } public enum XPCRoute: String { - case listContainer - case createContainer - case deleteContainer + case containerList + case containerCreate + case containerBootstrap + case containerCreateProcess + case containerStartProcess + case containerWait + case containerDelete + case containerStop + case containerDial + case containerResize + case containerKill + case containerState case containerLogs case containerEvent diff --git a/Sources/Services/ContainerSandboxService/ExitMonitor.swift b/Sources/ContainerClient/ExitMonitor.swift similarity index 99% rename from Sources/Services/ContainerSandboxService/ExitMonitor.swift rename to Sources/ContainerClient/ExitMonitor.swift index 80dfa58e..4318065a 100644 --- a/Sources/Services/ContainerSandboxService/ExitMonitor.swift +++ b/Sources/ContainerClient/ExitMonitor.swift @@ -14,8 +14,6 @@ // limitations under the License. //===----------------------------------------------------------------------===// -// - import ContainerizationError import ContainerizationExtras import Foundation diff --git a/Sources/ContainerClient/SandboxClient.swift b/Sources/ContainerClient/SandboxClient.swift index 67815a69..d9a4cdac 100644 --- a/Sources/ContainerClient/SandboxClient.swift +++ b/Sources/ContainerClient/SandboxClient.swift @@ -21,7 +21,7 @@ import Foundation import TerminalProgress /// A client for interacting with a single sandbox. -public struct SandboxClient: Sendable, Codable { +public struct SandboxClient: Sendable { static let label = "com.apple.container.runtime" public static func machServiceLabel(runtime: String, id: String) -> String { @@ -34,11 +34,41 @@ public struct SandboxClient: Sendable, Codable { let id: String let runtime: String + let client: XPCClient - /// Create a container. - public init(id: String, runtime: String) { + init(id: String, runtime: String, client: XPCClient) { self.id = id self.runtime = runtime + self.client = client + } + + /// Create a SandboxClient by ID and runtime string. The returned client is ready to be used + /// without additional steps. + public static func create(id: String, runtime: String) async throws -> SandboxClient { + let label = Self.machServiceLabel(runtime: runtime, id: id) + let client = XPCClient(service: label) + let request = XPCMessage(route: SandboxRoutes.createEndpoint.rawValue) + + let response: XPCMessage + do { + response = try await client.send(request, responseTimeout: .seconds(3)) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to create container \(id)", + cause: error + ) + } + guard let endpoint = response.endpoint(key: .sandboxServiceEndpoint) else { + throw ContainerizationError( + .internalError, + message: "failed to get endpoint for sandbox service" + ) + } + + let endpointConnection = xpc_connection_create_from_endpoint(endpoint) + let xpcClient = XPCClient(connection: endpointConnection, label: label) + return SandboxClient(id: id, runtime: runtime, client: xpcClient) } } @@ -46,17 +76,15 @@ public struct SandboxClient: Sendable, Codable { extension SandboxClient { public func bootstrap(stdio: [FileHandle?]) async throws { let request = XPCMessage(route: SandboxRoutes.bootstrap.rawValue) - let client = createClient() - defer { client.close() } for (i, h) in stdio.enumerated() { - let key: XPCKeys = { + let key: XPCKeys = try { switch i { case 0: .stdin case 1: .stdout case 2: .stderr default: - fatalError("invalid fd \(i)") + throw ContainerizationError(.invalidArgument, message: "invalid fd \(i)") } }() @@ -65,15 +93,29 @@ extension SandboxClient { } } - try await client.send(request) + do { + try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to bootstrap container \(self.id)", + cause: error + ) + } } public func state() async throws -> SandboxSnapshot { let request = XPCMessage(route: SandboxRoutes.state.rawValue) - let client = createClient() - defer { client.close() } - - let response = try await client.send(request) + let response: XPCMessage + do { + response = try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to get state for container \(self.id)", + cause: error + ) + } return try response.sandboxSnapshot() } @@ -84,13 +126,13 @@ extension SandboxClient { request.set(key: .processConfig, value: data) for (i, h) in stdio.enumerated() { - let key: XPCKeys = { + let key: XPCKeys = try { switch i { case 0: .stdin case 1: .stdout case 2: .stderr default: - fatalError("invalid fd \(i)") + throw ContainerizationError(.invalidArgument, message: "invalid fd \(i)") } }() @@ -99,19 +141,29 @@ extension SandboxClient { } } - let client = createClient() - defer { client.close() } - try await client.send(request) + do { + try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to create process \(id) in container \(self.id)", + cause: error + ) + } } public func startProcess(_ id: String) async throws { let request = XPCMessage(route: SandboxRoutes.start.rawValue) request.set(key: .id, value: id) - - let client = createClient() - defer { client.close() } - - try await client.send(request) + do { + try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to start process \(id) in container \(self.id)", + cause: error + ) + } } public func stop(options: ContainerStopOptions) async throws { @@ -120,10 +172,16 @@ extension SandboxClient { let data = try JSONEncoder().encode(options) request.set(key: .stopOptions, value: data) - let client = createClient() - defer { client.close() } let responseTimeout = Duration(.seconds(Int64(options.timeoutInSeconds + 1))) - try await client.send(request, responseTimeout: responseTimeout) + do { + try await self.client.send(request, responseTimeout: responseTimeout) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to stop container \(self.id)", + cause: error + ) + } } public func kill(_ id: String, signal: Int64) async throws { @@ -131,9 +189,15 @@ extension SandboxClient { request.set(key: .id, value: id) request.set(key: .signal, value: signal) - let client = createClient() - defer { client.close() } - try await client.send(request) + do { + try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to send signal \(signal) to process \(id) in container \(self.id)", + cause: error + ) + } } public func resize(_ id: String, size: Terminal.Size) async throws { @@ -142,18 +206,31 @@ extension SandboxClient { request.set(key: .width, value: UInt64(size.width)) request.set(key: .height, value: UInt64(size.height)) - let client = createClient() - defer { client.close() } - try await client.send(request) + do { + try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to resize pty for process \(id) in container \(self.id)", + cause: error + ) + } } public func wait(_ id: String) async throws -> Int32 { let request = XPCMessage(route: SandboxRoutes.wait.rawValue) request.set(key: .id, value: id) - let client = createClient() - defer { client.close() } - let response = try await client.send(request) + let response: XPCMessage + do { + response = try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to wait for process \(id) in container \(self.id)", + cause: error + ) + } let code = response.int64(key: .exitCode) return Int32(code) } @@ -162,10 +239,16 @@ extension SandboxClient { let request = XPCMessage(route: SandboxRoutes.dial.rawValue) request.set(key: .port, value: UInt64(port)) - let client = createClient() - defer { client.close() } - - let response = try await client.send(request) + let response: XPCMessage + do { + response = try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to dial \(port) on \(self.id)", + cause: error + ) + } guard let fh = response.fileHandle(key: .fd) else { throw ContainerizationError( .internalError, @@ -175,8 +258,18 @@ extension SandboxClient { return fh } - private func createClient() -> XPCClient { - XPCClient(service: machServiceLabel) + public func shutdown() async throws { + let request = XPCMessage(route: SandboxRoutes.shutdown.rawValue) + + do { + _ = try await self.client.send(request) + } catch { + throw ContainerizationError( + .internalError, + message: "failed to shutdown container \(self.id)", + cause: error + ) + } } } @@ -184,7 +277,10 @@ extension XPCMessage { public func id() throws -> String { let id = self.string(key: .id) guard let id else { - throw ContainerizationError(.invalidArgument, message: "No id") + throw ContainerizationError( + .invalidArgument, + message: "No id" + ) } return id } @@ -192,7 +288,10 @@ extension XPCMessage { func sandboxSnapshot() throws -> SandboxSnapshot { let data = self.dataNoCopy(key: .snapshot) guard let data else { - throw ContainerizationError(.invalidArgument, message: "No state data returned") + throw ContainerizationError( + .invalidArgument, + message: "No state data returned" + ) } return try JSONDecoder().decode(SandboxSnapshot.self, from: data) } diff --git a/Sources/ContainerClient/SandboxRoutes.swift b/Sources/ContainerClient/SandboxRoutes.swift index e5296623..93948207 100644 --- a/Sources/ContainerClient/SandboxRoutes.swift +++ b/Sources/ContainerClient/SandboxRoutes.swift @@ -15,6 +15,8 @@ //===----------------------------------------------------------------------===// public enum SandboxRoutes: String { + /// Create an xpc endpoint to the sandbox instance. + case createEndpoint = "com.apple.container.sandbox/createEndpoint" /// Bootstrap the sandbox instance and create the init process. case bootstrap = "com.apple.container.sandbox/bootstrap" /// Create a process in the sandbox. @@ -35,4 +37,6 @@ public enum SandboxRoutes: String { case exec = "com.apple.container.sandbox/exec" /// Dial a vsock port in the sandbox. case dial = "com.apple.container.sandbox/dial" + /// Shutdown the sandbox service process. + case shutdown = "com.apple.container.sandbox/shutdown" } diff --git a/Sources/ContainerClient/SandboxSnapshot.swift b/Sources/ContainerClient/SandboxSnapshot.swift index 12978023..7895e94e 100644 --- a/Sources/ContainerClient/SandboxSnapshot.swift +++ b/Sources/ContainerClient/SandboxSnapshot.swift @@ -19,11 +19,11 @@ import ContainerNetworkService /// A snapshot of a sandbox and its resources. public struct SandboxSnapshot: Codable, Sendable { /// The runtime status of the sandbox. - public let status: RuntimeStatus + public var status: RuntimeStatus /// Network attachments for the sandbox. - public let networks: [Attachment] + public var networks: [Attachment] /// Containers placed in the sandbox. - public let containers: [ContainerSnapshot] + public var containers: [ContainerSnapshot] public init( status: RuntimeStatus, diff --git a/Sources/ContainerCommands/Builder/BuilderStart.swift b/Sources/ContainerCommands/Builder/BuilderStart.swift index e03af8fb..e14acb02 100644 --- a/Sources/ContainerCommands/Builder/BuilderStart.swift +++ b/Sources/ContainerCommands/Builder/BuilderStart.swift @@ -251,7 +251,7 @@ extension ClientContainer { defer { try? io.close() } let process = try await bootstrap(stdio: io.stdio) - _ = try await process.start() + try await process.start() await taskManager?.finish() try io.closeAfterStart() diff --git a/Sources/ContainerCommands/Container/ContainerStop.swift b/Sources/ContainerCommands/Container/ContainerStop.swift index 3bfbca8c..7ffa2866 100644 --- a/Sources/ContainerCommands/Container/ContainerStop.swift +++ b/Sources/ContainerCommands/Container/ContainerStop.swift @@ -70,7 +70,10 @@ extension Application { ) let failed = try await Self.stopContainers(containers: containers, stopOptions: opts) if failed.count > 0 { - throw ContainerizationError(.internalError, message: "stop failed for one or more containers \(failed.joined(separator: ","))") + throw ContainerizationError( + .internalError, + message: "stop failed for one or more containers \(failed.joined(separator: ","))" + ) } } diff --git a/Sources/ContainerCommands/RunCommand.swift b/Sources/ContainerCommands/RunCommand.swift index 5fcc47b8..6d5d6503 100644 --- a/Sources/ContainerCommands/RunCommand.swift +++ b/Sources/ContainerCommands/RunCommand.swift @@ -110,7 +110,6 @@ extension Application { ) let detach = self.managementFlags.detach - do { let io = try ProcessIO.create( tty: self.processFlags.tty, diff --git a/Sources/ContainerXPC/XPCClient.swift b/Sources/ContainerXPC/XPCClient.swift index dbe07bce..ebf9de1f 100644 --- a/Sources/ContainerXPC/XPCClient.swift +++ b/Sources/ContainerXPC/XPCClient.swift @@ -18,7 +18,7 @@ import ContainerizationError import Foundation -public struct XPCClient: Sendable { +public final class XPCClient: Sendable { private nonisolated(unsafe) let connection: xpc_connection_t private let q: DispatchQueue? private let service: String @@ -33,6 +33,20 @@ public struct XPCClient: Sendable { xpc_connection_set_target_queue(connection, self.q) xpc_connection_activate(connection) } + + public init(connection: xpc_connection_t, label: String, queue: DispatchQueue? = nil) { + self.connection = connection + self.q = queue + self.service = label + + xpc_connection_set_event_handler(connection) { _ in } + xpc_connection_set_target_queue(connection, self.q) + xpc_connection_activate(connection) + } + + deinit { + self.close() + } } extension XPCClient { @@ -56,7 +70,10 @@ extension XPCClient { group.addTask { try await Task.sleep(for: responseTimeout) let route = message.string(key: XPCMessage.routeKey) ?? "nil" - throw ContainerizationError(.internalError, message: "XPC timeout for request to \(self.service)/\(route)") + throw ContainerizationError( + .internalError, + message: "XPC timeout for request to \(self.service)/\(route)" + ) } } diff --git a/Sources/ContainerXPC/XPCServer.swift b/Sources/ContainerXPC/XPCServer.swift index d496e877..7b02a0f7 100644 --- a/Sources/ContainerXPC/XPCServer.swift +++ b/Sources/ContainerXPC/XPCServer.swift @@ -35,13 +35,20 @@ public struct XPCServer: Sendable { let connection = xpc_connection_create_mach_service( identifier, nil, - UInt64(XPC_CONNECTION_MACH_SERVICE_LISTENER)) + UInt64(XPC_CONNECTION_MACH_SERVICE_LISTENER) + ) self.routes = routes self.connection = connection self.log = log } + public init(connection: xpc_connection_t, routes: [String: RouteHandler], log: Logging.Logger) { + self.routes = routes + self.connection = connection + self.log = log + } + public func listen() async throws { let connections = AsyncStream { cont in lock.withLock { @@ -71,6 +78,7 @@ public struct XPCServer: Sendable { lock.withLock { xpc_connection_activate(self.connection) } + try await withThrowingDiscardingTaskGroup { group in for await conn in connections { // `conn` isn't used concurrently. diff --git a/Sources/Helpers/APIServer/APIServer.swift b/Sources/Helpers/APIServer/APIServer.swift index 844ee1db..b40221b4 100644 --- a/Sources/Helpers/APIServer/APIServer.swift +++ b/Sources/Helpers/APIServer/APIServer.swift @@ -61,7 +61,11 @@ struct APIServer: AsyncParsableCommand { var routes = [XPCRoute: XPCServer.RouteHandler]() let pluginLoader = try initializePluginLoader(log: log) try await initializePlugins(pluginLoader: pluginLoader, log: log, routes: &routes) - let containersService = try initializeContainerService(pluginLoader: pluginLoader, log: log, routes: &routes) + let containersService = try initializeContainerService( + pluginLoader: pluginLoader, + log: log, + routes: &routes + ) let networkService = try await initializeNetworkService( pluginLoader: pluginLoader, containersService: containersService, @@ -177,6 +181,8 @@ struct APIServer: AsyncParsableCommand { log: Logger, routes: inout [XPCRoute: XPCServer.RouteHandler] ) async throws { + log.info("initializing plugins") + let bootPlugins = pluginLoader.findPlugins().filter { $0.shouldBoot } let service = PluginsService(pluginLoader: pluginLoader, log: log) @@ -191,11 +197,15 @@ struct APIServer: AsyncParsableCommand { } private func initializeHealthCheckService(log: Logger, routes: inout [XPCRoute: XPCServer.RouteHandler]) { + log.info("initializing health check service") + let svc = HealthCheckHarness(appRoot: appRoot, installRoot: installRoot, log: log) routes[XPCRoute.ping] = svc.ping } private func initializeKernelService(log: Logger, routes: inout [XPCRoute: XPCServer.RouteHandler]) throws { + log.info("initializing kernel service") + let svc = try KernelService(log: log, appRoot: appRoot) let harness = KernelHarness(service: svc, log: log) routes[XPCRoute.installKernel] = harness.install @@ -203,6 +213,8 @@ struct APIServer: AsyncParsableCommand { } private func initializeContainerService(pluginLoader: PluginLoader, log: Logger, routes: inout [XPCRoute: XPCServer.RouteHandler]) throws -> ContainersService { + log.info("initializing container service") + let service = try ContainersService( appRoot: appRoot, pluginLoader: pluginLoader, @@ -210,11 +222,18 @@ struct APIServer: AsyncParsableCommand { ) let harness = ContainersHarness(service: service, log: log) - routes[XPCRoute.listContainer] = harness.list - routes[XPCRoute.createContainer] = harness.create - routes[XPCRoute.deleteContainer] = harness.delete + routes[XPCRoute.containerList] = harness.list + routes[XPCRoute.containerCreate] = harness.create + routes[XPCRoute.containerDelete] = harness.delete routes[XPCRoute.containerLogs] = harness.logs - routes[XPCRoute.containerEvent] = harness.eventHandler + routes[XPCRoute.containerBootstrap] = harness.bootstrap + routes[XPCRoute.containerDial] = harness.dial + routes[XPCRoute.containerStop] = harness.stop + routes[XPCRoute.containerStartProcess] = harness.startProcess + routes[XPCRoute.containerCreateProcess] = harness.createProcess + routes[XPCRoute.containerResize] = harness.resize + routes[XPCRoute.containerWait] = harness.wait + routes[XPCRoute.containerKill] = harness.kill return service } @@ -225,6 +244,8 @@ struct APIServer: AsyncParsableCommand { log: Logger, routes: inout [XPCRoute: XPCServer.RouteHandler] ) async throws -> NetworksService { + log.info("initializing network service") + let resourceRoot = appRoot.appendingPathComponent("networks") let service = try await NetworksService( pluginLoader: pluginLoader, @@ -249,7 +270,13 @@ struct APIServer: AsyncParsableCommand { return service } - private func initializeVolumeService(containersService: ContainersService, log: Logger, routes: inout [XPCRoute: XPCServer.RouteHandler]) throws { + private func initializeVolumeService( + containersService: ContainersService, + log: Logger, + routes: inout [XPCRoute: XPCServer.RouteHandler] + ) throws { + log.info("initializing volume service") + let resourceRoot = appRoot.appendingPathComponent("volumes") let service = try VolumesService(resourceRoot: resourceRoot, containersService: containersService, log: log) let harness = VolumesHarness(service: service, log: log) diff --git a/Sources/Helpers/RuntimeLinux/RuntimeLinuxHelper.swift b/Sources/Helpers/RuntimeLinux/RuntimeLinuxHelper.swift index 4be61429..9fb5b2d5 100644 --- a/Sources/Helpers/RuntimeLinux/RuntimeLinuxHelper.swift +++ b/Sources/Helpers/RuntimeLinux/RuntimeLinuxHelper.swift @@ -64,15 +64,33 @@ struct RuntimeLinuxHelper: AsyncParsableCommand { signal(SIGPIPE, SIG_IGN) log.info("configuring XPC server") + let interfaceStrategy: any InterfaceStrategy if #available(macOS 26, *) { interfaceStrategy = NonisolatedInterfaceStrategy(log: log) } else { interfaceStrategy = IsolatedInterfaceStrategy() } - let server = SandboxService(root: .init(fileURLWithPath: root), interfaceStrategy: interfaceStrategy, eventLoopGroup: eventLoopGroup, log: log) - let xpc = XPCServer( + + nonisolated(unsafe) let anonymousConnection = xpc_connection_create(nil, nil) + let server = SandboxService( + root: .init(fileURLWithPath: root), + interfaceStrategy: interfaceStrategy, + eventLoopGroup: eventLoopGroup, + connection: anonymousConnection, + log: log + ) + + let endpointServer = XPCServer( identifier: machServiceLabel, + routes: [ + SandboxRoutes.createEndpoint.rawValue: server.createEndpoint + ], + log: log + ) + + let mainServer = XPCServer( + connection: anonymousConnection, routes: [ SandboxRoutes.bootstrap.rawValue: server.bootstrap, SandboxRoutes.createProcess.rawValue: server.createProcess, @@ -83,12 +101,23 @@ struct RuntimeLinuxHelper: AsyncParsableCommand { SandboxRoutes.wait.rawValue: server.wait, SandboxRoutes.start.rawValue: server.startProcess, SandboxRoutes.dial.rawValue: server.dial, + SandboxRoutes.shutdown.rawValue: server.shutdown, ], log: log ) log.info("starting XPC server") - try await xpc.listen() + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await endpointServer.listen() + } + group.addTask { + try await mainServer.listen() + } + defer { group.cancelAll() } + + _ = try await group.next() + } } catch { log.error("\(commandName) failed", metadata: ["error": "\(error)"]) try? await eventLoopGroup.shutdownGracefully() diff --git a/Sources/Services/ContainerAPIService/Containers/ContainersHarness.swift b/Sources/Services/ContainerAPIService/Containers/ContainersHarness.swift index 61e92eec..6e1062da 100644 --- a/Sources/Services/ContainerAPIService/Containers/ContainersHarness.swift +++ b/Sources/Services/ContainerAPIService/Containers/ContainersHarness.swift @@ -41,15 +41,142 @@ public struct ContainersHarness: Sendable { return reply } + @Sendable + public func bootstrap(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + let stdio = message.stdio() + try await service.bootstrap(id: id, stdio: stdio) + return message.reply() + } + + @Sendable + public func stop(_ message: XPCMessage) async throws -> XPCMessage { + let stopOptions = try message.stopOptions() + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + try await service.stop(id: id, options: stopOptions) + return message.reply() + } + + @Sendable + public func dial(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + + let port = message.uint64(key: .port) + let fh = try await service.dial(id: id, port: UInt32(port)) + let reply = message.reply() + reply.setFileHandle(fh) + + return reply + } + + @Sendable + public func wait(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + let processID = message.string(key: .processIdentifier) + guard let processID else { + throw ContainerizationError( + .invalidArgument, + message: "process ID cannot be empty" + ) + } + + let exitCode = try await service.wait(id: id, processID: processID) + let reply = message.reply() + reply.set(key: .exitCode, value: Int64(exitCode)) + return reply + } + + @Sendable + public func resize(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + let processID = message.string(key: .processIdentifier) + guard let processID else { + throw ContainerizationError( + .invalidArgument, + message: "process ID cannot be empty" + ) + } + + let width = message.uint64(key: .width) + let height = message.uint64(key: .height) + try await service.resize( + id: id, + processID: processID, + size: Terminal.Size(width: UInt16(width), height: UInt16(height)) + ) + + return message.reply() + } + + @Sendable + public func kill(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + let processID = message.string(key: .processIdentifier) + guard let processID else { + throw ContainerizationError( + .invalidArgument, + message: "process ID cannot be empty" + ) + } + try await service.kill( + id: id, + processID: processID, + signal: try message.signal() + ) + return message.reply() + } + @Sendable public func create(_ message: XPCMessage) async throws -> XPCMessage { let data = message.dataNoCopy(key: .containerConfig) guard let data else { - throw ContainerizationError(.invalidArgument, message: "container configuration cannot be empty") + throw ContainerizationError( + .invalidArgument, + message: "container configuration cannot be empty" + ) } let kdata = message.dataNoCopy(key: .kernel) guard let kdata else { - throw ContainerizationError(.invalidArgument, message: "kernel cannot be empty") + throw ContainerizationError( + .invalidArgument, + message: "kernel cannot be empty" + ) } let odata = message.dataNoCopy(key: .containerOptions) var options: ContainerCreateOptions = .default @@ -63,6 +190,60 @@ public struct ContainersHarness: Sendable { return message.reply() } + @Sendable + public func createProcess(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + let processID = message.string(key: .processIdentifier) + guard let processID else { + throw ContainerizationError( + .invalidArgument, + message: "process ID cannot be empty" + ) + } + let config = try message.processConfig() + let stdio = message.stdio() + + try await service.createProcess( + id: id, + processID: processID, + config: config, + stdio: stdio + ) + + return message.reply() + } + + @Sendable + public func startProcess(_ message: XPCMessage) async throws -> XPCMessage { + let id = message.string(key: .id) + guard let id else { + throw ContainerizationError( + .invalidArgument, + message: "id cannot be empty" + ) + } + let processID = message.string(key: .processIdentifier) + guard let processID else { + throw ContainerizationError( + .invalidArgument, + message: "process ID cannot be empty" + ) + } + + try await service.startProcess( + id: id, + processID: processID, + ) + + return message.reply() + } + @Sendable public func delete(_ message: XPCMessage) async throws -> XPCMessage { let id = message.string(key: .id) @@ -88,21 +269,4 @@ public struct ContainersHarness: Sendable { try reply.set(key: .logs, value: fds) return reply } - - @Sendable - public func eventHandler(_ message: XPCMessage) async throws -> XPCMessage { - let event = try message.containerEvent() - try await service.handleContainerEvents(event: event) - return message.reply() - } -} - -extension XPCMessage { - public func containerEvent() throws -> ContainerEvent { - guard let data = self.dataNoCopy(key: .containerEvent) else { - throw ContainerizationError(.invalidArgument, message: "Missing container event data") - } - let event = try JSONDecoder().decode(ContainerEvent.self, from: data) - return event - } } diff --git a/Sources/Services/ContainerAPIService/Containers/ContainersService.swift b/Sources/Services/ContainerAPIService/Containers/ContainersService.swift index acb806f1..86f361d3 100644 --- a/Sources/Services/ContainerAPIService/Containers/ContainersService.swift +++ b/Sources/Services/ContainerAPIService/Containers/ContainersService.swift @@ -18,6 +18,7 @@ import CVersion import ContainerClient import ContainerPlugin import ContainerSandboxService +import ContainerXPC import Containerization import ContainerizationError import ContainerizationExtras @@ -27,6 +28,22 @@ import Foundation import Logging public actor ContainersService { + struct ContainerState { + var snapshot: ContainerSnapshot + var client: SandboxClient? + + func getClient() throws -> SandboxClient { + guard let client else { + var message = "no sandbox client exists" + if snapshot.status == .stopped { + message += ": container is stopped" + } + throw ContainerizationError(.invalidState, message: message) + } + return client + } + } + private static let machServicePrefix = "com.apple.container" private static let launchdDomainString = try! ServiceManager.getDomainString() @@ -34,13 +51,15 @@ public actor ContainersService { private let containerRoot: URL private let pluginLoader: PluginLoader private let runtimePlugins: [Plugin] + private let exitMonitor: ExitMonitor private let lock = AsyncLock() - private var containers: [String: ContainerSnapshot] + private var containers: [String: ContainerState] public init(appRoot: URL, pluginLoader: PluginLoader, log: Logger) throws { let containerRoot = appRoot.appendingPathComponent("containers") try FileManager.default.createDirectory(at: containerRoot, withIntermediateDirectories: true) + self.exitMonitor = ExitMonitor(log: log) self.containerRoot = containerRoot self.pluginLoader = pluginLoader self.log = log @@ -48,7 +67,7 @@ public actor ContainersService { self.containers = try Self.loadAtBoot(root: containerRoot, loader: pluginLoader, log: log) } - static func loadAtBoot(root: URL, loader: PluginLoader, log: Logger) throws -> [String: ContainerSnapshot] { + static func loadAtBoot(root: URL, loader: PluginLoader, log: Logger) throws -> [String: ContainerState] { var directories = try FileManager.default.contentsOfDirectory( at: root, includingPropertiesForKeys: [.isDirectoryKey] @@ -58,17 +77,32 @@ public actor ContainersService { } let runtimePlugins = loader.findPlugins().filter { $0.hasType(.runtime) } - var results = [String: ContainerSnapshot]() + var results = [String: ContainerState]() for dir in directories { do { let bundle = ContainerClient.Bundle(path: dir) let config = try bundle.configuration - results[config.id] = .init(configuration: config, status: .stopped, networks: []) + let state = ContainerState( + snapshot: .init( + configuration: config, + status: .stopped, + networks: [] + ) + ) + results[config.id] = state let plugin = runtimePlugins.first { $0.name == config.runtimeHandler } guard let plugin else { - throw ContainerizationError(.internalError, message: "Failed to find runtime plugin \(config.runtimeHandler)") + throw ContainerizationError( + .internalError, + message: "Failed to find runtime plugin \(config.runtimeHandler)" + ) } - try Self.registerService(plugin: plugin, loader: loader, configuration: config, path: dir) + try Self.registerService( + plugin: plugin, + loader: loader, + configuration: config, + path: dir + ) } catch { try? FileManager.default.removeItem(at: dir) log.warning("failed to load container bundle at \(dir.path)") @@ -77,21 +111,17 @@ public actor ContainersService { return results } - private func setContainer(_ id: String, _ item: ContainerSnapshot, context: AsyncLock.Context) async { - self.containers[id] = item - } - /// List all containers registered with the service. public func list() async throws -> [ContainerSnapshot] { self.log.debug("\(#function)") - return Array(self.containers.values) + return self.containers.values.map { $0.snapshot } } /// Execute an operation with the current container list while maintaining atomicity /// This prevents race conditions where containers are created during the operation public func withContainerList(_ operation: @Sendable @escaping ([ContainerSnapshot]) async throws -> T) async throws -> T { try await lock.withLock { context in - let snapshots = Array(await self.containers.values) + let snapshots = await self.containers.values.map { $0.snapshot } return try await operation(snapshots) } } @@ -101,12 +131,15 @@ public actor ContainersService { self.log.debug("\(#function)") guard containers[configuration.id] == nil else { - throw ContainerizationError(.exists, message: "container already exists: \(configuration.id)") + throw ContainerizationError( + .exists, + message: "container already exists: \(configuration.id)" + ) } var allHostnames = Set() for container in containers.values { - for attachmentConfiguration in container.configuration.networks { + for attachmentConfiguration in container.snapshot.configuration.networks { allHostnames.insert(attachmentConfiguration.options.hostname) } } @@ -119,16 +152,20 @@ public actor ContainersService { } guard conflictingHostnames.isEmpty else { - throw ContainerizationError(.exists, message: "hostname(s) already exist: \(conflictingHostnames)") + throw ContainerizationError( + .exists, + message: "hostname(s) already exist: \(conflictingHostnames)" + ) } - self.containers[configuration.id] = ContainerSnapshot(configuration: configuration, status: .stopped, networks: []) - let runtimePlugin = self.runtimePlugins.filter { $0.name == configuration.runtimeHandler }.first guard let runtimePlugin else { - throw ContainerizationError(.notFound, message: "unable to locate runtime plugin \(configuration.runtimeHandler)") + throw ContainerizationError( + .notFound, + message: "unable to locate runtime plugin \(configuration.runtimeHandler)" + ) } let path = self.containerRoot.appendingPathComponent(configuration.id) @@ -153,6 +190,13 @@ public actor ContainersService { configuration: configuration, path: path ) + + let snapshot = ContainerSnapshot( + configuration: configuration, + status: .stopped, + networks: [] + ) + self.containers[configuration.id] = ContainerState(snapshot: snapshot) } catch { do { try bundle.delete() @@ -163,80 +207,264 @@ public actor ContainersService { } } - private func getInitBlock(for platform: Platform) async throws -> Filesystem { - let initImage = try await ClientImage.fetch(reference: ClientImage.initImageRef, platform: platform) - var fs = try await initImage.getCreateSnapshot(platform: platform) - fs.options = ["ro"] - return fs + /// Bootstrap the init process of the container. + public func bootstrap(id: String, stdio: [FileHandle?]) async throws { + self.log.debug("\(#function)") + do { + try await self.lock.withLock { context in + var state = try await self.getContainerState(id: id, context: context) + let runtime = state.snapshot.configuration.runtimeHandler + let sandboxClient = try await SandboxClient.create( + id: id, + runtime: runtime + ) + try await sandboxClient.bootstrap(stdio: stdio) + + try await self.exitMonitor.registerProcess( + id: id, + onExit: self.handleContainerExit + ) + + state.client = sandboxClient + await self.setContainerState(id, state, context: context) + } + } catch { + do { + try await _cleanup(id: id) + } catch { + self.log.error("failed to cleanup container \(id) after bootstrap failure: \(error)") + } + throw error + } } - private static func registerService( - plugin: Plugin, - loader: PluginLoader, - configuration: ContainerConfiguration, - path: URL - ) throws { - let args = [ - "--root", path.path, - "--uuid", configuration.id, - "--debug", - ] - try loader.registerWithLaunchd( - plugin: plugin, - pluginStateRoot: path, - args: args, - instanceId: configuration.id - ) + /// Create a new process in the container. + public func createProcess( + id: String, + processID: String, + config: ProcessConfiguration, + stdio: [FileHandle?] + ) async throws { + self.log.debug("\(#function)") + + let state = try self._getContainerState(id: id) + do { + let client = try state.getClient() + try await client.createProcess( + processID, + config: config, + stdio: stdio + ) + } catch { + do { + try await _cleanup(id: id) + } catch { + self.log.error("failed to cleanup container \(id) after start failure: \(error)") + } + throw error + } + } + + /// Start a process in a container. This can either be a process created via + /// createProcess, or the init process of the container which requires + /// id == processID. + public func startProcess(id: String, processID: String) async throws { + self.log.debug("\(#function)") + + do { + try await self.lock.withLock { context in + var state = try await self.getContainerState(id: id, context: context) + + let isInit = Self.isInitProcess(id: id, processID: processID) + if state.snapshot.status == .running && isInit { + return + } + + let client = try state.getClient() + try await client.startProcess(processID) + + if isInit { + let log = self.log + let waitFunc: ExitMonitor.WaitHandler = { + log.info("registering container \(id) with exit monitor") + let code = try await client.wait(id) + log.info("container \(id) finished in exit monitor") + + return code + } + try await self.exitMonitor.track(id: id, waitingOn: waitFunc) + + let sandboxSnapshot = try await client.state() + state.snapshot.status = .running + state.snapshot.networks = sandboxSnapshot.networks + await self.setContainerState(id, state, context: context) + } + } + } catch { + do { + try await _cleanup(id: id) + } catch { + self.log.error("failed to cleanup container \(id) after start failure: \(error)") + } + throw error + } } - private func get(id: String, context: AsyncLock.Context) throws -> ContainerSnapshot { - try self._get(id: id) + /// Send a signal to the container. + public func kill(id: String, processID: String, signal: Int64) async throws { + self.log.debug("\(#function)") + + let state = try self._getContainerState(id: id) + let client = try state.getClient() + try await client.kill(processID, signal: signal) } - private func _get(id: String) throws -> ContainerSnapshot { - let item = self.containers[id] - guard let item else { + /// Stop all containers inside the sandbox, aborting any processes currently + /// executing inside the container, before stopping the underlying sandbox. + public func stop(id: String, options: ContainerStopOptions) async throws { + self.log.debug("\(#function)") + + let state = try self._getContainerState(id: id) + + // Stop should be idempotent. + let client: SandboxClient + do { + client = try state.getClient() + } catch { + return + } + + do { + try await client.stop(options: options) + } catch let err as ContainerizationError { + if err.code != .interrupted { + throw err + } + } + try await handleContainerExit(id: id) + } + + public func dial(id: String, port: UInt32) async throws -> FileHandle { + let state = try self._getContainerState(id: id) + let client = try state.getClient() + return try await client.dial(port) + } + + /// Wait waits for the container's init process or exec to exit and returns the + /// exit status. + public func wait(id: String, processID: String) async throws -> Int32 { + self.log.debug("\(#function)") + + let state = try self._getContainerState(id: id) + let client = try state.getClient() + return try await client.wait(processID) + } + + /// Resize resizes the container's PTY if one exists. + public func resize(id: String, processID: String, size: Terminal.Size) async throws { + self.log.debug("\(#function)") + + let state = try self._getContainerState(id: id) + let client = try state.getClient() + try await client.resize(processID, size: size) + } + + // Get the logs for the container. + public func logs(id: String) async throws -> [FileHandle] { + self.log.debug("\(#function)") + + // Logs doesn't care if the container is running or not, just that + // the bundle is there, and that the files actually exist. + do { + let path = self.containerRoot.appendingPathComponent(id) + let bundle = ContainerClient.Bundle(path: path) + return [ + try FileHandle(forReadingFrom: bundle.containerLog), + try FileHandle(forReadingFrom: bundle.bootlog), + ] + } catch { throw ContainerizationError( - .notFound, - message: "container with ID \(id) not found" + .internalError, + message: "failed to open container logs: \(error)" ) } - return item } /// Delete a container and its resources. public func delete(id: String, force: Bool) async throws { self.log.debug("\(#function)") - let item = try self._get(id: id) - switch item.status { + let state = try self._getContainerState(id: id) + switch state.snapshot.status { case .running: if !force { throw ContainerizationError( .invalidState, - message: "container \(id) is \(item.status) and can not be deleted" + message: "container \(id) is \(state.snapshot.status) and can not be deleted" ) } - let autoRemove = try getContainerCreationOptions(id: id).autoRemove let opts = ContainerStopOptions( timeoutInSeconds: 5, signal: SIGKILL ) - try await self._stop( - id: id, - runtimeHandler: item.configuration.runtimeHandler, - options: opts - ) - if autoRemove { - return + let client = try state.getClient() + try await client.stop(options: opts) + try await self.lock.withLock { context in + try await self.cleanup(id: id, context: context) } - try self._cleanup(id: id, item: item) case .stopping: throw ContainerizationError( .invalidState, - message: "container \(id) is \(item.status) and can not be deleted" + message: "container \(id) is \(state.snapshot.status) and can not be deleted" ) default: - try self._cleanup(id: id, item: item) + try await self.lock.withLock { context in + try await self.cleanup(id: id, context: context) + } + } + } + + private func handleContainerExit(id: String, code: Int32? = nil) async throws { + try await self.lock.withLock { [self] context in + try await handleContainerExit(id: id, code: code, context: context) + } + } + + private func handleContainerExit(id: String, code: Int32?, context: AsyncLock.Context) async throws { + if let code { + self.log.info("Handling container \(id) exit. Code \(code)") + } + + var state: ContainerState + do { + state = try self.getContainerState(id: id, context: context) + if state.snapshot.status == .stopped { + return + } + } catch { + // Was auto removed by the background thread, nothing for us to do. + return + } + + await self.exitMonitor.stopTracking(id: id) + + // Try and shutdown the runtime helper. + do { + self.log.info("Shutting down sandbox service for \(id)") + + let client = try state.getClient() + try await client.shutdown() + } catch { + self.log.error("failed to shutdown sandbox service for \(id): \(error)") + } + + state.snapshot.status = .stopped + state.snapshot.networks = [] + state.client = nil + await self.setContainerState(id, state, context: context) + + let options = try getContainerCreationOptions(id: id) + if options.autoRemove { + try await self.cleanup(id: id, context: context) } } @@ -244,30 +472,33 @@ public actor ContainersService { "\(Self.launchdDomainString)/\(Self.machServicePrefix).\(runtimeName).\(instanceId)" } - private func _cleanup(id: String, item: ContainerSnapshot) throws { + private func _cleanup(id: String) async throws { self.log.debug("\(#function)") + // Did the exit container handler win? + if self.containers[id] == nil { + return + } + + // To be pedantic. This is only needed if something in the "launch + // the init process" lifecycle fails before actually fork+exec'ing + // the OCI runtime. + await self.exitMonitor.stopTracking(id: id) let path = self.containerRoot.appendingPathComponent(id) let bundle = ContainerClient.Bundle(path: path) let config = try bundle.configuration - let label = Self.fullLaunchdServiceLabel(runtimeName: config.runtimeHandler, instanceId: id) + let label = Self.fullLaunchdServiceLabel( + runtimeName: config.runtimeHandler, + instanceId: id + ) try ServiceManager.deregister(fullServiceLabel: label) try bundle.delete() self.containers.removeValue(forKey: id) } - private func _shutdown(id: String, item: ContainerSnapshot) throws { - let path = self.containerRoot.appendingPathComponent(id) - let bundle = ContainerClient.Bundle(path: path) - let config = try bundle.configuration - - let label = Self.fullLaunchdServiceLabel(runtimeName: config.runtimeHandler, instanceId: id) - try ServiceManager.kill(fullServiceLabel: label) - } - - private func cleanup(id: String, item: ContainerSnapshot, context: AsyncLock.Context) throws { - try self._cleanup(id: id, item: item) + private func cleanup(id: String, context: AsyncLock.Context) async throws { + try await self._cleanup(id: id) } private func getContainerCreationOptions(id: String) throws -> ContainerCreateOptions { @@ -277,104 +508,95 @@ public actor ContainersService { return options } - private func containerProcessExitHandler(_ id: String, _ exitCode: Int32, context: AsyncLock.Context) async { - self.log.info("Handling container \(id) exit. Code \(exitCode)") - do { - let item = try self.get(id: id, context: context) - let snapshot = ContainerSnapshot(configuration: item.configuration, status: .stopped, networks: []) - await self.setContainer(id, snapshot, context: context) + private func getInitBlock(for platform: Platform) async throws -> Filesystem { + let initImage = try await ClientImage.fetch(reference: ClientImage.initImageRef, platform: platform) + var fs = try await initImage.getCreateSnapshot(platform: platform) + fs.options = ["ro"] + return fs + } - let options = try getContainerCreationOptions(id: id) - if options.autoRemove { - try self.cleanup(id: id, item: item, context: context) - } - } catch { - self.log.error( - "Failed to handle container exit", - metadata: [ - "id": .string(id), - "error": .string(String(describing: error)), - ]) - } + private static func registerService( + plugin: Plugin, + loader: PluginLoader, + configuration: ContainerConfiguration, + path: URL + ) throws { + let args = [ + "--root", path.path, + "--uuid", configuration.id, + "--debug", + ] + try loader.registerWithLaunchd( + plugin: plugin, + pluginStateRoot: path, + args: args, + instanceId: configuration.id + ) } - private func containerStartHandler(_ id: String, context: AsyncLock.Context) async throws { - self.log.debug("\(#function)") - self.log.info("Handling container \(id) Start.") - do { - let currentSnapshot = try self.get(id: id, context: context) - let client = SandboxClient(id: currentSnapshot.configuration.id, runtime: currentSnapshot.configuration.runtimeHandler) - let sandboxSnapshot = try await client.state() - let snapshot = ContainerSnapshot(configuration: currentSnapshot.configuration, status: .running, networks: sandboxSnapshot.networks) - await self.setContainer(id, snapshot, context: context) - } catch { - self.log.error( - "Failed to handle container start", - metadata: [ - "id": .string(id), - "error": .string(String(describing: error)), - ]) + private func setContainerState(_ id: String, _ state: ContainerState, context: AsyncLock.Context) async { + self.containers[id] = state + } + + private func getContainerState(id: String, context: AsyncLock.Context) throws -> ContainerState { + try self._getContainerState(id: id) + } + + private func _getContainerState(id: String) throws -> ContainerState { + let state = self.containers[id] + guard let state else { + throw ContainerizationError( + .notFound, + message: "container with ID \(id) not found" + ) } + return state + } + + private static func isInitProcess(id: String, processID: String) -> Bool { + id == processID } } -extension ContainersService { - public func handleContainerEvents(event: ContainerEvent) async throws { - self.log.debug("\(#function)") - try await self.lock.withLock { context in - switch event { - case .containerExit(let id, let code): - await self.containerProcessExitHandler(id, Int32(code), context: context) - case .containerStart(let id): - try await self.containerStartHandler(id, context: context) - } - } +extension XPCMessage { + func signal() throws -> Int64 { + self.int64(key: .signal) } - /// Stop all containers inside the sandbox, aborting any processes currently - /// executing inside the container, before stopping the underlying sandbox. - public func stop(id: String, options: ContainerStopOptions) async throws { - self.log.debug("\(#function)") - try await lock.withLock { context in - let item = try await self.get(id: id, context: context) - switch item.status { - case .running: - try await self._stop( - id: id, - runtimeHandler: item.configuration.runtimeHandler, - options: options - ) - default: - return - } + func stopOptions() throws -> ContainerStopOptions { + guard let data = self.dataNoCopy(key: .stopOptions) else { + throw ContainerizationError(.invalidArgument, message: "empty StopOptions") } + return try JSONDecoder().decode(ContainerStopOptions.self, from: data) } - private func _stop(id: String, runtimeHandler: String, options: ContainerStopOptions) async throws { - let client = SandboxClient( - id: id, - runtime: runtimeHandler - ) - try await client.stop(options: options) + func setState(_ state: SandboxSnapshot) throws { + let data = try JSONEncoder().encode(state) + self.set(key: .snapshot, value: data) } - public func logs(id: String) async throws -> [FileHandle] { - self.log.debug("\(#function)") - // Logs doesn't care if the container is running or not, just that - // the bundle is there, and that the files actually exist. - do { - let path = self.containerRoot.appendingPathComponent(id) - let bundle = ContainerClient.Bundle(path: path) - return [ - try FileHandle(forReadingFrom: bundle.containerLog), - try FileHandle(forReadingFrom: bundle.bootlog), - ] - } catch { - throw ContainerizationError( - .internalError, - message: "failed to open container logs: \(error)" - ) + func stdio() -> [FileHandle?] { + var handles = [FileHandle?](repeating: nil, count: 3) + if let stdin = self.fileHandle(key: .stdin) { + handles[0] = stdin + } + if let stdout = self.fileHandle(key: .stdout) { + handles[1] = stdout } + if let stderr = self.fileHandle(key: .stderr) { + handles[2] = stderr + } + return handles } + func setFileHandle(_ handle: FileHandle) { + self.set(key: .fd, value: handle) + } + + func processConfig() throws -> ProcessConfiguration { + guard let data = self.dataNoCopy(key: .processConfig) else { + throw ContainerizationError(.invalidArgument, message: "empty process configuration") + } + return try JSONDecoder().decode(ProcessConfiguration.self, from: data) + } } diff --git a/Sources/Services/ContainerAPIService/Networks/NetworksService.swift b/Sources/Services/ContainerAPIService/Networks/NetworksService.swift index 6416782c..015e0440 100644 --- a/Sources/Services/ContainerAPIService/Networks/NetworksService.swift +++ b/Sources/Services/ContainerAPIService/Networks/NetworksService.swift @@ -48,7 +48,11 @@ public actor NetworksService { self.log = log try FileManager.default.createDirectory(at: resourceRoot, withIntermediateDirectories: true) - self.store = try FilesystemEntityStore(path: resourceRoot, type: "network", log: log) + self.store = try FilesystemEntityStore( + path: resourceRoot, + type: "network", + log: log + ) let networkPlugin = pluginLoader diff --git a/Sources/Services/ContainerNetworkService/NetworkClient.swift b/Sources/Services/ContainerNetworkService/NetworkClient.swift index 80645e56..7be2284f 100644 --- a/Sources/Services/ContainerNetworkService/NetworkClient.swift +++ b/Sources/Services/ContainerNetworkService/NetworkClient.swift @@ -40,7 +40,6 @@ extension NetworkClient { public func state() async throws -> NetworkState { let request = XPCMessage(route: NetworkRoutes.state.rawValue) let client = createClient() - defer { client.close() } let response = try await client.send(request) let state = try response.state() @@ -52,7 +51,6 @@ extension NetworkClient { request.set(key: NetworkKeys.hostname.rawValue, value: hostname) let client = createClient() - defer { client.close() } let response = try await client.send(request) let attachment = try response.attachment() @@ -65,7 +63,6 @@ extension NetworkClient { request.set(key: NetworkKeys.hostname.rawValue, value: hostname) let client = createClient() - defer { client.close() } try await client.send(request) } @@ -74,7 +71,6 @@ extension NetworkClient { request.set(key: NetworkKeys.hostname.rawValue, value: hostname) let client = createClient() - defer { client.close() } let response = try await client.send(request) return try response.dataNoCopy(key: NetworkKeys.attachment.rawValue).map { @@ -86,7 +82,6 @@ extension NetworkClient { let request = XPCMessage(route: NetworkRoutes.disableAllocator.rawValue) let client = createClient() - defer { client.close() } let response = try await client.send(request) return try response.allocatorDisabled() diff --git a/Sources/Services/ContainerSandboxService/SandboxService.swift b/Sources/Services/ContainerSandboxService/SandboxService.swift index ef9bb909..555af28a 100644 --- a/Sources/Services/ContainerSandboxService/SandboxService.swift +++ b/Sources/Services/ContainerSandboxService/SandboxService.swift @@ -14,8 +14,6 @@ // limitations under the License. //===----------------------------------------------------------------------===// -// - import ContainerClient import ContainerNetworkService import ContainerPersistence @@ -37,6 +35,7 @@ import struct ContainerizationOCI.Process /// An XPC service that manages the lifecycle of a single VM-backed container. public actor SandboxService { + private let connection: xpc_connection_t private let root: URL private let interfaceStrategy: InterfaceStrategy private var container: ContainerInfo? @@ -66,12 +65,36 @@ public actor SandboxService { /// - interfaceStrategy: The strategy for producing network interface /// objects for each network to which the container attaches. /// - log: The destination for log messages. - public init(root: URL, interfaceStrategy: InterfaceStrategy, eventLoopGroup: any EventLoopGroup, log: Logger) { + public init( + root: URL, + interfaceStrategy: InterfaceStrategy, + eventLoopGroup: any EventLoopGroup, + connection: xpc_connection_t, + log: Logger + ) { self.root = root self.interfaceStrategy = interfaceStrategy self.log = log self.monitor = ExitMonitor(log: log) self.eventLoopGroup = eventLoopGroup + self.connection = connection + } + + /// Returns an endpoint from an anonymous xpc connection. + /// + /// - Parameters: + /// - message: An XPC message with no parameters. + /// + /// - Returns: An XPC message with the following parameters: + /// - endpoint: An XPC endpoint that can be used to communicate + /// with the sandbox service. + @Sendable + public func createEndpoint(_ message: XPCMessage) async throws -> XPCMessage { + self.log.info("`createEndpoint` xpc handler") + let endpoint = xpc_endpoint_create(self.connection) + let reply = message.reply() + reply.set(key: .sandboxServiceEndpoint, value: endpoint) + return reply } /// Start the VM and the guest agent process for a container. @@ -217,7 +240,7 @@ public actor SandboxService { /// - Returns: An XPC message with no parameters. @Sendable public func startProcess(_ message: XPCMessage) async throws -> XPCMessage { - self.log.info("`start` xpc handler") + self.log.info("`startProcess` xpc handler") return try await self.lock.withLock { lock in let id = try message.id() let containerInfo = try await self.getContainer() @@ -225,7 +248,6 @@ public actor SandboxService { if id == containerId { try await self.startInitProcess(lock: lock) await self.setState(.running) - try await self.sendContainerEvent(.containerStart(id: id)) } else { try await self.startExecProcess(processId: id, lock: lock) } @@ -233,119 +255,37 @@ public actor SandboxService { } } - private func startInitProcess(lock: AsyncLock.Context) async throws { - let info = try self.getContainer() - let container = info.container - let id = container.id - - guard self.state == .booted else { - throw ContainerizationError( - .invalidState, - message: "container expected to be in booted state, got: \(self.state)" - ) - } - - self.setState(.starting) - do { - let io = info.io - - try await container.start() - let waitFunc: ExitMonitor.WaitHandler = { - let code = try await container.wait() - if let out = io.out { - try out.close() - } - if let err = io.err { - try err.close() - } - return code - } - try await self.monitor.track(id: id, waitingOn: waitFunc) - } catch { - try? await self.cleanupContainer() - self.setState(.created) - try await self.sendContainerEvent(.containerExit(id: id, exitCode: -1)) - throw error - } - } - - private func startExecProcess(processId id: String, lock: AsyncLock.Context) async throws { - let container = try self.getContainer().container - guard let processInfo = self.processes[id] else { - throw ContainerizationError(.notFound, message: "Process with id \(id)") - } - - let containerInfo = try self.getContainer() - - let czConfig = self.configureProcessConfig(config: processInfo.config, stdio: processInfo.io, containerConfig: containerInfo.config) - - let process = try await container.exec(id, configuration: czConfig) - try self.setUnderlyingProcess(id, process) - - try await process.start() + /// Shutdown the SandboxService. + /// + /// - Parameters: + /// - message: An XPC message with no parameters. + /// + /// - Returns: An XPC message with no parameters. + @Sendable + public func shutdown(_ message: XPCMessage) async throws -> XPCMessage { + self.log.info("`shutdown` xpc handler") - let waitFunc: ExitMonitor.WaitHandler = { - let code = try await process.wait() - if let out = processInfo.io[1] { - try self.closeHandle(out.fileDescriptor) - } - if let err = processInfo.io[2] { - try self.closeHandle(err.fileDescriptor) - } - return code - } - try await self.monitor.track(id: id, waitingOn: waitFunc) - } + switch self.state { + case .created, .stopped(_), .stopping: + self.state = .shuttingDown - private func startSocketForwarders(containerIpAddress: String, publishedPorts: [PublishPort]) async throws { - var forwarders: [SocketForwarderResult] = [] - try await withThrowingTaskGroup(of: SocketForwarderResult.self) { group in - for publishedPort in publishedPorts { - let proxyAddress = try SocketAddress(ipAddress: publishedPort.hostAddress, port: Int(publishedPort.hostPort)) - let serverAddress = try SocketAddress(ipAddress: containerIpAddress, port: Int(publishedPort.containerPort)) - log.info( - "creating forwarder for", - metadata: [ - "proxy": "\(proxyAddress)", - "server": "\(serverAddress)", - "protocol": "\(publishedPort.proto)", - ]) - group.addTask { - let forwarder: SocketForwarder - switch publishedPort.proto { - case .tcp: - forwarder = try TCPForwarder( - proxyAddress: proxyAddress, - serverAddress: serverAddress, - eventLoopGroup: self.eventLoopGroup, - log: self.log - ) - case .udp: - forwarder = try UDPForwarder( - proxyAddress: proxyAddress, - serverAddress: serverAddress, - eventLoopGroup: self.eventLoopGroup, - log: self.log - ) - } - return try await forwarder.run().get() + Task { + do { + try await Task.sleep(for: .seconds(5)) + } catch { + self.log.error("failed to sleep before shutting down SandboxService: \(error)") } + self.log.info("Shutting down SandboxService") + exit(0) } - for try await result in group { - forwarders.append(result) - } + default: + throw ContainerizationError( + .invalidState, + message: "cannot shutdown: container is not stopped" + ) } - self.socketForwarders = forwarders - } - - private func stopSocketForwarders() async { - log.info("closing forwarders") - for forwarder in self.socketForwarders { - forwarder.close() - try? await forwarder.wait() - } - log.info("closed forwarders") + return message.reply() } /// Create a process inside the virtual machine for the container. @@ -365,17 +305,12 @@ public actor SandboxService { log.info("`createProcess` xpc handler") return try await self.lock.withLock { [self] _ in switch await self.state { - case .created, .stopped(_), .starting, .stopping: - throw ContainerizationError( - .invalidState, - message: "cannot exec: container is not running" - ) case .running, .booted: let id = try message.id() let config = try message.processConfig() let stdio = message.stdio() - await self.addNewProcess(id, config, stdio) + try await self.addNewProcess(id, config, stdio) try await self.monitor.registerProcess( id: id, @@ -396,6 +331,11 @@ public actor SandboxService { ) return message.reply() + default: + throw ContainerizationError( + .invalidState, + message: "cannot exec: container is not running" + ) } } } @@ -416,7 +356,7 @@ public actor SandboxService { var cs: ContainerSnapshot? switch state { - case .created, .stopped(_), .starting, .booted: + case .created, .stopped(_), .booted, .shuttingDown: status = .stopped case .stopping: status = .stopping @@ -455,36 +395,32 @@ public actor SandboxService { @Sendable public func stop(_ message: XPCMessage) async throws -> XPCMessage { self.log.info("`stop` xpc handler") - let reply = try await self.lock.withLock { [self] _ in - switch await self.state { - case .stopped(_), .created, .stopping: - return message.reply() - case .starting: - throw ContainerizationError( - .invalidState, - message: "cannot stop: container is not running" - ) - case .running, .booted: - let ctr = try await getContainer() - let stopOptions = try message.stopOptions() + switch self.state { + case .running, .booted: + self.state = .stopping + + let ctr = try getContainer() + let stopOptions = try message.stopOptions() + let code = try await gracefulStopContainer( + ctr.container, + stopOpts: stopOptions + ) + + await self.lock.withLock { _ in do { - try await gracefulStopContainer( - ctr.container, - stopOpts: stopOptions - ) + if case .stopped(_) = await self.state { + return + } + try await self.cleanupContainer() } catch { - log.notice("failed to stop sandbox gracefully: \(error)") + self.log.error("failed to cleanup container: \(error)") } - await setState(.stopping) - return message.reply() + await self.setState(.stopped(code)) } + default: + break } - do { - try await cleanupContainer() - } catch { - self.log.error("failed to cleanup container: \(error)") - } - return reply + return message.reply() } /// Signal a process running in the virtual machine. @@ -500,11 +436,6 @@ public actor SandboxService { self.log.info("`kill` xpc handler") return try await self.lock.withLock { [self] _ in switch await self.state { - case .created, .stopped, .starting, .booted, .stopping: - throw ContainerizationError( - .invalidState, - message: "cannot kill: container is not running" - ) case .running: let ctr = try await getContainer() let id = try message.id() @@ -523,6 +454,11 @@ public actor SandboxService { // TODO: fix underlying signal value to int64 try await ctr.container.kill(Int32(try message.signal())) return message.reply() + default: + throw ContainerizationError( + .invalidState, + message: "cannot kill: container is not running" + ) } } } @@ -539,34 +475,47 @@ public actor SandboxService { @Sendable public func resize(_ message: XPCMessage) async throws -> XPCMessage { self.log.info("`resize` xpc handler") - return try await self.lock.withLock { [self] _ in - switch await self.state { - case .created, .stopped, .starting, .booted, .stopping: - throw ContainerizationError( - .invalidState, - message: "cannot resize: container is not running" - ) - case .running: - let id = try message.id() - let ctr = try await getContainer() - let width = message.uint64(key: .width) - let height = message.uint64(key: .height) - if id != ctr.container.id { - guard let processInfo = await self.processes[id] else { - throw ContainerizationError(.invalidState, message: "Process \(id) does not exist") - } - - guard let proc = processInfo.process else { - throw ContainerizationError(.invalidState, message: "Process \(id) not started") - } + switch self.state { + case .running: + let id = try message.id() + let ctr = try getContainer() + let width = message.uint64(key: .width) + let height = message.uint64(key: .height) + + if id != ctr.container.id { + guard let processInfo = self.processes[id] else { + throw ContainerizationError( + .invalidState, + message: "Process \(id) does not exist" + ) + } - try await proc.resize(to: .init(width: UInt16(width), height: UInt16(height))) - return message.reply() + guard let proc = processInfo.process else { + throw ContainerizationError( + .invalidState, + message: "Process \(id) not started" + ) } - try await ctr.container.resize(to: .init(width: UInt16(width), height: UInt16(height))) - return message.reply() + try await proc.resize( + to: .init( + width: UInt16(width), + height: UInt16(height)) + ) + } else { + try await ctr.container.resize( + to: .init( + width: UInt16(width), + height: UInt16(height)) + ) } + + return message.reply() + default: + throw ContainerizationError( + .invalidState, + message: "cannot resize: container is not running" + ) } } @@ -635,11 +584,6 @@ public actor SandboxService { public func dial(_ message: XPCMessage) async throws -> XPCMessage { self.log.info("`dial` xpc handler") switch self.state { - case .starting, .created, .stopped, .stopping: - throw ContainerizationError( - .invalidState, - message: "cannot dial: container is not running" - ) case .running, .booted: let port = message.uint64(key: .port) guard port > 0 else { @@ -655,7 +599,125 @@ public actor SandboxService { let reply = message.reply() reply.set(key: .fd, value: fh) return reply + default: + throw ContainerizationError( + .invalidState, + message: "cannot dial: container is not running" + ) + } + } + + private func startInitProcess(lock: AsyncLock.Context) async throws { + let info = try self.getContainer() + let container = info.container + let id = container.id + + guard self.state == .booted else { + throw ContainerizationError( + .invalidState, + message: "container expected to be in booted state, got: \(self.state)" + ) + } + + do { + let io = info.io + + try await container.start() + let waitFunc: ExitMonitor.WaitHandler = { + let code = try await container.wait() + if let out = io.out { + try out.close() + } + if let err = io.err { + try err.close() + } + return code + } + try await self.monitor.track(id: id, waitingOn: waitFunc) + } catch { + try? await self.cleanupContainer() + self.setState(.created) + throw error + } + } + + private func startExecProcess(processId id: String, lock: AsyncLock.Context) async throws { + let container = try self.getContainer().container + guard let processInfo = self.processes[id] else { + throw ContainerizationError(.notFound, message: "Process with id \(id)") + } + + let containerInfo = try self.getContainer() + + let czConfig = self.configureProcessConfig(config: processInfo.config, stdio: processInfo.io, containerConfig: containerInfo.config) + + let process = try await container.exec(id, configuration: czConfig) + try self.setUnderlyingProcess(id, process) + + try await process.start() + + let waitFunc: ExitMonitor.WaitHandler = { + let code = try await process.wait() + if let out = processInfo.io[1] { + try self.closeHandle(out.fileDescriptor) + } + if let err = processInfo.io[2] { + try self.closeHandle(err.fileDescriptor) + } + return code + } + try await self.monitor.track(id: id, waitingOn: waitFunc) + } + + private func startSocketForwarders(containerIpAddress: String, publishedPorts: [PublishPort]) async throws { + var forwarders: [SocketForwarderResult] = [] + try await withThrowingTaskGroup(of: SocketForwarderResult.self) { group in + for publishedPort in publishedPorts { + let proxyAddress = try SocketAddress(ipAddress: publishedPort.hostAddress, port: Int(publishedPort.hostPort)) + let serverAddress = try SocketAddress(ipAddress: containerIpAddress, port: Int(publishedPort.containerPort)) + log.info( + "creating forwarder for", + metadata: [ + "proxy": "\(proxyAddress)", + "server": "\(serverAddress)", + "protocol": "\(publishedPort.proto)", + ]) + group.addTask { + let forwarder: SocketForwarder + switch publishedPort.proto { + case .tcp: + forwarder = try TCPForwarder( + proxyAddress: proxyAddress, + serverAddress: serverAddress, + eventLoopGroup: self.eventLoopGroup, + log: self.log + ) + case .udp: + forwarder = try UDPForwarder( + proxyAddress: proxyAddress, + serverAddress: serverAddress, + eventLoopGroup: self.eventLoopGroup, + log: self.log + ) + } + return try await forwarder.run().get() + } + } + for try await result in group { + forwarders.append(result) + } + } + + self.socketForwarders = forwarders + } + + private func stopSocketForwarders() async { + log.info("closing forwarders") + for forwarder in self.socketForwarders { + forwarder.close() + try? await forwarder.wait() } + log.info("closed forwarders") } private func onContainerExit(id: String, code: Int32) async throws { @@ -664,10 +726,9 @@ public actor SandboxService { try await self.lock.withLock { [self] _ in let ctrInfo = try await getContainer() let ctr = ctrInfo.container - // Did someone explicitly call stop and we're already - // cleaning up? + switch await self.state { - case .stopped(_): + case .stopped(_), .stopping: return default: break @@ -676,7 +737,7 @@ public actor SandboxService { do { try await ctr.stop() } catch { - log.notice("failed to stop sandbox gracefully: \(error)") + self.log.notice("failed to stop sandbox gracefully: \(error)") } do { @@ -685,13 +746,12 @@ public actor SandboxService { self.log.error("failed to cleanup container: \(error)") } await setState(.stopped(code)) + let waiters = await self.waiters[id] ?? [] for cc in waiters { cc.resume(returning: code) } await self.removeWaiters(for: id) - try await self.sendContainerEvent(.containerExit(id: id, exitCode: Int64(code))) - exit(code) } } @@ -866,11 +926,12 @@ public actor SandboxService { return container } - private func gracefulStopContainer(_ lc: LinuxContainer, stopOpts: ContainerStopOptions) async throws { + private func gracefulStopContainer(_ lc: LinuxContainer, stopOpts: ContainerStopOptions) async throws -> Int32 { // Try and gracefully shut down the process. Even if this succeeds we need to power off // the vm, but we should try this first always. + var code: Int32 = 255 do { - try await withThrowingTaskGroup(of: Void.self) { group in + code = try await withThrowingTaskGroup(of: Int32.self) { group in group.addTask { try await lc.wait() } @@ -878,13 +939,24 @@ public actor SandboxService { try await lc.kill(stopOpts.signal) try await Task.sleep(for: .seconds(stopOpts.timeoutInSeconds)) try await lc.kill(SIGKILL) + + return 137 + } + guard let code = try await group.next() else { + throw ContainerizationError( + .internalError, + message: "failed to get exit code from gracefully stopping container" + ) } - try await group.next() group.cancelAll() + + return code } } catch {} + // Now actually bring down the vm. try await lc.stop() + return code } private func cleanupContainer() async throws { @@ -900,17 +972,6 @@ public actor SandboxService { } } } - - private func sendContainerEvent(_ event: ContainerEvent) async throws { - let serviceIdentifier = "com.apple.container.apiserver" - let client = XPCClient(service: serviceIdentifier) - let message = XPCMessage(route: .containerEvent) - - let data = try JSONEncoder().encode(event) - message.set(key: .containerEvent, value: data) - try await client.send(message) - } - } extension XPCMessage { @@ -1088,7 +1149,10 @@ extension SandboxService { self.container = info } - private func addNewProcess(_ id: String, _ config: ProcessConfiguration, _ io: [FileHandle?]) { + private func addNewProcess(_ id: String, _ config: ProcessConfiguration, _ io: [FileHandle?]) throws { + guard self.processes[id] == nil else { + throw ContainerizationError(.invalidArgument, message: "process \(id) already exists") + } self.processes[id] = ProcessInfo(config: config, process: nil, state: .created, io: io) } @@ -1107,13 +1171,21 @@ extension SandboxService { let io: (in: FileHandle?, out: MultiWriter?, err: MultiWriter?) } + /// States the underlying sandbox can be in. public enum State: Sendable, Equatable { + /// Sandbox is created. This should be what the service starts the sandbox in. case created + /// Bootstrap will transition a .created state to .booted. case booted - case starting + /// startProcess on the init process will transition .booted to .running. case running + /// At the beginning of stop() .running will be transitioned to .stopping. case stopping + /// Once a stop is successful, .stopping will transition to .stopped. case stopped(Int32) + /// .shuttingDown will be the last state the sandbox service will ever be in. Shortly + /// afterwards the process will exit. + case shuttingDown } func setState(_ new: State) {