diff --git a/ui/desktop/src/hooks/useChatStream.ts b/ui/desktop/src/hooks/useChatStream.ts index 6a94b0933888..9c58aad291e5 100644 --- a/ui/desktop/src/hooks/useChatStream.ts +++ b/ui/desktop/src/hooks/useChatStream.ts @@ -367,11 +367,17 @@ export function useChatStream({ // the full conversation history. Events are buffered in the meantime. const pendingReattachRequestIdRef = useRef(null); const pendingReattachBufferRef = useRef([]); + // Suppress ActiveRequests reattach while reloading conversation after + // buffer overflow — the reload replaces messages and we don't want the + // SSE reconnect to reattach with stale state before that completes. + const reloadingConversationRef = useRef(false); const namePollingRef = useRef | null>(null); - // Ref to access latest state in callbacks (avoids stale closures) + // Ref to access latest state/sessionId in callbacks (avoids stale closures) const stateRef = useRef(state); stateRef.current = state; + const sessionIdRef = useRef(sessionId); + sessionIdRef.current = sessionId; const doReattachRef = useRef<((requestId: string, messages: Message[]) => void) | null>(null); useEffect(() => { @@ -453,16 +459,53 @@ export function useChatStream({ // Reload the full conversation from the server, e.g. after the SSE // stream indicates the client fell too far behind the replay buffer. const reloadConversation = useCallback(() => { + // Clear active request state so the SSE reconnect doesn't reattach + // to the same request and replay buffered deltas on top of the + // freshly-loaded conversation. + if (activeUnsubscribeRef.current) { + activeUnsubscribeRef.current(); + activeUnsubscribeRef.current = null; + } + activeRequestIdRef.current = null; + activeRequestSessionIdRef.current = null; + pendingReattachRequestIdRef.current = null; + pendingReattachBufferRef.current = []; + // Suppress reattach until the reload completes + reloadingConversationRef.current = true; + + // Capture the session ID so we can guard against session switches + // that happen while getSession is in flight. + const reloadSessionId = sessionId; + getSession({ - path: { session_id: sessionId }, + path: { session_id: reloadSessionId }, throwOnError: true, }).then((response) => { + // Session switched while we were reloading — discard stale result + if (reloadSessionId !== sessionIdRef.current) return; + const session = response.data as Session; - if (session?.conversation) { - dispatch({ type: 'SET_MESSAGES', payload: session.conversation }); + const messages = session?.conversation || []; + if (messages.length > 0) { + dispatch({ type: 'SET_MESSAGES', payload: messages }); + } + reloadingConversationRef.current = false; + // If ActiveRequests arrived during the reload, complete the + // deferred reattach now that we have fresh messages. + const pendingRequestId = pendingReattachRequestIdRef.current; + if (pendingRequestId) { + doReattachRef.current?.(pendingRequestId, messages); } }).catch((e) => { console.warn('Failed to reload conversation after buffer overflow:', e); + reloadingConversationRef.current = false; + // Best-effort recovery: if ActiveRequests arrived during the failed + // reload, reattach with current messages so the reply isn't lost. + if (reloadSessionId !== sessionIdRef.current) return; + const pendingRequestId = pendingReattachRequestIdRef.current; + if (pendingRequestId) { + doReattachRef.current?.(pendingRequestId, stateRef.current.messages); + } }); }, [sessionId]); @@ -540,15 +583,18 @@ export function useChatStream({ if (activeRequestIdRef.current) return; if (requestIds.length === 0) return; - // Reattach to the first (most recent) active request. + // Reattach to the most recent active request (uuidv7 is time-ordered, + // so the lexicographically largest ID is the newest). // Multiple concurrent requests per session aren't supported in the UI. - const requestId = requestIds[0]; + const sorted = [...requestIds].sort(); + const requestId = sorted[sorted.length - 1]; const currentMessages = stateRef.current.messages; - if (currentMessages.length === 0) { - // Cold mount: resumeAgent hasn't populated messages yet. - // Defer event processing until session load completes so the - // processor starts with the full conversation history. + if (currentMessages.length === 0 || reloadingConversationRef.current) { + // Either cold mount (resumeAgent hasn't populated messages) or + // overflow reload (getSession is replacing stale messages). + // Defer event processing until the load completes so the + // processor starts with the correct conversation history. // Register a buffering listener NOW so replayed events aren't // lost while we wait. pendingReattachRequestIdRef.current = requestId; @@ -652,6 +698,24 @@ export function useChatStream({ [addListener, onFinish, reloadConversation] ); + // Reset request-tracking state when switching sessions so the previous + // session's in-flight request doesn't leak into the next one. + // This runs for ALL session changes (including cache hits). + useEffect(() => { + return () => { + if (activeUnsubscribeRef.current) { + activeUnsubscribeRef.current(); + activeUnsubscribeRef.current = null; + } + activeRequestIdRef.current = null; + activeRequestSessionIdRef.current = null; + activeAbortRef.current = null; + pendingReattachRequestIdRef.current = null; + pendingReattachBufferRef.current = []; + reloadingConversationRef.current = false; + }; + }, [sessionId]); + // Load session on mount or sessionId change useEffect(() => { if (!sessionId) return;