feat(acp): Align to new request patterns of ACP Streamable HTTP/WS transport#8605
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2ecc9acdfd
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| for senders in listeners.values_mut() { | ||
| senders.retain(|tx| tx.send(msg.to_string()).is_ok()); |
There was a problem hiding this comment.
Scope GET stream broadcasts to the subscribed session
register_get_listener stores listeners under their Acp-Session-Id, but broadcast ignores that key and pushes every notification/request to every listener (listeners.values_mut()). When a single connection has multiple Goose sessions, a GET stream opened for session A will receive events from session B, which breaks the session-scoped contract and leaks cross-session traffic to the wrong client.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
I think this is the point, right? broadcast is for server events that are not tied to a session?
There was a problem hiding this comment.
yes, I will need to look at this and revise it! thank you
| } else { | ||
| (StatusCode::BAD_REQUEST, "Invalid JSON-RPC message").into_response() | ||
| info!( | ||
| method = method, | ||
| connection_id = %conn_id, | ||
| "ACP notification/response: {}", method | ||
| ); | ||
| handle_notification_or_response(state, &conn_id, &json_message).await |
There was a problem hiding this comment.
Reject malformed JSON-RPC payloads before forwarding
The else branch now forwards any non-request JSON payload to handle_notification_or_response, so objects that are neither JSON-RPC notifications nor responses (for example {} or {"foo":"bar"}) are accepted and return 202 instead of 400. This regresses input validation from the previous logic and can push invalid frames into the ACP connection, causing downstream protocol errors instead of a clear client-facing validation failure.
Useful? React with 👍 / 👎.
| .unwrap_or("<response>"); | ||
| let id = parsed.get("id").map(|id| id.to_string()); | ||
| if is_jsonrpc_response_or_error(&parsed) { | ||
| info!( |
There was a problem hiding this comment.
this seems a bit verbose to be at info
| for senders in listeners.values_mut() { | ||
| senders.retain(|tx| tx.send(msg.to_string()).is_ok()); |
There was a problem hiding this comment.
I think this is the point, right? broadcast is for server events that are not tied to a session?
2ecc9ac to
7232fdf
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7232fdfc85
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // Informational — the SDK tracks sessionId in the response body. | ||
| } | ||
|
|
||
| await consumeSSE(response, abortController.signal); |
There was a problem hiding this comment.
Do not await per-request SSE inside WritableStream.write
This await consumeSSE(...) makes each write() block until the request’s SSE stream finishes, but WritableStream processes writes sequentially. If the server sends a request that needs a client response while a prior request is still open (for example the permission request flow triggered via cx.send_request(...) in goose-acp), that response write is queued behind the blocked write, so the server waits for a response the client cannot send yet and the request can deadlock. Dispatch the request and consume SSE in a background task so write() returns promptly.
Useful? React with 👍 / 👎.
| value.get("id").map(|id| match id { | ||
| Value::String(s) => s.clone(), | ||
| Value::Number(n) => n.to_string(), | ||
| other => other.to_string(), |
There was a problem hiding this comment.
Preserve JSON-RPC id type when keying request streams
extract_jsonrpc_id collapses numeric and string IDs into the same string key (1 and "1" both become "1"). Since request streams are keyed by this value, concurrent in-flight requests that differ only by JSON-RPC id type can overwrite each other, causing responses to be routed to the wrong stream and leaving one caller hanging. Use a lossless key type (for example a typed enum or serde_json::Value) so numeric and string ids remain distinct.
Useful? React with 👍 / 👎.
dff4fa1 to
2fc7af6
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2fc7af6b51
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let mut response = sse_from_receiver(rx).into_response(); | ||
| response | ||
| .headers_mut() | ||
| .insert(HEADER_SESSION_ID, acp_session_id.parse().unwrap()); | ||
| .insert(HEADER_CONNECTION_ID, connection_id.parse().unwrap()); | ||
| response |
There was a problem hiding this comment.
Expire orphaned connections when clients drop without DELETE
After initialize, the handler returns an SSE response directly and leaves the new connection in HttpState until an explicit DELETE /acp; there is no automatic cleanup path for abrupt client exits (process crash, browser tab close, network loss). In those cases remove_connection is never called, so the per-connection agent/router tasks can accumulate and leak memory/resources over time. Please add server-side reaping (idle timeout or disconnect-based cleanup) for connections that never send DELETE.
Useful? React with 👍 / 👎.
| let mut messages = Vec::new(); | ||
| let mut session_id: Option<String> = None; | ||
|
|
||
| while let Some(msg) = rx.recv().await { | ||
| if session_id.is_none() { |
There was a problem hiding this comment.
Avoid buffering full session/load SSE before responding
build_session_creating_sse accumulates all messages in memory before constructing the SSE response, so session/load replays are not streamed incrementally and the first bytes are delayed until the channel closes. For large history replays this creates unnecessary latency and unbounded memory growth in the handler, and can trigger request timeouts under proxies. This path should stream progressively (or skip optional header extraction) instead of buffering the entire event sequence.
Useful? React with 👍 / 👎.
2fc7af6 to
0c8057b
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0c8057b1c5
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let rx = router.register_request(request_id).await; | ||
|
|
||
| let message_str = serde_json::to_string(json_message).unwrap(); | ||
| if let Err(status) = state.send_message(&acp_session_id, message_str).await { | ||
| if let Err(status) = state.send_to_agent(connection_id, message_str).await { | ||
| return status.into_response(); |
There was a problem hiding this comment.
Remove stale request streams when forwarding fails
handle_request inserts a per-request sender before attempting send_to_agent, but the error path returns immediately without deleting that entry. When the ACP task/channel is already closed, no response can ever arrive to trigger normal route() cleanup, so each retry on that connection leaks another request_streams entry and wastes memory. Remove the registered id on send failure (or defer registration until after a successful send).
Useful? React with 👍 / 👎.
| close() { | ||
| sseAbort.abort(); | ||
| // Terminate the connection. | ||
| if (connectionId) { | ||
| const headers: Record<string, string> = { | ||
| [ACP_CONNECTION_HEADER]: connectionId, |
There was a problem hiding this comment.
Cleanup ACP connection on WritableStream abort paths
The client only sends DELETE /acp from close(), but WritableStream error/abort paths do not call close (for example, a rejected write() or explicit writer.abort()). In those common failure scenarios, connectionId is never torn down and server-side connection state is left orphaned. Add an abort() sink handler (or shared finalizer) that performs the same best-effort DELETE cleanup.
Useful? React with 👍 / 👎.
Implements the 2026-04-23 revision of the Streamable HTTP & WebSocket transport RFD: agentclientprotocol/agent-client-protocol#1064 - Two-header identity model: Acp-Connection-Id (returned at initialize) and Acp-Session-Id (required on session-scoped POSTs). - Single long-lived GET SSE stream per connection carries all server→client messages (responses, notifications, and server-initiated requests), demuxed by client via sessionId in the JSON-RPC body. - POST initialize returns 200 OK with the initialize response JSON body and Acp-Connection-Id header. All other POSTs return 202 Accepted immediately; the agent's response is delivered on the GET stream. - Introduces a shared ConnectionRegistry used by both HTTP and WebSocket handlers, backed by a broadcast channel for fan-out to SSE/WS sinks. - WebSocket upgrade on the same /acp endpoint attaches to the same connection abstraction. - DELETE /acp tears down the connection and aborts the agent task. Signed-off-by: Alex Hancock <alexhancock@block.xyz>
0c8057b to
8a8e3f3
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8a8e3f324a
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // subscriber connects would otherwise be lost. Buffering here | ||
| // is a future improvement; for now we rely on the client | ||
| // opening its GET stream before issuing further requests. | ||
| let _ = outbound_tx.send(msg); |
There was a problem hiding this comment.
Buffer outbound messages before first GET subscriber
start_fanout drops every message when no subscriber is attached because broadcast::Sender::send returns Err in that case and the error is ignored. In the HTTP flow, fanout is started before the client can open GET /acp, so any agent notifications or server-initiated requests emitted in that window are permanently lost and can desynchronize the client. Delay fanout until a subscriber exists or add a replay buffer for pre-subscription messages.
Useful? React with 👍 / 👎.
| connectionId = connId; | ||
|
|
||
| const body = (await response.json()) as AnyMessage; | ||
| acpDebug("initialize response", body); | ||
| await openGetStream(); |
There was a problem hiding this comment.
Tear down connection when initialize GET setup fails
After a successful initialize POST, the client stores connectionId and then awaits openGetStream(). If opening the GET stream fails (e.g., network error or non-2xx), sendInitialize throws before any DELETE is sent; because this failure occurs in write(), close()/abort() are not guaranteed to run, so the server-side connection can be orphaned. Add failure cleanup (best-effort DELETE + clear connectionId) around openGetStream.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 562337f107
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| cancel() { | ||
| closed = true; | ||
| getStreamAbort?.abort(); | ||
| }, |
There was a problem hiding this comment.
Send DELETE when readable stream is canceled
The cancel() handler only aborts the GET SSE request, but it never calls sendDelete(), so teardown paths that cancel the readable side (for example reader cancellation on shutdown/error) leave the server connection allocated. In this transport, server-side cleanup is tied to DELETE /acp, so this path can orphan agent tasks and connection state until process-level cleanup.
Useful? React with 👍 / 👎.
562337f to
f16e328
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f16e3283fe
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| function waitForInbox(): Promise<void> { | ||
| if (inbox.length > 0) return Promise.resolve(); | ||
| return new Promise<void>((r) => { | ||
| pullResolve = r; | ||
| }); |
There was a problem hiding this comment.
Return immediately from waitForInbox after close
waitForInbox never checks closed, so if close()/abort()/cancel() runs when no pull is waiting, a later pull creates a new unresolved promise and blocks forever. In that state the readable side never reaches controller.close(), which can leave consumers waiting indefinitely for stream completion after shutdown. Add a closed short-circuit in waitForInbox (or before awaiting it in pull) so post-close pulls complete immediately.
Useful? React with 👍 / 👎.
|
ci failure is unrelated so i'll merge this |
* main: (29 commits) chore(deps): bump winreg from 0.55.0 to 0.56.0 (#8829) Fix grammar issue (#8669) colorize context window indicator (#8851) Refresh canonical model metadata from models.dev (#8838) fix(ci): prevent flaky smoke test timeouts from failing the build (#8837) updates: release 0.19.0 of the tui/sdk/etc (#8806) add a goose2 signed release flow (#8728) Port provider tests to typescript (#8237) refactor: make ACP server smaller (#8787) Add NVIDIA provider, and improve declarative provider UX (#8798) fix: removed failed provider test for deprecated providers (#8801) fix: only call cleanup when the pr is from same repo (#8799) chore: check stale for draft pr (#8803) fix: use _meta instead of meta in newSession request (#8796) fix: add missing underscore prefix in updateWorkingDir method name (#8743) feat: migrate session metadata storage from frontend overlay to backend (#8769) Add more info to BUILDING_LINUX (#8789) feat(acp): Align to new request patterns of ACP Streamable HTTP/WS transport (#8605) Dedupe and organize skills/sources (#8731) docs: add skills slash command (#8783) ...
Summary
goose-acpserver andui/sdkclient implementations in line with the latest revision of ACP RFD 721 (transports: Revisions on streamable-http/ws transport request patterns agentclientprotocol/agent-client-protocol#1064)goose servenow handles the request patterns describednpx @aaif/goose --server $URLnow sends the request patterns describedTesting
Manual usage
Related Issues
agentclientprotocol/agent-client-protocol#721
agentclientprotocol/agent-client-protocol#1064
#6642