Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion clients/shared/Network/BtwClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,18 @@ public struct BtwClient: BtwClientProtocol {
]
let bodyData = try JSONSerialization.data(withJSONObject: body)

// Dedicated per-call session so `invalidateAndCancel()` can tear down
// the underlying data task on its own terms when the stream ends or
// the consumer cancels — avoids a use-after-free race between
// `Task.cancel()` and the `AsyncBytes` iterator on `URLSession.shared`.
let session = URLSession(configuration: .default)
defer { session.invalidateAndCancel() }

let (bytes, response) = try await GatewayHTTPClient.streamPostWithRetry(
path: "assistants/{assistantId}/btw",
body: bodyData,
timeout: 120
timeout: 120,
session: session
)

guard let http = response as? HTTPURLResponse else {
Expand Down
18 changes: 13 additions & 5 deletions clients/shared/Network/GatewayHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -478,15 +478,19 @@ public enum GatewayHTTPClient {
/// - path: Path segment after `/v1/`.
/// - body: Pre-serialized request body data.
/// - timeout: Request timeout in seconds. Defaults to 30.
/// - session: The `URLSession` to use. Defaults to `.shared`. Pass a dedicated
/// session when the caller needs to control the lifecycle of the underlying
/// data task (e.g. to safely cancel a stream without a use-after-free
/// in `AsyncBytes`).
/// - Returns: A tuple of `(URLSession.AsyncBytes, URLResponse)` for streaming consumption.
/// - Throws: `ClientError` if the request cannot be constructed, or network errors from `URLSession`.
public static func streamPost(path: String, body: Data, timeout: TimeInterval = 30) async throws -> (URLSession.AsyncBytes, URLResponse) {
public static func streamPost(path: String, body: Data, timeout: TimeInterval = 30, session: URLSession = .shared) async throws -> (URLSession.AsyncBytes, URLResponse) {
let connection = try resolveConnection()
var request = try buildRequest(path: path, params: nil, method: "POST", timeout: timeout, connection: connection)
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
request.httpBody = body
logOutgoing(request, quiet: false)
let (bytes, response) = try await URLSession.shared.bytes(for: request)
let (bytes, response) = try await session.bytes(for: request)
if let http = response as? HTTPURLResponse {
logResponse(request, http: http, quiet: false)
}
Expand All @@ -504,18 +508,22 @@ public enum GatewayHTTPClient {
/// - path: Path segment after `/v1/`.
/// - body: Pre-serialized request body data.
/// - timeout: Request timeout in seconds. Defaults to 30.
/// - session: The `URLSession` to use. Defaults to `.shared`. Pass a dedicated
/// session when the caller needs to control the lifecycle of the underlying
/// data task (e.g. to safely cancel a stream without a use-after-free
/// in `AsyncBytes`).
/// - Returns: A tuple of `(URLSession.AsyncBytes, URLResponse)` for streaming consumption.
/// - Throws: `ClientError` if the request cannot be constructed,
/// `URLError(.userAuthenticationRequired)` if credential refresh fails,
/// or network errors from `URLSession`.
public static func streamPostWithRetry(path: String, body: Data, timeout: TimeInterval = 30) async throws -> (URLSession.AsyncBytes, URLResponse) {
public static func streamPostWithRetry(path: String, body: Data, timeout: TimeInterval = 30, session: URLSession = .shared) async throws -> (URLSession.AsyncBytes, URLResponse) {
let connection = try resolveConnection()
var request = try buildRequest(path: path, params: nil, method: "POST", timeout: timeout, connection: connection)
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
request.httpBody = body
logOutgoing(request, quiet: false)

let (bytes, response) = try await URLSession.shared.bytes(for: request)
let (bytes, response) = try await session.bytes(for: request)

guard let http = response as? HTTPURLResponse else {
return (bytes, response)
Expand All @@ -541,7 +549,7 @@ public enum GatewayHTTPClient {
retryRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept")
retryRequest.httpBody = body
logOutgoing(retryRequest, quiet: false)
let (retryBytes, retryResponse) = try await URLSession.shared.bytes(for: retryRequest)
let (retryBytes, retryResponse) = try await session.bytes(for: retryRequest)
if let retryHttp = retryResponse as? HTTPURLResponse {
logResponse(retryRequest, http: retryHttp, quiet: false)
}
Expand Down