diff --git a/clients/shared/Network/EventStreamClient.swift b/clients/shared/Network/EventStreamClient.swift index d8fd6e2e853..3fbe787f6b4 100644 --- a/clients/shared/Network/EventStreamClient.swift +++ b/clients/shared/Network/EventStreamClient.swift @@ -302,11 +302,12 @@ public final class EventStreamClient { // A back-to-back call to `startSSEStream()` on the MainActor can // cancel this task and invalidate `session` before it runs its - // first instruction. Calling `session.bytes(for:)` on an already - // invalidated session throws an ObjC NSGenericException from - // `-[__NSURLSessionLocal taskForClassInfo:]`, which is - // uncatchable in Swift and crashes the process with SIGABRT. - // Bail out before touching the session if we've been superseded. + // first instruction. Bail out cheaply when that's already visible + // so we don't build the request or hop off MainActor for nothing. + // `GatewayHTTPClient.stream` additionally wraps data-task creation + // in an ObjC `@try/@catch` trampoline, which is the authoritative + // defense against invalidation that lands after this guard but + // before `URLSession` creates the underlying task. guard !Task.isCancelled, self.sseSession === session else { return } do { diff --git a/clients/shared/Network/GatewayHTTPClient.swift b/clients/shared/Network/GatewayHTTPClient.swift index 947d9d30872..9af7345af0e 100644 --- a/clients/shared/Network/GatewayHTTPClient.swift +++ b/clients/shared/Network/GatewayHTTPClient.swift @@ -1,4 +1,5 @@ import Foundation +import ObjCExceptionCatcher import os private let log = Logger(subsystem: Bundle.appBundleIdentifier, category: "GatewayHTTPClient") @@ -445,8 +446,7 @@ public enum GatewayHTTPClient { /// Performs an authenticated streaming GET request against the gateway. /// - /// Returns an async byte stream suitable for SSE or other streaming transports - /// that need `URLSession.bytes(for:)` instead of `URLSession.data(for:)`. + /// Returns an async byte stream suitable for SSE or other streaming transports. /// /// - Parameters: /// - path: Path segment after `/v1/`. @@ -454,39 +454,15 @@ public enum GatewayHTTPClient { /// - session: The `URLSession` to use. Defaults to `.shared`. Pass a dedicated /// session when the caller needs to control the lifecycle of the underlying /// data task (e.g. to safely cancel an SSE stream without a use-after-free - /// in `AsyncBytes`). - /// - Returns: A tuple of `(URLSession.AsyncBytes, URLResponse)` for streaming consumption. + /// in the byte iterator). + /// - Returns: A tuple of `(SafeAsyncBytes, URLResponse)` for streaming consumption. /// - Throws: `ClientError` if the request cannot be constructed, or network errors from `URLSession`. - public static func stream(path: String, timeout: TimeInterval = 30, session: URLSession = .shared) async throws -> (URLSession.AsyncBytes, URLResponse) { + public static func stream(path: String, timeout: TimeInterval = 30, session: URLSession = .shared) async throws -> (SafeAsyncBytes, URLResponse) { let connection = try resolveConnection() var request = try buildRequest(path: path, params: nil, method: "GET", timeout: timeout, connection: connection) request.setValue("text/event-stream", forHTTPHeaderField: "Accept") logOutgoing(request, quiet: false) - let (bytes, response) = try await session.bytes(for: request) - if let http = response as? HTTPURLResponse { - logResponse(request, http: http, quiet: false) - } - return (bytes, response) - } - - /// Performs an authenticated streaming POST request against the gateway. - /// - /// Returns an async byte stream suitable for SSE or other streaming transports - /// that need `URLSession.bytes(for:)` instead of `URLSession.data(for:)`. - /// - /// - Parameters: - /// - path: Path segment after `/v1/`. - /// - body: Pre-serialized request body data. - /// - timeout: Request timeout in seconds. Defaults to 30. - /// - Returns: A tuple of `(URLSession.AsyncBytes, URLResponse)` for streaming consumption. - /// - Throws: `ClientError` if the request cannot be constructed, or network errors from `URLSession`. - public static func streamPost(path: String, body: Data, timeout: TimeInterval = 30) async throws -> (URLSession.AsyncBytes, URLResponse) { - let connection = try resolveConnection() - var request = try buildRequest(path: path, params: nil, method: "POST", timeout: timeout, connection: connection) - request.setValue("text/event-stream", forHTTPHeaderField: "Accept") - request.httpBody = body - logOutgoing(request, quiet: false) - let (bytes, response) = try await URLSession.shared.bytes(for: request) + let (bytes, response) = try await safeBytes(session: session, request: request) if let http = response as? HTTPURLResponse { logResponse(request, http: http, quiet: false) } @@ -504,18 +480,21 @@ public enum GatewayHTTPClient { /// - path: Path segment after `/v1/`. /// - body: Pre-serialized request body data. /// - timeout: Request timeout in seconds. Defaults to 30. - /// - Returns: A tuple of `(URLSession.AsyncBytes, URLResponse)` for streaming consumption. + /// - session: The `URLSession` to use. Defaults to `.shared`. Pass a dedicated + /// session when the caller needs to control the lifecycle of the underlying + /// data task (e.g. to cancel a stream without a use-after-free in the byte iterator). + /// - Returns: A tuple of `(SafeAsyncBytes, URLResponse)` for streaming consumption. /// - Throws: `ClientError` if the request cannot be constructed, /// `URLError(.userAuthenticationRequired)` if credential refresh fails, /// or network errors from `URLSession`. - public static func streamPostWithRetry(path: String, body: Data, timeout: TimeInterval = 30) async throws -> (URLSession.AsyncBytes, URLResponse) { + public static func streamPostWithRetry(path: String, body: Data, timeout: TimeInterval = 30, session: URLSession = .shared) async throws -> (SafeAsyncBytes, URLResponse) { let connection = try resolveConnection() var request = try buildRequest(path: path, params: nil, method: "POST", timeout: timeout, connection: connection) request.setValue("text/event-stream", forHTTPHeaderField: "Accept") request.httpBody = body logOutgoing(request, quiet: false) - let (bytes, response) = try await URLSession.shared.bytes(for: request) + let (bytes, response) = try await safeBytes(session: session, request: request) guard let http = response as? HTTPURLResponse else { return (bytes, response) @@ -541,13 +520,302 @@ public enum GatewayHTTPClient { retryRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept") retryRequest.httpBody = body logOutgoing(retryRequest, quiet: false) - let (retryBytes, retryResponse) = try await URLSession.shared.bytes(for: retryRequest) + let (retryBytes, retryResponse) = try await safeBytes(session: session, request: retryRequest) if let retryHttp = retryResponse as? HTTPURLResponse { logResponse(retryRequest, http: retryHttp, quiet: false) } return (retryBytes, retryResponse) } + /// Streams the response body for `request` as a sequence of bytes, surviving + /// concurrent invalidation of `session`. + /// + /// `URLSession.bytes(for:)` internally calls `-[NSURLSession dataTaskWithRequest:]` + /// from the synchronous prefix of its async body. If the session is invalidated + /// on another thread — even *during* that call — the method raises an Objective-C + /// `NSGenericException` ("Task created in a session that has been invalidated") + /// from `-[__NSURLSessionLocal taskForClassInfo:]`. Swift `do/catch` cannot + /// intercept ObjC exceptions, so the process aborts with SIGABRT. + /// + /// This helper bypasses `bytes(for:)` entirely: it creates the data task itself + /// inside a `VLMPerformWithObjCExceptionHandling` trampoline (so an invalidated + /// session surfaces a Swift `URLError(.cancelled)` instead of aborting the + /// process), then drives the byte stream from a per-task + /// `URLSessionDataDelegate`. The returned `SafeAsyncBytes` conforms to + /// `AsyncSequence` with `Element == UInt8`, so callers can use `bytes.lines` + /// exactly like `URLSession.AsyncBytes`. + private static func safeBytes(session: URLSession, request: URLRequest) async throws -> (SafeAsyncBytes, URLResponse) { + let (chunks, chunksContinuation) = AsyncThrowingStream.makeStream() + let delegate = StreamingBytesDelegate(chunksContinuation: chunksContinuation) + + var taskCreationError: NSError? + var createdTask: URLSessionDataTask? + let taskCreationSucceeded = VLMPerformWithObjCExceptionHandling({ + createdTask = session.dataTask(with: request) + }, &taskCreationError) + guard taskCreationSucceeded, let task = createdTask else { + chunksContinuation.finish(throwing: URLError(.cancelled)) + throw URLError(.cancelled, userInfo: [ + NSLocalizedDescriptionKey: taskCreationError?.localizedDescription + ?? "URLSession was invalidated before the stream could start" + ]) + } + + // Per-task delegate (iOS 15+) overrides any session-level delegate and + // is invoked even when the session itself has none. + task.delegate = delegate + + // Wire the task into the delegate so it can apply TCP backpressure + // (see `StreamingBytesDelegate` for the high/low water-mark policy). + delegate.attach(task: task) + + // If the consumer stops iterating, cancel the underlying data task so + // the URLSession tears down cleanly. + chunksContinuation.onTermination = { @Sendable _ in task.cancel() } + + // `resume()` is documented to be a no-op on a cancelled or completed + // task, but guard it with the trampoline anyway so any hypothetical + // NSException from a concurrently-invalidated task surfaces cleanly. + // + // The `withTaskCancellationHandler` propagates parent Swift task + // cancellation (e.g. `BtwClient.sendMessage` tearing down its worker + // before headers arrive) to the underlying `URLSessionDataTask`, so + // callers that cancel while we're awaiting the initial response don't + // leave the HTTP request running until server timeout. Cancelling the + // task fires `didCompleteWithError(.cancelled)`, which the delegate + // translates into a thrown `URLError` on this continuation. + let response: URLResponse = try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + delegate.setResponseContinuation(continuation) + var resumeError: NSError? + let resumed = VLMPerformWithObjCExceptionHandling({ + task.resume() + }, &resumeError) + if !resumed { + delegate.failResponseIfPending( + with: URLError(.cancelled, userInfo: [ + NSLocalizedDescriptionKey: resumeError?.localizedDescription + ?? "URLSession was invalidated before the task could resume" + ]) + ) + chunksContinuation.finish(throwing: URLError(.cancelled)) + } + } + } onCancel: { + task.cancel() + } + + return (SafeAsyncBytes(chunks: chunks, bytesConsumed: delegate.bytesConsumed), response) + } + + /// `AsyncSequence` of raw response bytes, API-compatible with + /// `URLSession.AsyncBytes` for the subset the gateway uses (`Element == UInt8` + /// plus the `lines` extension). Backed by an `AsyncThrowingStream` so + /// the iterator consumes at chunk granularity and exposes bytes one-by-one. + /// + /// `bytesConsumed` is invoked each time the iterator drains a chunk from + /// the underlying stream, so the producing `StreamingBytesDelegate` can + /// release its in-flight byte accounting and resume a suspended task + /// once the pending buffer falls below the low-water mark. + public struct SafeAsyncBytes: AsyncSequence, Sendable { + public typealias Element = UInt8 + + private let chunks: AsyncThrowingStream + private let bytesConsumed: @Sendable (Int) -> Void + + fileprivate init( + chunks: AsyncThrowingStream, + bytesConsumed: @escaping @Sendable (Int) -> Void + ) { + self.chunks = chunks + self.bytesConsumed = bytesConsumed + } + + public struct AsyncIterator: AsyncIteratorProtocol { + fileprivate var chunkIterator: AsyncThrowingStream.AsyncIterator + fileprivate var currentChunk: Data = Data() + fileprivate var currentOffset: Int = 0 + fileprivate let bytesConsumed: @Sendable (Int) -> Void + + public mutating func next() async throws -> UInt8? { + while currentOffset >= currentChunk.count { + guard let next = try await chunkIterator.next() else { return nil } + currentChunk = next + currentOffset = 0 + bytesConsumed(next.count) + } + let byte = currentChunk[currentChunk.startIndex + currentOffset] + currentOffset += 1 + return byte + } + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(chunkIterator: chunks.makeAsyncIterator(), bytesConsumed: bytesConsumed) + } + } + + /// `URLSessionDataDelegate` that bridges task callbacks to `safeBytes`'s + /// chunk stream and the initial response continuation. Callbacks are + /// dispatched on a URLSession-owned background queue; the lock serialises + /// the one-shot response continuation and the in-flight byte accounting. + /// + /// The delegate is installed on the task before the task is resumed, so + /// the session can deliver `didCompleteWithError` — e.g. because the + /// session was invalidated on another thread — before `safeBytes` has a + /// chance to call `setResponseContinuation`. `responseResolved` plus the + /// stashed `completionError` let the late `setResponseContinuation` call + /// resume immediately with the right error instead of hanging forever. + /// + /// Applies TCP-level backpressure so producers that outpace the consumer + /// don't grow the `AsyncThrowingStream` buffer without bound (matching + /// `URLSession.AsyncBytes`'s built-in flow control). When the number of + /// yielded-but-not-yet-consumed bytes crosses `highWaterMark`, the + /// underlying `URLSessionDataTask` is suspended; when it falls back below + /// `lowWaterMark`, the task is resumed. Suspending the task stops reading + /// from the socket, which lets the OS receive buffer fill and throttle the + /// remote sender via TCP flow control. + private final class StreamingBytesDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable { + /// 1 MiB. Suspend the task once this many bytes are queued but not yet + /// consumed. Sized to absorb SSE bursts while bounding peak memory for + /// high-volume consumers (e.g. `TerminalAPIClient.subscribeEvents`). + private static let highWaterMark: Int = 1 << 20 + /// 256 KiB. Resume once the queue drains below this mark; the gap + /// between the two marks avoids rapid suspend/resume thrashing. + private static let lowWaterMark: Int = 256 * 1024 + + private let chunksContinuation: AsyncThrowingStream.Continuation + private var responseContinuation: CheckedContinuation? + private var responseResolved = false + private var completionError: Error? + private weak var task: URLSessionDataTask? + private var pendingBytes: Int = 0 + private var suspendedForBackpressure = false + private let lock = NSLock() + + init(chunksContinuation: AsyncThrowingStream.Continuation) { + self.chunksContinuation = chunksContinuation + } + + /// Stores a weak reference to the task so the delegate can drive + /// `suspend()`/`resume()` for backpressure without creating a + /// retain cycle that would outlive the caller's stream. + func attach(task: URLSessionDataTask) { + lock.lock() + self.task = task + lock.unlock() + } + + /// Called by `SafeAsyncBytes.AsyncIterator` each time it drains a + /// chunk from the stream. Decrements the in-flight byte counter and + /// resumes the task if backpressure suspended it and the queue has + /// drained below the low-water mark. + /// + /// Exposed as a `@Sendable` closure property so the iterator can + /// call it without importing the concrete delegate type. + /// + /// `task.resume()` runs while holding `lock` so the flag flip and + /// the task-state change are atomic with the matching + /// `task.suspend()` in `didReceive data:`. If those operations + /// interleave without mutual exclusion, the task can end up + /// suspended with `suspendedForBackpressure = false`, leaving no + /// code path to wake it up. + lazy var bytesConsumed: @Sendable (Int) -> Void = { [weak self] count in + guard let self else { return } + self.lock.lock() + self.pendingBytes = max(0, self.pendingBytes - count) + if self.suspendedForBackpressure + && self.pendingBytes <= Self.lowWaterMark { + self.suspendedForBackpressure = false + self.task?.resume() + } + self.lock.unlock() + } + + func setResponseContinuation(_ continuation: CheckedContinuation) { + lock.lock() + if responseResolved { + // The delegate already received `didCompleteWithError` (e.g. + // the session was invalidated between `task.delegate = …` and + // this call). Resume immediately so + // `withCheckedThrowingContinuation` doesn't hang. + let error = completionError ?? URLError(.cancelled) + lock.unlock() + continuation.resume(throwing: error) + return + } + responseContinuation = continuation + lock.unlock() + } + + func failResponseIfPending(with error: Error) { + lock.lock() + let pending = responseContinuation + responseContinuation = nil + responseResolved = true + if completionError == nil { + completionError = error + } + lock.unlock() + pending?.resume(throwing: error) + } + + func urlSession( + _ session: URLSession, + dataTask: URLSessionDataTask, + didReceive response: URLResponse, + completionHandler: @escaping (URLSession.ResponseDisposition) -> Void + ) { + lock.lock() + let pending = responseContinuation + responseContinuation = nil + responseResolved = true + lock.unlock() + pending?.resume(returning: response) + completionHandler(.allow) + } + + func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { + // Account for the chunk *before* yielding it so a consumer that + // drains it immediately on another thread can't beat the + // delegate to `bytesConsumed` and drive `pendingBytes` negative. + // + // `dataTask.suspend()` runs while still holding `lock` so the + // flag flip and the task-state change are atomic with the + // matching `task.resume()` in `bytesConsumed`. + lock.lock() + pendingBytes += data.count + if !suspendedForBackpressure + && pendingBytes >= Self.highWaterMark { + suspendedForBackpressure = true + dataTask.suspend() + } + lock.unlock() + chunksContinuation.yield(data) + } + + func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { + lock.lock() + let pending = responseContinuation + responseContinuation = nil + let hadResponse = responseResolved + responseResolved = true + let effectiveError = error ?? (hadResponse ? nil : URLError(.badServerResponse)) + if !hadResponse, completionError == nil { + completionError = effectiveError + } + lock.unlock() + if !hadResponse, let effectiveError { + pending?.resume(throwing: effectiveError) + } + if let error { + chunksContinuation.finish(throwing: error) + } else { + chunksContinuation.finish() + } + } + } + // MARK: - Internals #if os(macOS)