-
Notifications
You must be signed in to change notification settings - Fork 4.6k
fix: SSE reconnect followup - reset refs, prevent duplication, sort active requests #7992
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b336664
4471aac
56ed38b
351e171
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -367,11 +367,17 @@ export function useChatStream({ | |
| // the full conversation history. Events are buffered in the meantime. | ||
| const pendingReattachRequestIdRef = useRef<string | null>(null); | ||
| const pendingReattachBufferRef = useRef<SessionEvent[]>([]); | ||
| // Suppress ActiveRequests reattach while reloading conversation after | ||
| // buffer overflow — the reload replaces messages and we don't want the | ||
| // SSE reconnect to reattach with stale state before that completes. | ||
| const reloadingConversationRef = useRef(false); | ||
| const namePollingRef = useRef<ReturnType<typeof setTimeout> | null>(null); | ||
|
|
||
| // Ref to access latest state in callbacks (avoids stale closures) | ||
| // Ref to access latest state/sessionId in callbacks (avoids stale closures) | ||
| const stateRef = useRef(state); | ||
| stateRef.current = state; | ||
| const sessionIdRef = useRef(sessionId); | ||
| sessionIdRef.current = sessionId; | ||
| const doReattachRef = useRef<((requestId: string, messages: Message[]) => void) | null>(null); | ||
|
|
||
| useEffect(() => { | ||
|
|
@@ -453,16 +459,53 @@ export function useChatStream({ | |
| // Reload the full conversation from the server, e.g. after the SSE | ||
| // stream indicates the client fell too far behind the replay buffer. | ||
| const reloadConversation = useCallback(() => { | ||
| // Clear active request state so the SSE reconnect doesn't reattach | ||
| // to the same request and replay buffered deltas on top of the | ||
| // freshly-loaded conversation. | ||
| if (activeUnsubscribeRef.current) { | ||
| activeUnsubscribeRef.current(); | ||
| activeUnsubscribeRef.current = null; | ||
| } | ||
| activeRequestIdRef.current = null; | ||
| activeRequestSessionIdRef.current = null; | ||
| pendingReattachRequestIdRef.current = null; | ||
| pendingReattachBufferRef.current = []; | ||
| // Suppress reattach until the reload completes | ||
| reloadingConversationRef.current = true; | ||
|
|
||
| // Capture the session ID so we can guard against session switches | ||
| // that happen while getSession is in flight. | ||
| const reloadSessionId = sessionId; | ||
|
|
||
| getSession({ | ||
| path: { session_id: sessionId }, | ||
| path: { session_id: reloadSessionId }, | ||
| throwOnError: true, | ||
| }).then((response) => { | ||
| // Session switched while we were reloading — discard stale result | ||
| if (reloadSessionId !== sessionIdRef.current) return; | ||
|
Comment on lines
+484
to
+485
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This guard only compares the stored session ID to Useful? React with 👍 / 👎. |
||
|
|
||
| const session = response.data as Session; | ||
| if (session?.conversation) { | ||
| dispatch({ type: 'SET_MESSAGES', payload: session.conversation }); | ||
| const messages = session?.conversation || []; | ||
| if (messages.length > 0) { | ||
| dispatch({ type: 'SET_MESSAGES', payload: messages }); | ||
| } | ||
| reloadingConversationRef.current = false; | ||
|
Comment on lines
+489
to
+492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Fresh evidence in this follow-up patch: Useful? React with 👍 / 👎. |
||
| // If ActiveRequests arrived during the reload, complete the | ||
| // deferred reattach now that we have fresh messages. | ||
| const pendingRequestId = pendingReattachRequestIdRef.current; | ||
| if (pendingRequestId) { | ||
| doReattachRef.current?.(pendingRequestId, messages); | ||
|
Comment on lines
+495
to
+497
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If the user switches from session A to session B while Useful? React with 👍 / 👎. |
||
| } | ||
| }).catch((e) => { | ||
| console.warn('Failed to reload conversation after buffer overflow:', e); | ||
| reloadingConversationRef.current = false; | ||
|
Comment on lines
499
to
+501
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
By the time execution reaches this Useful? React with 👍 / 👎. |
||
| // Best-effort recovery: if ActiveRequests arrived during the failed | ||
| // reload, reattach with current messages so the reply isn't lost. | ||
| if (reloadSessionId !== sessionIdRef.current) return; | ||
| const pendingRequestId = pendingReattachRequestIdRef.current; | ||
| if (pendingRequestId) { | ||
| doReattachRef.current?.(pendingRequestId, stateRef.current.messages); | ||
| } | ||
| }); | ||
| }, [sessionId]); | ||
|
|
||
|
|
@@ -540,15 +583,18 @@ export function useChatStream({ | |
| if (activeRequestIdRef.current) return; | ||
| if (requestIds.length === 0) return; | ||
|
|
||
| // Reattach to the first (most recent) active request. | ||
| // Reattach to the most recent active request (uuidv7 is time-ordered, | ||
| // so the lexicographically largest ID is the newest). | ||
| // Multiple concurrent requests per session aren't supported in the UI. | ||
| const requestId = requestIds[0]; | ||
| const sorted = [...requestIds].sort(); | ||
| const requestId = sorted[sorted.length - 1]; | ||
| const currentMessages = stateRef.current.messages; | ||
|
|
||
| if (currentMessages.length === 0) { | ||
| // Cold mount: resumeAgent hasn't populated messages yet. | ||
| // Defer event processing until session load completes so the | ||
| // processor starts with the full conversation history. | ||
| if (currentMessages.length === 0 || reloadingConversationRef.current) { | ||
| // Either cold mount (resumeAgent hasn't populated messages) or | ||
| // overflow reload (getSession is replacing stale messages). | ||
| // Defer event processing until the load completes so the | ||
| // processor starts with the correct conversation history. | ||
| // Register a buffering listener NOW so replayed events aren't | ||
| // lost while we wait. | ||
| pendingReattachRequestIdRef.current = requestId; | ||
|
|
@@ -652,6 +698,24 @@ export function useChatStream({ | |
| [addListener, onFinish, reloadConversation] | ||
| ); | ||
|
|
||
| // Reset request-tracking state when switching sessions so the previous | ||
| // session's in-flight request doesn't leak into the next one. | ||
| // This runs for ALL session changes (including cache hits). | ||
| useEffect(() => { | ||
| return () => { | ||
| if (activeUnsubscribeRef.current) { | ||
| activeUnsubscribeRef.current(); | ||
| activeUnsubscribeRef.current = null; | ||
| } | ||
| activeRequestIdRef.current = null; | ||
| activeRequestSessionIdRef.current = null; | ||
| activeAbortRef.current = null; | ||
| pendingReattachRequestIdRef.current = null; | ||
| pendingReattachBufferRef.current = []; | ||
| reloadingConversationRef.current = false; | ||
| }; | ||
| }, [sessionId]); | ||
|
|
||
| // Load session on mount or sessionId change | ||
| useEffect(() => { | ||
| if (!sessionId) return; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.