fix(macos): replace URLSession.bytes with delegate-driven stream to survive session invalidation #26281
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e51fdd6520
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
?? "URLSession was invalidated before the stream could start"
])
}
return try await session.bytes(for: request)
Catch invalidation around the real bytes(for:) call
safeBytes only wraps the probe dataTask(with:) in VLMPerformWithObjCExceptionHandling, but the actual session.bytes(for:) invocation is still outside that protection. In the SSE path, EventStreamClient can invalidate the same session on @MainActor while this nonisolated function is running, so invalidation that lands after a successful probe can still trigger NSGenericException and abort the process. This leaves the crash path reachable during rapid stop/start or reconnect races, so the fix is incomplete unless task creation for the real stream is also protected/serialized.
Good catch — you're right that the probe-then-call left a TOCTOU window (the real bytes(for:) call was still outside the trampoline, and on-MainActor invalidation can interleave with on-concurrent-executor code on a different thread). Addressed in a3bbc46: replaced the probe approach with a full URLSessionDataDelegate-driven SafeAsyncBytes. safeBytes() now owns the dataTask(with:) call site directly, wraps it in VLMPerformWithObjCExceptionHandling, and drives the byte stream from per-task delegate callbacks — there's no URLSession.bytes(for:) call left to race against. Resolved.
Approved ✅
Clean fix for a real crash. The safeBytes probe approach is clever — reuses the existing VLMPerformWithObjCExceptionHandling ObjC trampoline to catch the NSGenericException before it becomes a SIGABRT, and the synchronous probe → cancel → real call sequence correctly avoids TOCTOU since session invalidation only happens on @MainActor and can't interleave with the non-await code on the concurrent executor.
Key things I verified:
- The probe dataTask(with:) stays suspended (no .resume()), so zero network side effects
- Backward compatible: session: URLSession = .shared default on streamPost/streamPostWithRetry
- The EventStreamClient comment update correctly reflects that safeBytes is now the authoritative defense, with the sseSession === session guard remaining as a cheap early bailout
- Error surfaces as URLError(.cancelled) which existing callers already handle
Devin Review: ✅ No issues found. Ready to merge.
a3bbc46
Approved ✅
Summary
This evolved well from the initial probe approach to a full delegate-based streaming implementation that eliminates bytes(for:) entirely rather than just guarding it. Much more robust.
What I Checked
- SafeAsyncBytes API compatibility: conforms to AsyncSequence<UInt8>, so Foundation's .lines extension works transparently. All callers (EventStreamClient.startSSEStream, BtwClient, TerminalAPIClient) use type-inferred let (bytes, response), with no explicit URLSession.AsyncBytes type annotations to break. ✅
- Thread safety: StreamingBytesDelegate serializes the response continuation under NSLock. Both didReceive response and failResponseIfPending nil out + resume under the lock, guaranteeing exactly-one-resume. chunksContinuation is inherently Sendable. ✅
- Task failure before response: didCompleteWithError checks responseResolved and resumes the pending continuation with the error. Edge case handled. ✅
- onTermination cleanup: cancels the data task when the consumer stops iterating. ✅
- streamPostWithRetry retry path: retry also goes through safeBytes with the same session parameter. Fresh connection credentials but same session lifecycle. Correct. ✅
- ObjC trampoline reuse: VLMPerformWithObjCExceptionHandling already battle-tested for AVAudioNode.installTap. Using it for dataTask(with:) and resume() is the same pattern. ✅
- No behavior change in happy path: same bytes, same response, same error paths. Just driven by delegate instead of Foundation's internal async wrapper.
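For readers who want to see why a chunk stream can stand in for URLSession.AsyncBytes, here is a minimal sketch of the flattening step. ChunkedBytes and collectBytes are hypothetical names used for illustration only; the PR's actual SafeAsyncBytes is not shown in this conversation, so treat the body as an assumption about its general shape rather than the real implementation.

```swift
import Foundation

/// Hypothetical stand-in for SafeAsyncBytes: flattens Data chunks into single bytes.
struct ChunkedBytes: AsyncSequence {
    typealias Element = UInt8
    let chunks: AsyncThrowingStream<Data, Error>

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: AsyncThrowingStream<Data, Error>.Iterator
        var current = Data()
        var offset = 0

        mutating func next() async throws -> UInt8? {
            // Pull the next chunk once the current one is exhausted; empty chunks are skipped.
            while offset >= current.count {
                guard let chunk = try await base.next() else { return nil }
                current = chunk
                offset = 0
            }
            // Data slices may not start at index 0, so index relative to startIndex.
            let byte = current[current.startIndex + offset]
            offset += 1
            return byte
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: chunks.makeAsyncIterator())
    }
}

/// Usage sketch: feed a few chunks and read them back byte by byte.
func collectBytes(_ parts: [Data]) async throws -> [UInt8] {
    let (stream, continuation) = AsyncThrowingStream<Data, Error>.makeStream()
    for part in parts { continuation.yield(part) }
    continuation.finish()
    var out: [UInt8] = []
    for try await byte in ChunkedBytes(chunks: stream) { out.append(byte) }
    return out
}
```

Because the element type is UInt8, any extension constrained to AsyncSequence where Element == UInt8 applies to a type shaped like this, which is the property the review relies on for .lines.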
One note for local build verification
The per-task delegate (task.delegate = delegate) requires iOS 15+ / macOS 12+ — should be fine given our deployment targets, but worth confirming in the Xcode build.
Verdict
Clean, well-documented, correct fix for a real SIGABRT crash. The delegate-based approach is architecturally better than probing because it removes the entire bytes(for:) call surface rather than wrapping it. Ship it.
f8055bc
Approved ✅
v3 Review
This is a solid, complete replacement for URLSession.bytes(for:). The evolution from v1 (probe) → v2 (delegate-driven) → v3 (continuation leak fix) landed in a good place.
Key strengths:
- SafeAsyncBytes is source-compatible with URLSession.AsyncBytes for the .lines usage, so no caller changes are needed
- StreamingBytesDelegate.setResponseContinuation properly handles the race where didCompleteWithError fires before the continuation is stored (responseResolved + completionError stash). This was the real v3 fix and it's correct.
- NSLock serialization is appropriate: URLSession delegate callbacks come from a session-owned background queue, but the setResponseContinuation call comes from the caller's context. The lock covers both paths.
- onTermination cleanup ensures the data task is cancelled if the consumer stops iterating
- ObjC trampoline on both dataTask(with:) and resume() is belt-and-suspenders: good defensive coding
Merge readiness: CI checks are skipped (Devin branch policy). Needs local Xcode build verification before merge. Two files changed, well-scoped. No behavioral change in the happy path — only the crash-on-invalidation path is affected.
ashleeradka left a comment
Approved ✅ (v3 — f8055bc)
The delegate-driven SafeAsyncBytes is the right architecture here. Key things I verified across all 3 iterations:
- v3 continuation leak fix: setResponseContinuation now checks responseResolved first and resumes immediately if the delegate already fired didCompleteWithError. This closes the real race between task.delegate = delegate and the delegate queue. The completionError stash ensures the actual error surfaces rather than a generic .cancelled.
- NSLock serialization: all responseContinuation / responseResolved / completionError mutations are lock-guarded. The lock-unlock-then-resume pattern avoids holding the lock across continuation resume (which could deadlock if the continuation scheduled work on the same queue).
- chunksContinuation.onTermination: the task gets cancelled when the consumer stops iterating. Clean teardown.
- ObjC trampoline on both dataTask(with:) and resume(): belt and suspenders. Even if resume() somehow throws on an invalidated session, it's caught.
- streamPost and streamPostWithRetry gain a session: parameter, for consistency with stream(). No caller changes needed since it defaults to .shared.
Needs local Xcode build verification before merge — CI checks skipped on Devin branches.
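The late-continuation race in the first two bullets can be sketched as a small one-shot gate. This is an illustrative reduction, not the PR's StreamingBytesDelegate: ResponseGate, setContinuation, resolve, and awaitValue are hypothetical names, and CancellationError stands in for the URLError(.cancelled) fallback mentioned in the PR description.

```swift
import Foundation

/// Hypothetical one-shot gate. The producer (a delegate callback) and the awaiting
/// caller may arrive in either order; exactly one resume happens either way.
final class ResponseGate<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<Value, Error>?
    private var resolved = false
    private var stashed: Result<Value, Error>?

    /// Awaiting side. Resumes immediately if the producer already finished.
    func setContinuation(_ c: CheckedContinuation<Value, Error>) {
        lock.lock()
        if resolved {
            let result = stashed
            lock.unlock()
            // Resume outside the lock; fall back to cancellation if nothing was stashed.
            c.resume(with: result ?? .failure(CancellationError()))
            return
        }
        continuation = c
        lock.unlock()
    }

    /// Producer side. Only the first call wins; later calls are ignored.
    func resolve(_ result: Result<Value, Error>) {
        lock.lock()
        guard !resolved else { lock.unlock(); return }
        resolved = true
        let pending = continuation
        continuation = nil
        if pending == nil { stashed = result } // nobody waiting yet: keep the real outcome
        lock.unlock()
        pending?.resume(with: result)
    }
}

/// Usage sketch: suspend until the gate is resolved.
func awaitValue<Value: Sendable>(_ gate: ResponseGate<Value>) async throws -> Value {
    try await withCheckedThrowingContinuation { gate.setContinuation($0) }
}
```

Stashing the result is what lets the real error surface when the producer wins the race, and resuming only after unlocking mirrors the lock-unlock-then-resume pattern described above.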
Approved ✅ (v3 — f8055bc)
The delegate-driven SafeAsyncBytes is the right architecture. Verified across all 3 iterations:
- v3 continuation leak fix: setResponseContinuation checks responseResolved first and resumes immediately if the delegate already fired didCompleteWithError. The completionError stash ensures the actual error surfaces rather than a generic .cancelled.
- NSLock serialization: all responseContinuation / responseResolved / completionError mutations are lock-guarded. The lock-unlock-then-resume pattern avoids deadlock.
- chunksContinuation.onTermination: the task gets cancelled when the consumer stops iterating. Clean teardown.
- ObjC trampoline on both dataTask(with:) and resume(): belt and suspenders.
- streamPost and streamPostWithRetry gain a session: parameter, for consistency with stream(). No caller changes needed.
Needs local Xcode build verification before merge — CI checks skipped on Devin branches.
3940c8d
Re-approving on v4 (3940c8d). The delta from v3 is a pure deletion of the unused streamPost non-retry entry point (0 additions, 26 deletions) — dead code since #19442. The v3 delegate-based SafeAsyncBytes + continuation leak fix remains intact and correct. ✅
Still needs local Xcode build verification before merge (CI checks skipped on Devin branches).
@devin-ai review this PR
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3940c8d30b
let response: URLResponse = try await withCheckedThrowingContinuation { continuation in
delegate.setResponseContinuation(continuation)
var resumeError: NSError?
let resumed = VLMPerformWithObjCExceptionHandling({
task.resume()
Propagate task cancellation while waiting for first response
safeBytes suspends in withCheckedThrowingContinuation until delegate callbacks arrive, but there is no withTaskCancellationHandler to cancel the underlying URLSessionDataTask if the parent Swift task is cancelled before headers are received. In current callers (for example BtwClient.sendMessage cancelling its worker task on stream termination), this means a cancelled operation can keep the HTTP request alive until timeout/remote completion instead of stopping promptly, which can leave stale network work and delay teardown.
Good catch — fixed in d1b4111.
Wrapped the response-header withCheckedThrowingContinuation in withTaskCancellationHandler, so a parent Swift task cancellation (e.g. BtwClient.sendMessage tearing down its worker) that lands between task.resume() and the first delegate callback now calls task.cancel() on the underlying URLSessionDataTask. Cancelling fires didCompleteWithError(URLError.cancelled), which the delegate converts into a thrown URLError on the pending continuation — matching the cancellation semantics of URLSession.bytes(for:).
chunksContinuation.onTermination still handles the post-headers case (consumer stops iterating), so cancellation is covered both before and after the initial response.
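A rough sketch of the pre-headers cancellation path, for anyone following along without the diff. FakeTask is a hypothetical stand-in for URLSessionDataTask plus its delegate, and it throws CancellationError where the real code surfaces URLError(.cancelled); only withTaskCancellationHandler and withCheckedThrowingContinuation are the actual APIs named in the fix.

```swift
import Foundation

/// Hypothetical cancellable operation standing in for the data task and its delegate.
final class FakeTask: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<String, Error>?
    private var cancelled = false

    /// Stores the header continuation, or fails it at once if cancel() already ran.
    func start(_ c: CheckedContinuation<String, Error>) {
        lock.lock()
        if cancelled {
            lock.unlock()
            c.resume(throwing: CancellationError())
            return
        }
        continuation = c
        lock.unlock()
    }

    /// Fails the pending continuation, if any. Safe to call more than once.
    func cancel() {
        lock.lock()
        cancelled = true
        let pending = continuation
        continuation = nil
        lock.unlock()
        pending?.resume(throwing: CancellationError())
    }
}

/// Waits for "headers", cancelling the underlying task if the parent Swift task is cancelled.
func awaitHeaders(from task: FakeTask) async throws -> String {
    try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { task.start($0) }
    } onCancel: {
        // May run before, during, or after start(); the lock in FakeTask covers all three.
        task.cancel()
    }
}
```

The onCancel closure can fire before the operation closure has stored the continuation, which is why the stand-in remembers the cancelled flag instead of assuming a continuation is always present.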
d1b4111
Approved ✅ (re-review on v5 d1b4111)
Reviewed the full diff across all 5 commits. The evolution v1→v5 is clean:
- v1-v2: Probe approach → full delegate-based SafeAsyncBytes replacing URLSession.bytes(for:) entirely
- v3: Continuation leak fix. setResponseContinuation checks responseResolved first and stashes completionError for the late-continuation race
- v4: Dead code removal of the unused streamPost non-retry entry point (pure deletion)
- v5: withTaskCancellationHandler wrapping the header-wait continuation. Fixes the Codex-found P1 where parent task cancellation before headers arrived would leave the URLSessionDataTask alive until server timeout. onCancel calls task.cancel(), firing didCompleteWithError(.cancelled), which the delegate translates to a thrown URLError on the pending continuation. Matches URLSession.bytes(for:) cancellation semantics.
Concurrency Deep-Check (R11b)
- Race enumeration: NSLock serializes the one-shot response continuation. responseResolved + completionError stash handle the delegate-fires-before-continuation race. withTaskCancellationHandler handles parent cancellation during header wait. ✅
- Continuation lifecycle: Single response continuation, set once via setResponseContinuation, consumed exactly once (either by didReceive response or by failResponseIfPending / didCompleteWithError / late-set path). No double-resume possible. ✅
- Cancellation propagation: Three paths covered: (a) consumer stops iterating → onTermination cancels task, (b) parent task cancelled before headers → withTaskCancellationHandler cancels task, (c) session invalidated → ObjC trampoline catches and throws. ✅
- Actor isolation: Delegate callbacks on URLSession background queue, properly serialized with NSLock. No MainActor assumptions. ✅
EventStreamClient changes
Comment update only — accurately describes the guard as a cheap early bailout with safeBytes as the authoritative defense. ✅
Still blocked on local Xcode build verification (Devin authored on Linux). The code is correct and ready for that final step.
Approved ✅ (v5 — d1b4111)
v5 delta from v4: wraps the header-wait withCheckedThrowingContinuation in withTaskCancellationHandler. When the parent Swift task is cancelled before headers arrive, onCancel calls task.cancel() on the underlying URLSessionDataTask, which fires didCompleteWithError(.cancelled) through the delegate and resumes the continuation cleanly.
This fixes the P1 Codex identified: without it, a cancelled parent task would leave the HTTP request running until server timeout. The cancellation semantics match URLSession.bytes(for:)'s built-in behavior.
All 5 commits reviewed across the full evolution: v1 probe → v2 delegate-based SafeAsyncBytes → v3 continuation leak fix → v4 dead code cleanup → v5 task cancellation handler. Solid work.
@devin-ai review this PR
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d1b4111a44
/// `AsyncSequence` with `Element == UInt8`, so callers can use `bytes.lines`
/// exactly like `URLSession.AsyncBytes`.
private static func safeBytes(session: URLSession, request: URLRequest) async throws -> (SafeAsyncBytes, URLResponse) {
let (chunks, chunksContinuation) = AsyncThrowingStream<Data, Error>.makeStream()
Bound SafeAsyncBytes buffering to prevent OOM
safeBytes creates the chunk stream with AsyncThrowingStream<Data, Error>.makeStream() and later yields every delegate chunk into it, which defaults to unbounded buffering. If the producer outpaces consumers (for example, high-volume terminal/SSE output while parsing runs on @MainActor), queued Data chunks can grow without limit and cause significant memory growth or process termination. This is a regression from the previous URLSession.AsyncBytes path and should use an explicit bounded buffering/backpressure strategy.
Good catch — fixed in f6bf772.
StreamingBytesDelegate now tracks yielded-but-not-yet-consumed bytes and drives URLSessionDataTask.suspend() / .resume() around a 1 MiB high-water / 256 KiB low-water band. Suspending the data task stops reading from the socket, so the OS receive buffer fills and TCP flow control throttles the remote sender — matching URLSession.AsyncBytes's built-in backpressure.
Implementation notes:
- SafeAsyncBytes.AsyncIterator gets a @Sendable (Int) -> Void bytesConsumed callback (exposed as a lazy var on the delegate so the iterator doesn't need to import the delegate type) and invokes it each time it drains a chunk from the underlying AsyncThrowingStream.
- The delegate increments pendingBytes before chunksContinuation.yield(data) so a fast consumer on another thread can't race the delegate and push the counter negative.
- suspend() / resume() are called outside the lock; the weak task reference lets the delegate drop out of scope cleanly when the caller's stream ends.
- onTermination → task.cancel() still wins over a backpressure-suspend, since cancel() overrides suspend() per URLSessionTask docs.
Resolved.
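The high/low water-mark band boils down to a small hysteresis counter, sketched here without any URLSession plumbing. WatermarkGate, willYield, and didConsume are hypothetical names; the default thresholds come from this comment, while the choice of >= and <= at the boundaries is an assumption, since the exact comparisons are not quoted in the thread.

```swift
/// Hypothetical hysteresis counter for the 1 MiB / 256 KiB band described above.
struct WatermarkGate {
    let highWater: Int
    let lowWater: Int
    private(set) var pendingBytes = 0
    private(set) var suspended = false

    init(highWater: Int = 1 << 20, lowWater: Int = 256 << 10) {
        self.highWater = highWater
        self.lowWater = lowWater
    }

    /// Count bytes before they are yielded. Returns true when the producer should pause.
    mutating func willYield(_ count: Int) -> Bool {
        pendingBytes += count
        guard !suspended, pendingBytes >= highWater else { return false }
        suspended = true
        return true
    }

    /// Count bytes the consumer drained. Returns true when the producer should resume.
    mutating func didConsume(_ count: Int) -> Bool {
        // Clamp at zero so an over-reported drain cannot push the counter negative.
        pendingBytes = max(0, pendingBytes - count)
        guard suspended, pendingBytes <= lowWater else { return false }
        suspended = false
        return true
    }
}
```

The gap between the two thresholds is what keeps a consumer hovering near the limit from toggling the task on every chunk.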
f6bf772
Approved ✅ (re-review on v6 f6bf772)
The v6 delta from v5 is a single commit: TCP backpressure for SafeAsyncBytes.
What v6 adds
AsyncThrowingStream.makeStream() defaults to unbounded buffering, so when the delegate-driven stream produces faster than the consumer drains (e.g. high-volume terminal SSE), memory grows without limit. v6 adds in-flight byte accounting:
- High-water mark (1 MiB): URLSessionDataTask is suspended when yielded-but-unconsumed bytes cross this threshold
- Low-water mark (256 KiB): task resumes once the consumer drains below it; the gap prevents suspend/resume thrashing
- pendingBytes is incremented before yield() to prevent a race where a fast consumer on another thread drains before accounting
- weak var task avoids retain cycles; the bytesConsumed closure uses [weak self]
- All state guarded by NSLock
This restores the flow control semantics that URLSession.AsyncBytes provided natively. Clean design, thorough documentation in the commit message and inline comments.
No concerns. The full PR across all 6 commits is solid — delegate-based stream (v1-v3) → dead code cleanup (v4) → task cancellation handler (v5) → backpressure (v6). Each layer builds correctly on the last.
Still needs: Boss re-approval on v6 + local Xcode build verification.
@devin-ai review this PR: v6 added TCP backpressure to the SafeAsyncBytes chunk stream. Please review the new high/low water-mark flow control mechanism.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f6bf7728bb
if shouldResume {
self.suspendedForBackpressure = false
}
let task = self.task
self.lock.unlock()
Keep backpressure state synchronized with task suspension
This closure clears suspendedForBackpressure before resume() is actually executed, while didReceive data performs suspend() later (after yield) on a different thread. If the consumer drains to the low-water mark in that window, bytesConsumed can flip the flag to false and call resume() while the task is still running; then didReceive still executes suspend(), leaving the task suspended with suspendedForBackpressure == false, so no later path resumes it and the stream can stall indefinitely under high-throughput boundary conditions.
Real race — fixed in ed0eb6e5 (v7).
Both dataTask.suspend() and task.resume() now run inside the delegate's NSLock, so the flag flip and the task-state change are atomic with each other. The interleave you described is no longer possible:
- didReceive data: holds the lock across pendingBytes += … → suspendedForBackpressure = true → dataTask.suspend(), then unlocks.
- bytesConsumed holds the lock across pendingBytes -= … → suspendedForBackpressure = false → task.resume(), then unlocks.
URLSession's suspend() / resume() are documented as thread-safe and don't synchronously call back into the delegate, so holding our own lock across them is safe — no risk of deadlock from URLSession invoking another delegate method on the same queue.
yield(data) stays outside the lock since it's a pure buffer write that doesn't need serialisation with the backpressure accounting. Resolved.
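To make the v7 ordering concrete, here is a reduced sketch in which the flag flip and the task-state change share one critical section. Pausable and BackpressureController are hypothetical names standing in for URLSessionDataTask and the delegate; the thresholds are passed in by the caller and the boundary comparisons are assumptions.

```swift
import Foundation

/// Hypothetical stand-in for the suspend/resume surface of URLSessionDataTask.
protocol Pausable: AnyObject {
    func suspend()
    func resume()
}

final class BackpressureController: @unchecked Sendable {
    private let lock = NSLock()
    private let highWater: Int
    private let lowWater: Int
    private var pendingBytes = 0
    private var suspendedForBackpressure = false
    private weak var task: Pausable? // weak, so the controller never keeps the task alive

    init(task: Pausable, highWater: Int, lowWater: Int) {
        self.task = task
        self.highWater = highWater
        self.lowWater = lowWater
    }

    /// Producer side: account for the chunk, then suspend while still holding the lock.
    func didReceive(byteCount: Int) {
        lock.lock()
        defer { lock.unlock() }
        pendingBytes += byteCount
        if !suspendedForBackpressure && pendingBytes >= highWater {
            suspendedForBackpressure = true
            task?.suspend()
        }
    }

    /// Consumer side: a resume can never slip in between the flag set and the suspend above.
    func bytesConsumed(_ count: Int) {
        lock.lock()
        defer { lock.unlock() }
        pendingBytes = max(0, pendingBytes - count)
        if suspendedForBackpressure && pendingBytes <= lowWater {
            suspendedForBackpressure = false
            task?.resume()
        }
    }
}
```

This shape is only safe if suspend() and resume() never call back into code that takes the same lock, which is the property the comment above attributes to URLSessionTask.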
ed0eb6e
Approved ✅ (re-review on v7 ed0eb6e5)
v7 delta from v6: moves task.suspend() and task.resume() inside the delegate lock. This fixes a real race Codex caught in v6:
The race (v6)
1. didReceive data: sets suspendedForBackpressure = true and calls suspend(), but the flag set and suspend were not atomic
2. A fast consumer on another thread could call bytesConsumed, see the flag, clear it, and call resume() between step 1's flag set and suspend call
3. Result: task suspended with flag cleared → no code path to wake it up → stream deadlock
The fix (v7)
Both suspend() (in didReceive data:) and resume() (in bytesConsumed) now execute while holding lock, making the flag-flip and task-state-change atomic. The documentation in the code comments is excellent — explains exactly what race this prevents.
Overall: 7-commit evolution from probe → delegate → continuation leak fix → dead code cleanup → cancellation propagation → TCP backpressure → lock serialization. Each commit is well-scoped and well-documented. Ship it.
Still needs: local Xcode build verification + Boss re-approval.
@devin-ai review this PR: v7 serializes backpressure suspend/resume under the delegate lock to fix a race condition Codex found in v6.
Codex Review: Didn't find any major issues. You're on a roll.
…SE stream
URLSession.bytes(for:) internally calls -[NSURLSession dataTaskWithRequest:]
from the synchronous prefix of its async body. If the session is invalidated
on another thread — even during that call — the method raises an Objective-C
NSGenericException ("Task created in a session that has been invalidated")
from -[__NSURLSessionLocal taskForClassInfo:]. Swift do/catch cannot
intercept ObjC exceptions, so the process aborts with SIGABRT.
Replace URLSession.bytes(for:) with a delegate-driven byte stream that
owns task creation. The data task is constructed inside the existing
VLMPerformWithObjCExceptionHandling ObjC trampoline, so an invalidated
session surfaces a Swift URLError(.cancelled) instead of aborting. A
per-task URLSessionDataDelegate (iOS 15+ / macOS 12+) drives chunk
delivery into an AsyncThrowingStream, exposed as SafeAsyncBytes which
conforms to AsyncSequence so callers use .lines exactly like
URLSession.AsyncBytes.
The delegate covers the full stream lifecycle:
- didReceive response resolves the initial response continuation;
responseResolved + completionError let a late setResponseContinuation
call resume immediately with the real error if the delegate already
completed (e.g. session invalidated between task.delegate = … and
the caller getting back on-thread).
- didCompleteWithError translates transport errors into a thrown
URLError on the chunk stream, including explicit cancellation.
- withTaskCancellationHandler propagates parent Swift task cancellation
to URLSessionDataTask.cancel(), matching URLSession.bytes(for:)'s
built-in cancellation semantics so callers cancelling before headers
arrive don't leave the request running until server timeout.
- TCP backpressure: in-flight byte accounting suspends the data task
at a 1 MiB high-water mark and resumes it below a 256 KiB low-water
mark. Suspending stops reading from the socket, which lets the OS
receive buffer fill and throttle the server via TCP flow control,
preserving the flow control that URLSession.AsyncBytes has natively.
Both suspend() and resume() are invoked while the delegate's NSLock
is held so the flag flip and task-state change are atomic.
Also adds the session: URLSession = .shared parameter to
streamPostWithRetry so future callers can pass a dedicated session
with proper lifecycle, matching stream(). Removes the unused
streamPost entry point (dead since #19442).
Co-Authored-By: ashlee@vellum.ai <ashlee@vellum.ai>
d5d07a7
ed0eb6e to d5d07a7
Re-approved ✅ (post-squash d5d07a78)
Squashed 7 commits into one clean commit. Diff is identical to the v7 I approved earlier. SafeAsyncBytes delegate-driven stream, ObjC exception trampoline, lock-serialized backpressure. No code changes.
@devin-ai review this PR
@codex review
Codex Review: Didn't find any major issues. Can't wait for the next one!
Replaces URLSession.bytes(for:) inside GatewayHTTPClient's streaming entry points (stream, streamPostWithRetry) with a URLSessionDataDelegate-driven SafeAsyncBytes where we own the dataTask(with:) call and wrap it in the existing VLMPerformWithObjCExceptionHandling ObjC @try/@catch trampoline. An invalidated session now surfaces a Swift URLError(.cancelled) instead of aborting the process with SIGABRT. Also adds a session: URLSession = .shared parameter to streamPostWithRetry so it matches stream() and can participate in the dedicated-session lifecycle pattern established in #25396, and deletes the zero-call-site streamPost non-retry variant (dead since #19442).
Why
The app crashes (LUM-1001) with NSGenericException: Task created in a session that has been invalidated at EventStreamClient.swift:313 → GatewayHTTPClient.swift:465. PR #25396 moved SSE to a dedicated URLSession per connection, and PR #25426 added a sseSession === session guard, but that guard runs on MainActor before the await GatewayHTTPClient.stream(...) call. Because the network layer is intentionally non-isolated (PR #21729 / LUM-492), that await hops off MainActor; while suspended, another MainActor turn (token rotation, explicit stopSSE + startSSE, reconnect) can invalidate the captured local session. When control resumes on the concurrent executor, URLSession.bytes(for:) synchronously calls -[NSURLSession dataTaskWithRequest:] on the now-invalidated session, which raises an ObjC exception Swift do/catch cannot intercept.
The only way to fully close this is to own the dataTask(with:) call site so it can live inside an ObjC @try/@catch, which is what this PR does.
Benefits
- … URLError(.cancelled) into the caller's existing error-handling code (EventStreamClient.startSSEStream → handleSSEDisconnect() → normal reconnect).
- … setResponseContinuation vs. delegate-completion race, and unbounded buffering for high-volume consumers.
- … .lines still works because SafeAsyncBytes: AsyncSequence where Element == UInt8.
Why it's safe
- … bytes.lines. The only observable difference is the return type (source-compatible for current usage) and a conservative TCP backpressure policy that only activates on >1 MiB of queued bytes.
- … URLError.
- … URLSession.bytes(for:) semantics. withTaskCancellationHandler propagates parent-task cancellation to URLSessionDataTask.cancel().
- … URLSessionTask.delegate requires iOS 15+ / macOS 12+; the project targets iOS 17 / macOS 15 (clients/Package.swift).
- … StreamingBytesDelegate.task is weak, and bytesConsumed captures [weak self].
What changed
- GatewayHTTPClient.safeBytes(session:request:): new private helper. Creates the data task inside VLMPerformWithObjCExceptionHandling (the ObjC trampoline in clients/shared/ObjCExceptionCatcher, already used for AVAudioNode.installTap). Attaches a per-task URLSessionDataDelegate that feeds an AsyncThrowingStream<Data, Error> of chunks. Resumes the task (also inside the trampoline, defensively) and awaits the initial URLResponse via a CheckedContinuation wrapped in withTaskCancellationHandler. On any failure, chunksContinuation.finish(throwing:) propagates the error to the iterator.
- GatewayHTTPClient.SafeAsyncBytes: new public AsyncSequence with Element == UInt8. API-compatible with URLSession.AsyncBytes for the surface callers actually use (.lines is an extension on AsyncSequence where Element == UInt8).
- GatewayHTTPClient.StreamingBytesDelegate: private URLSessionDataDelegate. One-shot response continuation serialised with NSLock. setResponseContinuation checks responseResolved first and resumes immediately with a stashed completionError if the delegate already completed. This closes the race between task.delegate = delegate and the delegate queue firing didCompleteWithError before safeBytes stores the continuation.
- … StreamingBytesDelegate tracks yielded-but-not-yet-consumed bytes; crossing a 1 MiB high-water mark calls URLSessionDataTask.suspend(), draining below a 256 KiB low-water mark calls resume(). Suspending stops reading from the socket, so the OS receive buffer fills and TCP flow control throttles the remote sender, matching the built-in buffering of URLSession.AsyncBytes. Both suspend() and resume() run while holding the delegate's NSLock, so the flag flip and task-state change are atomic; otherwise a fast consumer on another thread could clear suspendedForBackpressure and call resume() between the delegate's flag-set and suspend(), leaving the task suspended with the flag cleared and no code path to wake it up. Apple documents URLSessionTask.suspend() / resume() as thread-safe and they do not synchronously re-enter the delegate, so holding our lock across them is safe.
- stream() and streamPostWithRetry(): return type changes from URLSession.AsyncBytes to SafeAsyncBytes. streamPostWithRetry gains a session: URLSession = .shared parameter so callers can opt into dedicated sessions without touching current call sites.
- streamPost (non-retry variant): deleted. Zero call sites since #19442 (Move sendBtwMessage to BtwClient, add streamPost to GatewayHTTPClient); BtwClient (the only POST-SSE caller) uses streamPostWithRetry.
- EventStreamClient.startSSEStream: updated the block comment above the sseSession === session guard to note it's now a cheap MainActor-side early bailout, with GatewayHTTPClient.stream's ObjC trampoline as the authoritative defense.
Alternatives considered
- … stream() @MainActor: would serialise task creation with invalidation, but contradicts the deliberate non-isolation of the network layer (PR #21729, Move network layer off @MainActor isolation (LUM-492)) and would hop every non-SSE caller through MainActor.
- … dataTask(with:) inside @try/@catch, cancel the probe, then call bytes(for:) normally. Leaves a TOCTOU window between a successful probe and the real call. Rejected in favour of owning the real call site.
- … await session.bytes(for:) directly in @try/@catch: infeasible; ObjC exception handlers are synchronous and cannot wrap await.
- AsyncThrowingStream's bounded bufferingOldest(N) / bufferingNewest(N): both policies drop chunks on overflow, which would corrupt a byte-accurate stream. Rejected in favour of real TCP backpressure via task.suspend() / resume().
- … suspend() / resume() outside the lock: simpler but not atomic with the flag flip, which allowed the race flagged in review (clear-flag-then-resume interleaving with set-flag-then-suspend leaves the task suspended with the flag cleared). Rejected in favour of holding the lock across the task-state change.
Root cause analysis
- URLSession but left URLSession.bytes(for:) as the byte-stream primitive; PR fix(macos): guard SSE task against superseded session on MainActor race #25426 added a MainActor-side sseSession === session guard assuming it was sufficient. Neither accounted for the fact that URLSession.bytes(for:) synchronously calls -[NSURLSession dataTaskWithRequest:] from its non-isolated async body, leaving a window between the MainActor guard and task creation where another MainActor turn can invalidate the session.
- URLSession is "do not create tasks after invalidateAndCancel()" and enforcing that requires owning the dataTask(with:) call site.
- streamPost/streamPostWithRetry still used URLSession.shared, the same class of lifecycle bug in a sibling entry point. And the fact that Swift do/catch couldn't catch the crash in production was itself a signal that the exception was ObjC, not Swift.
- NSURLSessionTask on a session whose lifecycle we don't own, wrap the synchronous prefix in the ObjC trampoline or drive the task ourselves. Guards on MainActor alone are not sufficient for non-isolated callees.
- clients/AGENTS.md → "Concurrency and Task Management", add a short durable rule: "Swift's do/catch cannot intercept Objective-C NSExceptions. When a Foundation API is documented (or empirically known) to raise NSException on misuse, e.g. -[NSURLSession dataTaskWithRequest:] on an invalidated session, or AVAudioNode.installTap(onBus:bufferSize:format:) on format mismatch, wrap the call site in VLMPerformWithObjCExceptionHandling so the exception surfaces as a Swift-catchable NSError." Kept short so it doesn't drift.

Self-verified review items
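One race covered by the items in this section is the delegate completing before the waiter has stored its continuation. The following is a minimal sketch of that one-shot, lock-protected pattern, not the PR's StreamingBytesDelegate: the type OneShotResponse and its method names are hypothetical, and CancellationError stands in for the URLError(.cancelled) fallback mentioned in the description.

```swift
import Foundation

/// Hypothetical one-shot result slot: whichever of "producer resolves" and
/// "consumer starts waiting" happens first, the consumer resumes exactly once.
final class OneShotResponse<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<Value, Error>?
    private var stashed: Result<Value, Error>?
    private var resolved = false

    /// Producer side (for example, a delegate callback). First call wins.
    func resolve(_ result: Result<Value, Error>) {
        lock.lock()
        guard !resolved else { lock.unlock(); return }
        resolved = true
        let waiter = continuation
        continuation = nil
        if waiter == nil { stashed = result }  // nobody waiting yet: stash it
        lock.unlock()
        waiter?.resume(with: result)           // resume outside the lock
    }

    /// Consumer side: store the continuation, or resume at once if the
    /// producer already finished.
    func setContinuation(_ newContinuation: CheckedContinuation<Value, Error>) {
        lock.lock()
        if resolved {
            // Assumption: CancellationError is the fallback in this sketch.
            let result = stashed ?? .failure(CancellationError())
            lock.unlock()
            newContinuation.resume(with: result)
            return
        }
        continuation = newContinuation
        lock.unlock()
    }

    /// Suspends until the producer resolves, or returns immediately if it has.
    func value() async throws -> Value {
        try await withCheckedThrowingContinuation { setContinuation($0) }
    }
}
```

Checking the resolved flag under the same lock that guards the stored continuation is what prevents a lost wake-up: a completion that lands first is stashed, and a completion that lands second finds the continuation.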
Items below are verified from the diff / codebase and do not need a second human pass.
- clients/Package.swift declares .macOS("15.0") and .iOS(.v17); URLSessionTask.delegate requires iOS 15+ / macOS 12+.
- SafeAsyncBytes. rg '\.bytes\(for:' clients/**/*.swift returns zero matches. The three consumers (EventStreamClient.startSSEStream, BtwClient.streamBtw, TerminalAPIClient.subscribeEvents) all consume via bytes.lines, which resolves against AsyncSequence where Element == UInt8, satisfied by SafeAsyncBytes.
- session.dataTask(with:) remains on the streaming path. Only one call site; it's inside the VLMPerformWithObjCExceptionHandling trampoline in safeBytes.
- (a) chunksContinuation.onTermination → task.cancel(); (b) parent Swift task cancellation during header wait → withTaskCancellationHandler.onCancel → task.cancel(); (c) session invalidation → didCompleteWithError → chunksContinuation.finish(throwing:). All three paths converge on the iterator's thrown error.
- StreamingBytesDelegate.task is weak var; bytesConsumed captures [weak self].
- setResponseContinuation resumes immediately if responseResolved == true, using the stashed completionError (or URLError(.cancelled) as a fallback).
- dataTask.suspend() and task?.resume() execute while the delegate's NSLock is held; the flag and task-state change are atomic. Apple documents URLSessionTask.suspend()/resume() as thread-safe and they do not synchronously re-enter the delegate, so it is safe to hold our own lock across them.
- pendingBytes += data.count happens before yield(data), and the decrement uses max(0, pendingBytes - count).
- streamPost. rg 'streamPost\b' shows only streamPostWithRetry call sites remain (BtwClient is the sole caller; the defaulted session parameter keeps the signature source-compatible for that caller).
- AsyncBytes-only APIs (.task, delegate inspection); all three use .lines.

Remaining human-only checks
- swift/xcodebuild; CI is the pre-merge signal but required jobs are skipped under this repo's Devin-authored-branch policy, so local Xcode is the authoritative check.
- TerminalAPIClient.subscribeEvents under load. The 1 MiB / 256 KiB thresholds are chosen conservatively; the suspend/resume cadence on a realistic high-volume terminal session is worth a quick Instruments pass to confirm no pathological flapping.
- safeBytes throws → handleSSEDisconnect() → scheduleSSEReconnect() uses exponential backoff; worth eyeballing once locally to confirm it doesn't spin tight before the reconnect path replaces the session.

References
- URLSessionTask.delegate (per-task delegate, iOS 15+ / macOS 12+)
- URLSessionTask.suspend()/resume()/cancel(): documented thread-safe; suspend/resume is Apple's mechanism for TCP flow control on a data task.
- URLSessionDataDelegate
- withTaskCancellationHandler(operation:onCancel:)
- withCheckedThrowingContinuation(function:_:)
- AsyncSequence.lines
- do/catch cannot catch NSExceptions; an ObjC trampoline is the only option

Test plan
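The backpressure item in this test plan could be exercised without a network using a small model of the watermark bookkeeping. This is a sketch under stated assumptions, not the PR's StreamingBytesDelegate: the Suspendable protocol, BackpressureGate, and FakeTask are hypothetical stand-ins, and the strict > / < comparisons at the 1 MiB and 256 KiB marks are a guess at the exact boundary behaviour.

```swift
import Foundation

/// Hypothetical stand-in for the suspend/resume surface of URLSessionDataTask.
protocol Suspendable: AnyObject {
    func suspend()
    func resume()
}

/// Sketch of high/low-water-mark accounting. The flag flip and the task-state
/// change happen under one lock so they cannot interleave with the other side.
final class BackpressureGate {
    private let lock = NSLock()
    private let highWater: Int
    private let lowWater: Int
    private var pendingBytes = 0
    private var suspendedForBackpressure = false
    private weak var task: Suspendable?

    init(task: Suspendable, highWater: Int = 1024 * 1024, lowWater: Int = 256 * 1024) {
        self.task = task
        self.highWater = highWater
        self.lowWater = lowWater
    }

    /// Producer side: account for a chunk before yielding it downstream.
    func willYield(_ count: Int) {
        lock.lock()
        defer { lock.unlock() }
        pendingBytes += count
        if !suspendedForBackpressure && pendingBytes > highWater {
            suspendedForBackpressure = true
            task?.suspend()  // inside the lock: atomic with the flag
        }
    }

    /// Consumer side: report bytes that have been consumed.
    func didConsume(_ count: Int) {
        lock.lock()
        defer { lock.unlock() }
        pendingBytes = max(0, pendingBytes - count)  // never go negative
        if suspendedForBackpressure && pendingBytes < lowWater {
            suspendedForBackpressure = false
            task?.resume()   // inside the lock: atomic with the flag
        }
    }
}

/// Test double that records the calls it receives.
final class FakeTask: Suspendable {
    private(set) var events: [String] = []
    func suspend() { events.append("suspend") }
    func resume() { events.append("resume") }
}
```

Because the two thresholds differ, a consumer hovering near one mark does not toggle the task on every chunk; the gap between them is what the Instruments pass for flapping would be checking on a real session.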
- safeBytes drives the stream from delegate callbacks; callers use bytes.lines exactly as before.
- dataTask(with:) on an invalidated session throws URLError(.cancelled) into EventStreamClient.startSSEStream's existing do/catch → handleSSEDisconnect() → reconnect.
- chunksContinuation.onTermination cancels the underlying URLSessionDataTask.
- withTaskCancellationHandler.onCancel cancels the data task; didCompleteWithError(.cancelled) fires; the response continuation throws.
- setResponseContinuation resolves immediately against a stashed completionError if the delegate has already completed.
- URLSessionDataTask suspended above 1 MiB queued, resumed below 256 KiB. Suspension + chunksContinuation.onTermination → task.cancel() still tears down cleanly (cancel() overrides suspend() per URLSessionTask docs).

Link to Devin session: https://app.devin.ai/sessions/356bfc5ee25d4c0e9d49eefc818de35d
Requested by: @ashleeradka