From d77023a65d7c71de37649b8c4921f94314dfa5b2 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 13:07:33 +0100 Subject: [PATCH 01/17] create conversation with uuid if does not exist --- .../plugins/shared/agent_builder/server/routes/chat.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/platform/plugins/shared/agent_builder/server/routes/chat.ts b/x-pack/platform/plugins/shared/agent_builder/server/routes/chat.ts index 12c0583fc1b25..0342a265f6b26 100644 --- a/x-pack/platform/plugins/shared/agent_builder/server/routes/chat.ts +++ b/x-pack/platform/plugins/shared/agent_builder/server/routes/chat.ts @@ -6,6 +6,7 @@ */ import { omit } from 'lodash'; +import { validate as uuidValidate } from 'uuid'; import { schema } from '@kbn/config-schema'; import path from 'node:path'; import type { Observable } from 'rxjs'; @@ -73,6 +74,7 @@ export function registerChatRoutes({ ), conversation_id: schema.maybe( schema.string({ + validate: (v) => (uuidValidate(v) ? undefined : 'conversation_id must be a valid UUID'), meta: { description: 'Optional existing conversation ID to continue a previous conversation.', }, @@ -301,6 +303,7 @@ export function registerChatRoutes({ agentId, connectorId, conversationId, + autoCreateConversationWithId: true, capabilities, browserApiTools, configurationOverrides, From e07d49422b4b8fb8b5303d7949a1c8f75ced57f5 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 13:30:56 +0100 Subject: [PATCH 02/17] remove new cache key --- .../conversation_input/conversation_input.tsx | 6 +- .../round_response/chat_message_text.test.tsx | 2 - .../views/conversation_view/index.tsx | 3 +- .../legacy_conversation_redirect.tsx | 3 +- .../embeddable_conversations_provider.tsx | 8 - .../routed_conversations_provider.tsx | 44 +--- .../conversation/use_conversation_actions.ts | 106 ++------- .../send_message/send_message_context.tsx | 2 +- .../send_message/use_pending_message_state.ts | 18 +- .../send_message/use_resume_round_mutation.ts | 2 +- .../send_message/use_send_message_mutation.ts | 92 ++++--- .../use_subscribe_to_chat_events.ts | 225 +++++++++--------- .../application/hooks/use_conversation.ts | 9 +- .../application/hooks/use_initial_message.ts | 24 +- .../application/hooks/use_submit_message.ts | 58 +++++ .../public/application/utils/app_paths.ts | 5 +- .../application/utils/new_conversation.ts | 15 +- 17 files changed, 303 insertions(+), 319 deletions(-) create mode 100644 x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_submit_message.ts diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx index 63b21b222f443..8054371d7a687 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx @@ -18,6 +18,7 @@ import type { PropsWithChildren } from 'react'; import React, { useEffect, useMemo } from 'react'; import { useConversationId } from '../../../context/conversation/use_conversation_id'; import { useSendMessage } from '../../../context/send_message/send_message_context'; +import { useSubmitMessage } from '../../../hooks/use_submit_message'; import { useAgentBuilderAgents } from '../../../hooks/agents/use_agents'; import { useValidateAgentId } from '../../../hooks/agents/use_validate_agent_id'; import { useIsSendingMessage } from '../../../hooks/use_is_sending_message'; @@ -145,7 +146,7 @@ export const ConversationInput: React.FC = ({ onEditorFocus, }) => { const isSendingMessage = useIsSendingMessage(); - const { sendMessage, pendingMessage, error, isResuming } = useSendMessage(); + const { pendingMessage, error, isResuming } = useSendMessage(); const { isFetched } = useAgentBuilderAgents(); const agentId = useAgentId(); const conversationId = useConversationId(); @@ -158,6 +159,7 @@ export const ConversationInput: React.FC = ({ const isAwaitingPrompt = useIsAwaitingPrompt(); const { attachments, initialMessage, autoSendInitialMessage, resetInitialMessage } = useConversationContext(); + const submitMessage = useSubmitMessage(); const validateAgentId = useValidateAgentId(); const isAgentIdValid = validateAgentId(agentId); @@ -241,7 +243,7 @@ export const ConversationInput: React.FC = ({ } return; } - sendMessage({ message: content }); + submitMessage(content); messageEditorController.clear(); onSubmit?.(); }; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/chat_message_text.test.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/chat_message_text.test.tsx index cb9d32a352ecc..8b23e03fce8e4 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/chat_message_text.test.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/chat_message_text.test.tsx @@ -112,11 +112,9 @@ describe('chat_message_text', () => { isEmbeddedContext: false, browserApiTools: undefined, conversationActions: { - removeNewConversationQuery: jest.fn(), invalidateConversation: jest.fn(), addOptimisticRound: jest.fn(), removeOptimisticRound: jest.fn(), - setAgentId: jest.fn(), addReasoningStep: jest.fn(), addToolCall: jest.fn(), setToolCallProgress: jest.fn(), diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx index 237775470b0e3..5eb02c79a4949 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx @@ -23,7 +23,6 @@ import { css } from '@emotion/react'; import { i18n } from '@kbn/i18n'; import { agentBuilderDefaultAgentId } from '@kbn/agent-builder-common'; import { appPaths } from '../../../../../utils/app_paths'; -import { newConversationId } from '../../../../../utils/new_conversation'; import { getAgentIdFromPath, getAgentSettingsNavItems, @@ -83,7 +82,7 @@ export const ConversationSidebarView: React.FC = () => { const hasConversations = conversations.length > 0; const isNewConversationRoute = - conversationId === newConversationId || pathname === appPaths.agent.root({ agentId }); + conversationId === 'new' || pathname === appPaths.agent.root({ agentId }); const navItems = useMemo( () => getAgentSettingsNavItems(agentId, featureFlags), diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/redirects/legacy_conversation_redirect.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/redirects/legacy_conversation_redirect.tsx index 9b573c171f00a..0467a4b4cc8b4 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/redirects/legacy_conversation_redirect.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/redirects/legacy_conversation_redirect.tsx @@ -15,7 +15,6 @@ import { useQuery } from '@kbn/react-query'; import { useLastAgentId } from '../../hooks/use_last_agent_id'; import { useAgentBuilderServices } from '../../hooks/use_agent_builder_service'; import { appPaths } from '../../utils/app_paths'; -import { newConversationId } from '../../utils/new_conversation'; export const LegacyConversationRedirect: React.FC = () => { const { conversationId } = useParams<{ conversationId?: string }>(); @@ -23,7 +22,7 @@ export const LegacyConversationRedirect: React.FC = () => { const lastAgentId = useLastAgentId(); const { conversationsService } = useAgentBuilderServices(); - const isNewConversation = !conversationId || conversationId === newConversationId; + const isNewConversation = !conversationId || conversationId === 'new'; const { data: conversation, diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx index 16dd18dc41f14..5408bcd37cb4b 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx @@ -123,13 +123,6 @@ export const EmbeddableConversationsProvider: React.FC { - setConversationId(id); - }, - [setConversationId] - ); - const onDeleteConversation = useCallback(() => { setConversationId(undefined); }, [setConversationId]); @@ -147,7 +140,6 @@ export const EmbeddableConversationsProvider: React.FC(); const conversationId = useMemo(() => { - return conversationIdParam === newConversationId ? undefined : conversationIdParam; + return conversationIdParam === 'new' ? undefined : conversationIdParam; }, [conversationIdParam]); const agentIdFromPath = agentIdParam; @@ -45,43 +43,6 @@ export const RoutedConversationsProvider: React.FC { - return () => { - // On unmount disable conversation redirect - shouldAllowConversationRedirectRef.current = false; - }; - }, []); - - // Clear new conversation cache when agent changes to ensure fresh state - useEffect(() => { - if (!conversationId) { - queryClient.removeQueries({ queryKey: queryKeys.conversations.byId(newConversationId) }); - } - }, [agentIdFromPath, conversationId, queryClient]); - - const navigateToConversation = useCallback( - ({ nextConversationId }: { nextConversationId: string }) => { - // Navigate to the conversation if redirect is allowed - if (shouldAllowConversationRedirectRef.current && agentIdFromPath) { - const path = appPaths.agent.conversations.byId({ - agentId: agentIdFromPath, - conversationId: nextConversationId, - }); - const state = { shouldStickToBottom: false }; - navigateToAgentBuilderUrl(path, undefined, state); - } - }, - [shouldAllowConversationRedirectRef, navigateToAgentBuilderUrl, agentIdFromPath] - ); - - const onConversationCreated = useCallback( - ({ conversationId: id }: { conversationId: string }) => { - navigateToConversation({ nextConversationId: id }); - }, - [navigateToConversation] - ); const onDeleteConversation = useCallback( ({ isCurrentConversation }: { isCurrentConversation: boolean }) => { @@ -99,7 +60,6 @@ export const RoutedConversationsProvider: React.FC void; invalidateConversation: () => void; addOptimisticRound: ({ userMessage, attachments, + agentId, }: { userMessage: string; attachments?: AttachmentInput[]; + agentId: string; }) => void; removeOptimisticRound: () => void; clearLastRoundResponse: () => void; - setAgentId: (agentId: string) => void; addReasoningStep: ({ step }: { step: ReasoningStep }) => void; addToolCall: ({ step }: { step: ToolCallStep }) => void; setToolCallProgress: ({ @@ -71,13 +65,7 @@ export interface ConversationActions { setTimeToFirstToken: ({ timeToFirstToken }: { timeToFirstToken: number }) => void; addPendingPrompt: ({ prompt }: { prompt: PromptRequest }) => void; clearPendingPrompts: () => void; - onConversationCreated: ({ - conversationId, - title, - }: { - conversationId: string; - title: string; - }) => void; + onConversationCreated: ({ title }: { title: string }) => void; addBackgroundExecutionCompleteStep: ({ step }: { step: BackgroundAgentCompleteStep }) => void; addCompactionStep: ({ tokenCountBefore }: { tokenCountBefore: number }) => void; setCompactionStepComplete: ({ @@ -95,23 +83,16 @@ interface UseConversationActionsParams { conversationId?: string; queryClient: QueryClient; conversationsService: ConversationsService; - onConversationCreated?: (params: { conversationId: string; title: string }) => void; onDeleteConversation?: (params: { id: string; isCurrentConversation: boolean }) => void; } -interface CreateConversationActionsParams extends UseConversationActionsParams { - setAgentIdStorage: (value: string) => void; -} - -const createConversationActions = ({ +export const createConversationActions = ({ conversationId, queryClient, - setAgentIdStorage, conversationsService, - onConversationCreated, onDeleteConversation, -}: CreateConversationActionsParams): ConversationActions => { - const queryKey = queryKeys.conversations.byId(conversationId ?? newConversationId); +}: UseConversationActionsParams): ConversationActions => { + const queryKey = queryKeys.conversations.byId(conversationId ?? ''); const setConversation = (updater: (conversation?: Conversation) => Conversation) => { queryClient.setQueryData(queryKey, updater); }; @@ -127,9 +108,6 @@ const createConversationActions = ({ }; return { - removeNewConversationQuery: () => { - queryClient.removeQueries({ queryKey: queryKeys.conversations.byId(newConversationId) }); - }, invalidateConversation: () => { queryClient.invalidateQueries({ queryKey }); }, @@ -137,10 +115,15 @@ const createConversationActions = ({ addOptimisticRound: ({ userMessage, attachments, + agentId, }: { userMessage: string; attachments?: AttachmentInput[]; + agentId: string; }) => { + if (!conversationId) { + return; + } setConversation( produce((draft) => { const current = queryClient.getQueryData(queryKey); @@ -158,7 +141,7 @@ const createConversationActions = ({ } if (!draft) { - const newConversation = createNewConversation(); + const newConversation = createNewConversation({ id: conversationId, agentId }); newConversation.rounds.push(nextRound); return newConversation; } @@ -181,24 +164,6 @@ const createConversationActions = ({ round.status = ConversationRoundStatus.inProgress; }); }, - setAgentId: (agentId: string) => { - // We allow to change agent only at the start of the conversation - if (conversationId) { - return; - } - setConversation( - produce((draft) => { - if (!draft) { - const newConversation = createNewConversation(); - newConversation.agent_id = agentId; - return newConversation; - } - - draft.agent_id = agentId; - }) - ); - setAgentIdStorage(agentId); - }, addReasoningStep: ({ step }: { step: ReasoningStep }) => { setCurrentRound((round) => { round.steps.push(step); @@ -295,34 +260,15 @@ const createConversationActions = ({ round.status = ConversationRoundStatus.inProgress; }); }, - onConversationCreated: ({ - conversationId: id, - title, - }: { - conversationId: string; - title: string; - }) => { - const current = queryClient.getQueryData(queryKey); - if (!current) { - throw new Error('Conversation not created'); - } - - // Update individual conversation cache (with rounds) - queryClient.setQueryData( - queryKeys.conversations.byId(id), - produce(current, (draft) => { - draft.id = id; - draft.title = title; + onConversationCreated: ({ title }: { title: string }) => { + setConversation( + produce((draft) => { + if (draft) { + draft.title = title; + } }) ); - - // Invalidate conversation list to get updated data from server queryClient.invalidateQueries({ queryKey: queryKeys.conversations.all }); - - // Call provider-specific callback if provided - if (onConversationCreated) { - onConversationCreated({ conversationId: id, title }); - } }, deleteConversation: async (id: string) => { await conversationsService.delete({ conversationId: id }); @@ -363,29 +309,17 @@ export const useConversationActions = ({ conversationId, queryClient, conversationsService, - onConversationCreated, onDeleteConversation, }: UseConversationActionsParams): ConversationActions => { - const [, setAgentIdStorage] = useLocalStorage(storageKeys.agentId); - const conversationActions = useMemo( () => createConversationActions({ conversationId, queryClient, - setAgentIdStorage, conversationsService, - onConversationCreated, onDeleteConversation, }), - [ - conversationId, - queryClient, - setAgentIdStorage, - conversationsService, - onConversationCreated, - onDeleteConversation, - ] + [conversationId, queryClient, conversationsService, onDeleteConversation] ); return conversationActions; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx index fafe7c27e2f57..b0ad90aa7ffd1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx @@ -12,7 +12,7 @@ import { useResumeRoundMutation } from './use_resume_round_mutation'; import { useConnectorSelection } from '../../hooks/chat/use_connector_selection'; interface SendMessageState { - sendMessage: ({ message }: { message: string }) => void; + sendMessage: (params: { message: string; conversationId: string }) => void; isResponseLoading: boolean; pendingMessage: string | undefined; error: unknown; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts index e972218fb892c..6992e0290c45d 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts @@ -7,7 +7,6 @@ import produce from 'immer'; import { useState } from 'react'; -import { newConversationId } from '../../utils/new_conversation'; interface PendingMessageState { pendingMessage?: string; @@ -18,9 +17,8 @@ export const usePendingMessageState = ({ conversationId }: { conversationId?: st Record >({}); - const id = conversationId ?? newConversationId; - - const updateState = (updater: (c: PendingMessageState) => void) => { + const updateStateFor = (id: string | undefined, updater: (c: PendingMessageState) => void) => { + if (!id) return; setConversationIdToPendingMessageState( produce((draft) => { draft[id] ??= {}; @@ -29,17 +27,19 @@ export const usePendingMessageState = ({ conversationId }: { conversationId?: st ); }; - const pendingMessageState = conversationIdToPendingMessageState[id] ?? {}; + const pendingMessageState = conversationId + ? conversationIdToPendingMessageState[conversationId] ?? {} + : {}; return { pendingMessageState, - setPendingMessage: (pendingMessage: string) => { - updateState((state) => { + setPendingMessage: (pendingMessage: string, id: string | undefined = conversationId) => { + updateStateFor(id, (state) => { state.pendingMessage = pendingMessage; }); }, - removePendingMessage: () => { - updateState((state) => { + removePendingMessage: (id: string | undefined = conversationId) => { + updateStateFor(id, (state) => { delete state.pendingMessage; }); }, diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts index 1699352a56a98..9b59db5f8bcb0 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts @@ -66,7 +66,7 @@ export const useResumeRoundMutation = ({ connectorId }: UseResumeRoundMutationPr browserApiTools: browserApiToolsMetadata, }); - return subscribeToChatEvents(events$); + return subscribeToChatEvents(events$, conversationActions); }; const { mutate, isLoading } = useMutation({ diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts index 092762873cd6f..f54823f65330f 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { useMutation } from '@kbn/react-query'; +import { useMutation, useQueryClient } from '@kbn/react-query'; import { useRef, useState, useMemo, useCallback } from 'react'; import { toToolMetadata } from '@kbn/agent-builder-browser/tools/browser_api_tool'; import { firstValueFrom } from 'rxjs'; @@ -30,6 +30,7 @@ import { mutationKeys } from '../../mutation_keys'; import { usePendingMessageState } from './use_pending_message_state'; import { useSubscribeToChatEvents } from './use_subscribe_to_chat_events'; import { BrowserToolExecutor } from '../../services/browser_tool_executor'; +import { createConversationActions } from '../conversation/use_conversation_actions'; interface UseSendMessageMutationProps { connectorId?: string; @@ -38,6 +39,7 @@ interface UseSendMessageMutationProps { interface SendMessageParams { message?: string; action?: ConversationAction; + conversationId: string; } const SCREEN_CONTEXT_ATTACHMENT_ID = 'screen-context'; @@ -99,19 +101,36 @@ const withScreenContextAttachment = async ({ }; export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationProps = {}) => { - const { chatService } = useAgentBuilderServices(); + const { chatService, conversationsService } = useAgentBuilderServices(); const { services } = useKibana(); - const { conversationActions, attachments, resetAttachments, browserApiTools } = - useConversationContext(); + const { attachments, resetAttachments, browserApiTools } = useConversationContext(); + const queryClient = useQueryClient(); const [isResponseLoading, setIsResponseLoading] = useState(false); const [agentReasoning, setAgentReasoning] = useState(null); const conversationId = useConversationId(); const { conversation } = useConversation(); - const isMutatingNewConversationRef = useRef(false); const isRegeneratingRef = useRef(false); const agentId = useAgentId(); const messageControllerRef = useRef(null); + // Build actions bound to a specific conversation id. Mutation callbacks pass the id from vars + // so cache writes target the right key regardless of context. + // + // TODO: this per-call factory exists because one `useSendMessageMutation` instance serves + // multiple conversations during its lifetime (the SendMessageProvider doesn't unmount on + // navigation between /conversations/new and /conversations/). When the streaming state + // is lifted to an app-level provider keyed by conversation id (concurrent-streams PR), each + // conversation will own its own actions instance built once, and this builder goes away. + const buildActionsFor = useCallback( + (id: string) => + createConversationActions({ + conversationId: id, + queryClient, + conversationsService, + }), + [queryClient, conversationsService] + ); + const [error, setError] = useState(null); const [errorSteps, setErrorSteps] = useState([]); @@ -141,7 +160,11 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr browserToolExecutor, }); - const sendMessage = async ({ message, action }: SendMessageParams) => { + const sendMessage = async ({ + message, + action, + conversationId: targetConversationId, + }: SendMessageParams) => { const signal = messageControllerRef.current?.signal; const isRegenerate = action === 'regenerate'; if (!signal) { @@ -149,19 +172,15 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr } if (isRegenerate) { - if (!conversationId) { - return Promise.reject(new Error('Conversation ID is required to resend')); - } - const events$ = chatService.regenerate({ signal, - conversationId, + conversationId: targetConversationId, agentId, connectorId, browserApiTools: browserApiToolsMetadata, }); - return subscribeToChatEvents(events$); + return subscribeToChatEvents(events$, buildActionsFor(targetConversationId)); } // Normal send: requires a message @@ -177,50 +196,48 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr const events$ = chatService.chat({ signal, input: message, - conversationId, + conversationId: targetConversationId, agentId, connectorId, attachments: [...(attachments || []), ...contextAttachments], browserApiTools: browserApiToolsMetadata, }); - return subscribeToChatEvents(events$); + return subscribeToChatEvents(events$, buildActionsFor(targetConversationId)); }; const { mutate, isLoading } = useMutation({ mutationKey: mutationKeys.sendMessage, mutationFn: sendMessage, - onMutate: ({ message, action }) => { + onMutate: ({ message, action, conversationId: targetConversationId }) => { const isRegenerate = action === 'regenerate'; removeError(); messageControllerRef.current = new AbortController(); isRegeneratingRef.current = isRegenerate; + const conversationActions = buildActionsFor(targetConversationId); + if (isRegenerate) { // Clear the existing response immediately so UI shows empty state // This must happen before setIsResponseLoading triggers the streaming UI conversationActions.clearLastRoundResponse(); } else if (message) { - const isNewConversation = !conversationId; - isMutatingNewConversationRef.current = isNewConversation; - setPendingMessage(message); + if (!agentId) { + throw new Error('Agent id must be defined to send a message'); + } + setPendingMessage(message, targetConversationId); conversationActions.addOptimisticRound({ userMessage: message, attachments: attachments ?? [], + agentId, }); - if (isNewConversation) { - if (!agentId) { - throw new Error('Agent id must be defined for a new conversation'); - } - conversationActions.setAgentId(agentId); - } } else { throw new Error('Message is required'); } setIsResponseLoading(true); }, - onSettled: () => { - conversationActions.invalidateConversation(); + onSettled: (_data, _err, vars) => { + buildActionsFor(vars.conversationId).invalidateConversation(); messageControllerRef.current = null; setAgentReasoning(null); if (isResponseLoading) { @@ -228,15 +245,12 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr } isRegeneratingRef.current = false; }, - onSuccess: () => { + onSuccess: (_data, vars) => { if (isRegeneratingRef.current) return; - removePendingMessage(); + removePendingMessage(vars.conversationId); resetAttachments?.(); - if (isMutatingNewConversationRef.current) { - conversationActions.removeNewConversationQuery(); - } }, - onError: (err) => { + onError: (err, vars) => { setError(err); const steps = conversation?.rounds?.at(-1)?.steps; if (steps) { @@ -245,7 +259,7 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr if (isRegeneratingRef.current) return; // When we error, we should immediately remove the round rather than waiting for a refetch after invalidation // Otherwise, the error round and the optimistic round will be visible together. - conversationActions.removeOptimisticRound(); + buildActionsFor(vars.conversationId).removeOptimisticRound(); }, }); @@ -280,8 +294,11 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr // If we are in an error state, pending message will be present throw new Error('Pending message is not present'); } + if (!conversationId) { + throw new Error('Cannot retry without a conversation id'); + } - mutate({ message: pendingMessage }); + mutate({ message: pendingMessage, conversationId }); }, canCancel, cancel, @@ -299,7 +316,12 @@ export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationPr * Regenerate the last conversation round. * Uses the same mutation flow but with action=regenerate. */ - regenerate: () => mutate({ action: 'regenerate' }), + regenerate: () => { + if (!conversationId) { + throw new Error('Cannot regenerate without a conversation id'); + } + mutate({ action: 'regenerate', conversationId }); + }, isRegenerating: isLoading && isRegeneratingRef.current, removeError, }; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts index d1889b856a146..806003f09c8d8 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts @@ -31,6 +31,7 @@ import { finalize, type Observable, type Subscription } from 'rxjs'; import { isBrowserToolCallEvent } from '@kbn/agent-builder-common/chat/events'; import { useRef } from 'react'; import { useConversationContext } from '../conversation/conversation_context'; +import type { ConversationActions } from '../conversation/use_conversation_actions'; import type { BrowserToolExecutor } from '../../services/browser_tool_executor'; export const useSubscribeToChatEvents = ({ @@ -44,7 +45,7 @@ export const useSubscribeToChatEvents = ({ isAborted: () => boolean; browserToolExecutor?: BrowserToolExecutor; }) => { - const { conversationActions, browserApiTools } = useConversationContext(); + const { browserApiTools } = useConversationContext(); const unsubscribedRef = useRef(false); const subscriptionRef = useRef(null); @@ -53,120 +54,122 @@ export const useSubscribeToChatEvents = ({ subscriptionRef.current?.unsubscribe(); }; - const nextChatEvent = (event: ChatEvent) => { - // chunk received, we append it to the chunk buffer - if (isMessageChunkEvent(event)) { - conversationActions.addAssistantMessageChunk({ messageChunk: event.data.text_chunk }); - } - // full message received, override chunk buffer - else if (isMessageCompleteEvent(event)) { - conversationActions.setAssistantMessage({ - assistantMessage: event.data.message_content, - }); - } else if (isToolProgressEvent(event)) { - const isInternalProgress = event.data.metadata?.internal === 'true'; - conversationActions.setToolCallProgress({ - progress: { - message: event.data.message, - metadata: event.data.metadata ?? {}, - }, - toolCallId: event.data.tool_call_id, - }); - // Individual tool progression message should also be displayed as reasoning - // (but skip internal progress messages) - if (!isInternalProgress) { - setAgentReasoning(event.data.message); + const subscribeToChatEvents = ( + events$: Observable, + conversationActions: ConversationActions + ) => { + const nextChatEvent = (event: ChatEvent) => { + // chunk received, we append it to the chunk buffer + if (isMessageChunkEvent(event)) { + conversationActions.addAssistantMessageChunk({ messageChunk: event.data.text_chunk }); } - } else if (isReasoningEvent(event)) { - conversationActions.addReasoningStep({ - step: createReasoningStep({ - reasoning: event.data.reasoning, - transient: event.data.transient, - tool_call_id: event.data.tool_call_id, - tool_call_group_id: event.data.tool_call_group_id, - }), - }); - setAgentReasoning(event.data.reasoning); - } else if (isToolCallEvent(event)) { - conversationActions.addToolCall({ - step: createToolCallStep({ - params: event.data.params, - results: [], - tool_call_id: event.data.tool_call_id, - tool_id: event.data.tool_id, - tool_call_group_id: event.data.tool_call_group_id, - tool_origin: event.data.tool_origin, - }), - }); - } else if (isBrowserToolCallEvent(event)) { - // Check if this is a browser tool call and execute it immediately - const toolId = event.data.tool_id; - if (toolId && browserToolExecutor && browserApiTools) { - const toolDef = browserApiTools.find((tool) => tool.id === toolId); - if (toolDef) { - const toolsMap = new Map([[toolId, toolDef]]); - browserToolExecutor - .executeToolCalls( - [ - { - tool_id: toolId, - call_id: event.data.tool_call_id, - params: event.data.params, - timestamp: Date.now(), - }, - ], - toolsMap - ) - .catch((error) => { - // eslint-disable-next-line no-console - console.error('Failed to execute browser tool:', error); - }); + // full message received, override chunk buffer + else if (isMessageCompleteEvent(event)) { + conversationActions.setAssistantMessage({ + assistantMessage: event.data.message_content, + }); + } else if (isToolProgressEvent(event)) { + const isInternalProgress = event.data.metadata?.internal === 'true'; + conversationActions.setToolCallProgress({ + progress: { + message: event.data.message, + metadata: event.data.metadata ?? {}, + }, + toolCallId: event.data.tool_call_id, + }); + // Individual tool progression message should also be displayed as reasoning + // (but skip internal progress messages) + if (!isInternalProgress) { + setAgentReasoning(event.data.message); } + } else if (isReasoningEvent(event)) { + conversationActions.addReasoningStep({ + step: createReasoningStep({ + reasoning: event.data.reasoning, + transient: event.data.transient, + tool_call_id: event.data.tool_call_id, + tool_call_group_id: event.data.tool_call_group_id, + }), + }); + setAgentReasoning(event.data.reasoning); + } else if (isToolCallEvent(event)) { + conversationActions.addToolCall({ + step: createToolCallStep({ + params: event.data.params, + results: [], + tool_call_id: event.data.tool_call_id, + tool_id: event.data.tool_id, + tool_call_group_id: event.data.tool_call_group_id, + tool_origin: event.data.tool_origin, + }), + }); + } else if (isBrowserToolCallEvent(event)) { + // Check if this is a browser tool call and execute it immediately + const toolId = event.data.tool_id; + if (toolId && browserToolExecutor && browserApiTools) { + const toolDef = browserApiTools.find((tool) => tool.id === toolId); + if (toolDef) { + const toolsMap = new Map([[toolId, toolDef]]); + browserToolExecutor + .executeToolCalls( + [ + { + tool_id: toolId, + call_id: event.data.tool_call_id, + params: event.data.params, + timestamp: Date.now(), + }, + ], + toolsMap + ) + .catch((error) => { + // eslint-disable-next-line no-console + console.error('Failed to execute browser tool:', error); + }); + } + } + } else if (isToolResultEvent(event)) { + const { tool_call_id: toolCallId, results } = event.data; + conversationActions.setToolCallResult({ results, toolCallId }); + } else if (isRoundCompleteEvent(event)) { + // Now we have the full response and can stop the loading indicators + setIsResponseLoading(false); + } else if (isConversationCreatedEvent(event)) { + conversationActions.onConversationCreated({ title: event.data.title }); + } else if (isThinkingCompleteEvent(event)) { + conversationActions.setTimeToFirstToken({ + timeToFirstToken: event.data.time_to_first_token, + }); + } else if (isPromptRequestEvent(event)) { + conversationActions.addPendingPrompt({ + prompt: event.data.prompt, + }); + // Stop loading when a prompt is requested - the round is now awaiting user input + setIsResponseLoading(false); + } else if (isCompactionStartedEvent(event)) { + conversationActions.addCompactionStep({ + tokenCountBefore: event.data.token_count_before, + }); + setAgentReasoning( + i18n.translate('xpack.agentBuilder.chatEvents.compactionStarted', { + defaultMessage: 'Compacting conversation context', + }) + ); + } else if (isCompactionCompletedEvent(event)) { + conversationActions.setCompactionStepComplete({ + tokenCountAfter: event.data.token_count_after, + summarizedRoundCount: event.data.summarized_round_count, + }); + } else if (isBackgroundAgentCompleteEvent(event)) { + conversationActions.addBackgroundExecutionCompleteStep({ + step: { + type: ConversationRoundStepType.backgroundAgentComplete, + ...event.data.execution, + }, + }); } - } else if (isToolResultEvent(event)) { - const { tool_call_id: toolCallId, results } = event.data; - conversationActions.setToolCallResult({ results, toolCallId }); - } else if (isRoundCompleteEvent(event)) { - // Now we have the full response and can stop the loading indicators - setIsResponseLoading(false); - } else if (isConversationCreatedEvent(event)) { - const { conversation_id: id, title } = event.data; - conversationActions.onConversationCreated({ conversationId: id, title }); - } else if (isThinkingCompleteEvent(event)) { - conversationActions.setTimeToFirstToken({ - timeToFirstToken: event.data.time_to_first_token, - }); - } else if (isPromptRequestEvent(event)) { - conversationActions.addPendingPrompt({ - prompt: event.data.prompt, - }); - // Stop loading when a prompt is requested - the round is now awaiting user input - setIsResponseLoading(false); - } else if (isCompactionStartedEvent(event)) { - conversationActions.addCompactionStep({ - tokenCountBefore: event.data.token_count_before, - }); - setAgentReasoning( - i18n.translate('xpack.agentBuilder.chatEvents.compactionStarted', { - defaultMessage: 'Compacting conversation context', - }) - ); - } else if (isCompactionCompletedEvent(event)) { - conversationActions.setCompactionStepComplete({ - tokenCountAfter: event.data.token_count_after, - summarizedRoundCount: event.data.summarized_round_count, - }); - } else if (isBackgroundAgentCompleteEvent(event)) { - conversationActions.addBackgroundExecutionCompleteStep({ - step: { - type: ConversationRoundStepType.backgroundAgentComplete, - ...event.data.execution, - }, - }); - } - }; + }; - const subscribeToChatEvents = (events$: Observable) => { return new Promise((resolve, reject) => { if (unsubscribedRef.current) { resolve(); diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts index 663346cfc8b09..88c382b2832dd 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts @@ -12,7 +12,7 @@ import { agentBuilderDefaultAgentId, ConversationRoundStatus } from '@kbn/agent- import type { IHttpFetchError } from '@kbn/core-http-browser'; import type { ErrorPromptType } from '../components/common/prompt/error_prompt'; import { queryKeys } from '../query_keys'; -import { newConversationId, createNewRound } from '../utils/new_conversation'; +import { createNewRound } from '../utils/new_conversation'; import { useConversationId } from '../context/conversation/use_conversation_id'; import { useIsSendingMessage } from './use_is_sending_message'; import { useAgentBuilderServices } from './use_agent_builder_service'; @@ -24,7 +24,7 @@ import { useConversationContext } from '../context/conversation/conversation_con export const useConversation = () => { const conversationId = useConversationId(); const { conversationsService } = useAgentBuilderServices(); - const queryKey = queryKeys.conversations.byId(conversationId ?? newConversationId); + const queryKey = queryKeys.conversations.byId(conversationId ?? ''); const isSendingMessage = useIsSendingMessage(); const { @@ -36,8 +36,9 @@ export const useConversation = () => { error, } = useQuery({ queryKey, - // Disable query if we are on a new conversation or if there is a message currently being sent - // Otherwise a refetch will overwrite our optimistic updates + // Disable query when there's no conversationId yet (entry "new" state) or while a message is + // streaming (a refetch would overwrite optimistic updates). When the mutation finishes, + // `onSettled` invalidates this query and the gate flips open, triggering the GET. enabled: Boolean(conversationId) && !isSendingMessage, queryFn: () => { if (!conversationId) { diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_initial_message.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_initial_message.ts index fcefe63cc7eb6..1f3908f3bab47 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_initial_message.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_initial_message.ts @@ -8,21 +8,35 @@ import { useEffect } from 'react'; import { useConversationContext } from '../context/conversation/conversation_context'; import { useConversationId } from '../context/conversation/use_conversation_id'; -import { useSendMessage } from '../context/send_message/send_message_context'; +import { useSubmitMessage } from './use_submit_message'; +/** + * Auto-send an initial message when one is provided via location state (workplace_ai_app + * deep-link -> `/conversations/new`) or via embeddable props (host opens with `initialMessage` + + * `autoSendInitialMessage: true`). Only fires when there is no `conversationId` yet — a refresh + * on `/conversations/` shouldn't replay the original message. + * + * Routes through `useSubmitMessage` so deep-link sends take the same UUID-generation + + * navigation path as a user-typed submit. + */ export const useSendPredefinedInitialMessage = () => { const { initialMessage, autoSendInitialMessage, resetInitialMessage } = useConversationContext(); const conversationId = useConversationId(); - const { sendMessage } = useSendMessage(); - + const submitMessage = useSubmitMessage(); const isNewConversation = !conversationId; useEffect(() => { if (initialMessage && isNewConversation && autoSendInitialMessage) { - sendMessage({ message: initialMessage }); + submitMessage(initialMessage); resetInitialMessage?.(); } - }, [initialMessage, autoSendInitialMessage, isNewConversation, sendMessage, resetInitialMessage]); + }, [ + initialMessage, + autoSendInitialMessage, + isNewConversation, + submitMessage, + resetInitialMessage, + ]); return null; }; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_submit_message.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_submit_message.ts new file mode 100644 index 0000000000000..870921eaf3cf6 --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_submit_message.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { useCallback } from 'react'; +import { v4 as uuidv4 } from 'uuid'; +import { useConversationContext } from '../context/conversation/conversation_context'; +import { useConversationId } from '../context/conversation/use_conversation_id'; +import { useSendMessage } from '../context/send_message/send_message_context'; +import { useNavigation } from './use_navigation'; +import { appPaths } from '../utils/app_paths'; + +/** + * Single source of truth for "send this message". The conversationId is always passed explicitly + * to the mutation — for an existing conversation it's just the current id, for a new conversation + * it's a freshly minted UUID. The mutation never reads conversationId from context closure. + * + * For new conversations we also need to transition the user to the new id — that means a URL + * navigation in the routed app, or an internal state update in the embeddable. We branch on + * `isEmbeddedContext` rather than asking each provider to expose its own helper. + */ +export const useSubmitMessage = () => { + const conversationId = useConversationId(); + const { sendMessage } = useSendMessage(); + const { isEmbeddedContext, setConversationId, agentId } = useConversationContext(); + const { navigateToAgentBuilderUrl } = useNavigation(); + + return useCallback( + (message: string) => { + const isNew = !conversationId; + const targetId = conversationId ?? uuidv4(); + + sendMessage({ message, conversationId: targetId }); + + if (!isNew) return; + + // navigate only for new conversations, not for continued conversations + if (isEmbeddedContext) { + setConversationId?.(targetId); + } else if (agentId) { + navigateToAgentBuilderUrl( + appPaths.agent.conversations.byId({ agentId, conversationId: targetId }) + ); + } + }, + [ + conversationId, + sendMessage, + isEmbeddedContext, + setConversationId, + agentId, + navigateToAgentBuilderUrl, + ] + ); +}; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/utils/app_paths.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/utils/app_paths.ts index d6c88ef1f4e43..5475a4bb64c86 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/utils/app_paths.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/utils/app_paths.ts @@ -5,8 +5,6 @@ * 2.0. */ -import { newConversationId } from './new_conversation'; - export const appPaths = { root: '/', @@ -14,8 +12,7 @@ export const appPaths = { agent: { root: ({ agentId }: { agentId: string }) => `/agents/${agentId}`, conversations: { - new: ({ agentId }: { agentId: string }) => - `/agents/${agentId}/conversations/${newConversationId}`, + new: ({ agentId }: { agentId: string }) => `/agents/${agentId}/conversations/new`, byId: ({ agentId, conversationId }: { agentId: string; conversationId: string }) => `/agents/${agentId}/conversations/${conversationId}`, }, diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/utils/new_conversation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/utils/new_conversation.ts index 80baa9f8d2fb3..0df3b7d3090e1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/utils/new_conversation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/utils/new_conversation.ts @@ -10,15 +10,20 @@ import type { ConversationRound, ConversationRoundStep, } from '@kbn/agent-builder-common'; -import { ConversationRoundStatus, agentBuilderDefaultAgentId } from '@kbn/agent-builder-common'; +import { ConversationRoundStatus } from '@kbn/agent-builder-common'; import type { Attachment } from '@kbn/agent-builder-common/attachments'; -export const newConversationId = 'new'; -export const createNewConversation = (): Conversation => { +export const createNewConversation = ({ + id, + agentId, +}: { + id: string; + agentId: string; +}): Conversation => { const now = new Date().toISOString(); return { - id: newConversationId, - agent_id: agentBuilderDefaultAgentId, + id, + agent_id: agentId, user: { id: '', username: '' }, title: '', created_at: now, From 4619e638f563d19cbdf63e7dd81e6cefb6e1cbc6 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 16:44:41 +0100 Subject: [PATCH 03/17] lift streaming provider to global not just conversation scoped --- .../conversation/embeddable_conversations_provider.tsx | 10 ++++++---- .../shared/agent_builder/public/application/mount.tsx | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx index 5408bcd37cb4b..0174dfad3a57e 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/conversation/embeddable_conversations_provider.tsx @@ -221,10 +221,12 @@ export const EmbeddableConversationsProvider: React.FC - - - {children} - + + + + {children} + + diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/mount.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/mount.tsx index 7deafd155dc83..5dc12ae25404d 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/mount.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/mount.tsx @@ -19,6 +19,7 @@ import type { AgentBuilderStartDependencies } from '../types'; import { AgentBuilderServicesContext } from './context/agent_builder_services_context'; import { PageWrapper } from './page_wrapper'; import { AppLeaveContext, type OnAppLeave } from './context/app_leave_context'; +import { SendMessageProvider } from './context/send_message/send_message_context'; export const mountApp = async ({ core, @@ -52,7 +53,9 @@ export const mountApp = async ({ - + + + From aab405cb80427e2256483592628b554e81dc9ee6 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 16:47:34 +0100 Subject: [PATCH 04/17] fix error handling from sidebar and everywhere --- .../conversations/conversations_view.tsx | 36 +++++-------------- .../agents_popover_view.tsx | 6 ++-- .../conversations_popover_view.tsx | 6 ++-- .../embeddable_conversation_list.tsx | 6 ++-- .../views/conversation_view/index.tsx | 9 +++++ 5 files changed, 26 insertions(+), 37 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversations_view.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversations_view.tsx index ed152159d4301..5f0caaee6b76b 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversations_view.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversations_view.tsx @@ -7,29 +7,12 @@ import { useEuiTheme } from '@elastic/eui'; import { css } from '@emotion/react'; -import React, { useEffect } from 'react'; -import { useLocation } from 'react-router-dom'; +import React from 'react'; import { Conversation } from './conversation'; import { ConversationHeader } from './conversation_header/conversation_header'; import { RoutedConversationsProvider } from '../../context/conversation/routed_conversations_provider'; -import { - SendMessageProvider, - useSendMessage, -} from '../../context/send_message/send_message_context'; import { conversationBackgroundStyles, headerHeight } from './conversation.styles'; -// Clears error state on every navigation. Rendered inside SendMessageProvider (for context access) -// and inside the Router (for useLocation access), so it's intentionally placed in the routed view -// only — the embeddable/sidebar context has no navigation and doesn't need this behavior. -const LocationErrorClearer: React.FC<{}> = () => { - const { key: locationKey } = useLocation(); - const { removeError } = useSendMessage(); - useEffect(() => { - removeError(); - }, [locationKey, removeError]); - return null; -}; - export const AgentBuilderConversationsView: React.FC<{}> = () => { const { euiTheme } = useEuiTheme(); @@ -60,17 +43,14 @@ export const AgentBuilderConversationsView: React.FC<{}> = () => { return ( - - -
-
- -
-
- -
+
+
+ +
+
+
- +
); }; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/agents_popover_view.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/agents_popover_view.tsx index 8a9951c59034b..973c056b50fa8 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/agents_popover_view.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/agents_popover_view.tsx @@ -20,7 +20,7 @@ import { css } from '@emotion/react'; import { i18n } from '@kbn/i18n'; import type { AgentDefinition } from '@kbn/agent-builder-common'; import { useConversationContext } from '../../../context/conversation/conversation_context'; -import { useSendMessage } from '../../../context/send_message/send_message_context'; +import { useSendMessageContext } from '../../../context/send_message/send_message_context'; import { useAgentBuilderAgents } from '../../../hooks/agents/use_agents'; import { useNavigation } from '../../../hooks/use_navigation'; import { appPaths } from '../../../utils/app_paths'; @@ -56,7 +56,7 @@ export const AgentsPopoverView: React.FC = ({ }) => { const { euiTheme } = useEuiTheme(); const { agentId, setAgentId } = useConversationContext(); - const { removeError } = useSendMessage(); + const { removeAllErrors } = useSendMessageContext(); const { agents } = useAgentBuilderAgents(); const { agentOptions, renderAgentOption } = useAgentOptions({ agents, selectedAgentId: agentId }); @@ -79,7 +79,7 @@ export const AgentsPopoverView: React.FC = ({ ) => { const { checked, key: newAgentId } = changedOption; if (checked === 'on' && newAgentId) { - removeError(); + removeAllErrors(); setAgentId?.(newAgentId); onClose(); } diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/conversations_popover_view.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/conversations_popover_view.tsx index 1c0112495351b..355f04b75dc41 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/conversations_popover_view.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/conversations_popover_view.tsx @@ -19,7 +19,7 @@ import { import { css } from '@emotion/react'; import { i18n } from '@kbn/i18n'; import { useConversationContext } from '../../../context/conversation/conversation_context'; -import { useSendMessage } from '../../../context/send_message/send_message_context'; +import { useSendMessageContext } from '../../../context/send_message/send_message_context'; import { useAgentBuilderAgents } from '../../../hooks/agents/use_agents'; import { useAgentId } from '../../../hooks/use_conversation'; import { AgentAvatar } from '../../common/agent_avatar'; @@ -56,14 +56,14 @@ export const ConversationsPopoverView: React.FC = const { euiTheme } = useEuiTheme(); const { setConversationId } = useConversationContext(); - const { removeError } = useSendMessage(); + const { removeAllErrors } = useSendMessageContext(); const { agents } = useAgentBuilderAgents(); const agentId = useAgentId(); const currentAgent = agents.find((a) => a.id === agentId); const handleNewChat = () => { - removeError(); + removeAllErrors(); setConversationId?.(undefined); onClose(); }; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/embeddable_conversation_list.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/embeddable_conversation_list.tsx index b56f89dd64819..6b8f18db179b6 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/embeddable_conversation_list.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/embeddable_conversation_header/embeddable_conversation_list.tsx @@ -15,7 +15,7 @@ import { } from '@elastic/eui'; import { css } from '@emotion/react'; import { useConversationContext } from '../../../context/conversation/conversation_context'; -import { useSendMessage } from '../../../context/send_message/send_message_context'; +import { useSendMessageContext } from '../../../context/send_message/send_message_context'; import { useConversationList } from '../../../hooks/use_conversation_list'; import { createConversationListItemStyles, @@ -34,7 +34,7 @@ export const EmbeddableConversationList: React.FC { const { euiTheme } = useEuiTheme(); const { agentId, conversationId, setConversationId } = useConversationContext(); - const { removeError } = useSendMessage(); + const { removeAllErrors } = useSendMessageContext(); const { conversations = [], isLoading } = useConversationList({ agentId }); const sortedConversations = useMemo( @@ -83,7 +83,7 @@ export const EmbeddableConversationList: React.FC { - removeError(); + removeAllErrors(); setConversationId?.(conversation.id); onClose(); }} diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx index 5eb02c79a4949..a94cf367c9c16 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/views/conversation_view/index.tsx @@ -34,6 +34,7 @@ import { useValidateAgentId } from '../../../../../hooks/agents/use_validate_age import { useAgentBuilderAgents } from '../../../../../hooks/agents/use_agents'; import { useLastAgentId } from '../../../../../hooks/use_last_agent_id'; import { useConversationList } from '../../../../../hooks/use_conversation_list'; +import { useSendMessageContext } from '../../../../../context/send_message/send_message_context'; import { SidebarNavList } from '../../shared/sidebar_nav_list'; import { ConversationFooter } from './conversation_footer'; @@ -80,6 +81,7 @@ export const ConversationSidebarView: React.FC = () => { const { conversations = [] } = useConversationList({ agentId }); const hasConversations = conversations.length > 0; + const { removeAllErrors } = useSendMessageContext(); const isNewConversationRoute = conversationId === 'new' || pathname === appPaths.agent.root({ agentId }); @@ -127,9 +129,14 @@ export const ConversationSidebarView: React.FC = () => { ]); const handlePressNewConversation = () => { + removeAllErrors(); navigateToAgentBuilderUrl(appPaths.agent.conversations.new({ agentId })); }; + const handleConversationItemClick = () => { + removeAllErrors(); + }; + return ( { agentId={agentId} currentConversationId={conversationId} isNewConversationRoute={isNewConversationRoute} + onItemClick={handleConversationItemClick} /> @@ -253,6 +261,7 @@ export const ConversationSidebarView: React.FC = () => { currentConversationId={conversationId} onClose={() => setIsSearchModalOpen(false)} onSelectConversation={(id) => { + removeAllErrors(); navigateToAgentBuilderUrl( appPaths.agent.conversations.byId({ agentId, conversationId: id }) ); From 66f547a3b927d2fdc56e732fcd66c643c94929dd Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 16:49:15 +0100 Subject: [PATCH 05/17] introduce and name properly is any conversation streaming --- .../components/conversations/conversation.tsx | 5 ++++- .../conversation_input/conversation_input.tsx | 6 ++++-- .../conversation_rounds/round_layout.tsx | 8 ++++++++ ...essage.ts => use_is_any_conversation_streaming.ts} | 11 ++++++++++- .../public/embeddable/embeddable_welcome_message.tsx | 8 ++++---- 5 files changed, 30 insertions(+), 8 deletions(-) rename x-pack/platform/plugins/shared/agent_builder/public/application/hooks/{use_is_sending_message.ts => use_is_any_conversation_streaming.ts} (52%) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation.tsx index 2c20ec10a54ca..e95fe56bfe1f8 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation.tsx @@ -26,6 +26,7 @@ import { NewConversationPrompt } from './new_conversation_prompt'; import { useConversationId } from '../../context/conversation/use_conversation_id'; import { useShouldStickToBottom } from '../../context/conversation/use_should_stick_to_bottom'; import { useSendMessage } from '../../context/send_message/send_message_context'; +import { useIsAnyConversationStreaming } from '../../hooks/use_is_any_conversation_streaming'; import { useConversationScrollActions } from '../../hooks/use_conversation_scroll_actions'; import { useConversationStatus } from '../../hooks/use_conversation'; import { useSendPredefinedInitialMessage } from '../../hooks/use_initial_message'; @@ -53,6 +54,7 @@ export const Conversation: React.FC<{}> = () => { const conversationId = useConversationId(); const hasActiveConversation = useHasActiveConversation(); const { isResponseLoading } = useSendMessage(); + const isAnyStreaming = useIsAnyConversationStreaming(); const conversationRounds = useConversationRounds(); const lastRound = conversationRounds.at(-1); const { isFetched } = useConversationStatus(); @@ -65,9 +67,10 @@ export const Conversation: React.FC<{}> = () => { const [dismissStaleAttachments, setDismissStaleAttachments] = useState(false); useSendPredefinedInitialMessage(); + // Page-leave guard fires for any in-flight stream, not just this conversation's. useNavigationAbort({ onAppLeave, - isResponseLoading, + isResponseLoading: isAnyStreaming, }); const scrollContainerRef = useRef(null); diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx index 8054371d7a687..d9c910e2136f1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx @@ -21,7 +21,9 @@ import { useSendMessage } from '../../../context/send_message/send_message_conte import { useSubmitMessage } from '../../../hooks/use_submit_message'; import { useAgentBuilderAgents } from '../../../hooks/agents/use_agents'; import { useValidateAgentId } from '../../../hooks/agents/use_validate_agent_id'; -import { useIsSendingMessage } from '../../../hooks/use_is_sending_message'; +// Submit is gated globally on any-conversation streaming until concurrent streams are +// unblocked in a future PR — at which point it becomes a per-conversation check. +import { useIsAnyConversationStreaming } from '../../../hooks/use_is_any_conversation_streaming'; import { useAgentId, useConversationTitle, @@ -145,7 +147,7 @@ export const ConversationInput: React.FC = ({ onSubmit, onEditorFocus, }) => { - const isSendingMessage = useIsSendingMessage(); + const isSendingMessage = useIsAnyConversationStreaming(); const { pendingMessage, error, isResuming } = useSendMessage(); const { isFetched } = useAgentBuilderAgents(); const agentId = useAgentId(); diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_layout.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_layout.tsx index d0f73ccda9b09..5b24f212cd2e1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_layout.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_layout.tsx @@ -21,6 +21,7 @@ import { RoundInput } from './round_input'; import { RoundThinking } from './round_thinking/round_thinking'; import { RoundResponse } from './round_response/round_response'; import { useSendMessage } from '../../../context/send_message/send_message_context'; +import { useIsAnyConversationStreaming } from '../../../hooks/use_is_any_conversation_streaming'; import { RoundError } from './round_error/round_error'; import { ConfirmationPrompt } from './round_prompt'; import { RoundAttachmentReferences } from './round_attachment_references'; @@ -88,6 +89,12 @@ export const RoundLayout: React.FC = ({ resumeRound, isResuming, } = useSendMessage(); + // Approve / Cancel for HITL must be gated on global streaming state: while ANY other + // conversation is streaming, racing two mutations against the same single-stream + // backend would corrupt cache state. This becomes a per-conversation check in the + // concurrent-streams follow-up PR. + const isAnyStreaming = useIsAnyConversationStreaming(); + const isHitlDisabled = isAnyStreaming && !isResuming; const isLoadingCurrentRound = isResponseLoading && isCurrentRound; const isErrorCurrentRound = Boolean(error) && isCurrentRound; @@ -193,6 +200,7 @@ export const RoundLayout: React.FC = ({ onConfirm={() => handlePromptResponse(prompt.id, true)} onCancel={() => handlePromptResponse(prompt.id, false)} isLoading={isResuming} + isDisabled={isHitlDisabled} isAnswered={promptResponses[prompt.id] !== undefined} answeredValue={promptResponses[prompt.id]?.allow} /> diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_is_sending_message.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_is_any_conversation_streaming.ts similarity index 52% rename from x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_is_sending_message.ts rename to x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_is_any_conversation_streaming.ts index 82aa264d55d6d..720b284b1c0d0 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_is_sending_message.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_is_any_conversation_streaming.ts @@ -8,7 +8,16 @@ import { useIsMutating } from '@kbn/react-query'; import { mutationKeys } from '../mutation_keys'; -export const useIsSendingMessage = () => { +/** + * Returns true while ANY send/resume mutation is in flight, anywhere in the app. + * + * Single-stream-at-a-time is a current product constraint, so the global gates + * (HITL Approve, submit button, page-leave guard) all read this. When concurrent + * streams are unblocked in a follow-up PR, those gates become per-conversation + * checks and most callers of this hook go away — but the page-leave guard will + * still want a global "is anything in flight?" answer. + */ +export const useIsAnyConversationStreaming = () => { const numSending = useIsMutating({ mutationKey: mutationKeys.sendMessage, fetching: true }); const numResuming = useIsMutating({ mutationKey: mutationKeys.resumeRound, fetching: true }); return numSending > 0 || numResuming > 0; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/embeddable/embeddable_welcome_message.tsx b/x-pack/platform/plugins/shared/agent_builder/public/embeddable/embeddable_welcome_message.tsx index 542d86684df81..a3d86a36c6af1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/embeddable/embeddable_welcome_message.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/embeddable/embeddable_welcome_message.tsx @@ -12,7 +12,7 @@ import { css } from '@emotion/react'; import { useKibana } from '../application/hooks/use_kibana'; import { useAgentBuilderServices } from '../application/hooks/use_agent_builder_service'; import { useConversationList } from '../application/hooks/use_conversation_list'; -import { useSendMessage } from '../application/context/send_message/send_message_context'; +import { useIsAnyConversationStreaming } from '../application/hooks/use_is_any_conversation_streaming'; import { useHasConnectorsAllPrivileges } from '../application/hooks/use_has_connectors_all_privileges'; import { storageKeys } from '../application/storage_keys'; @@ -33,14 +33,14 @@ export const EmbeddableWelcomeMessage = () => { setShowCallOut(false); }; - const { isResponseLoading } = useSendMessage(); + const isAnyStreaming = useIsAnyConversationStreaming(); // Dismiss the welcome message automatically when a message has been sent useEffect(() => { - if (isResponseLoading) { + if (isAnyStreaming) { onDismiss(); } - }, [isResponseLoading]); + }, [isAnyStreaming]); const { conversations = [], isLoading } = useConversationList(); const hasNoConversations = isLoading === false && conversations.length === 0; From f6ec749b05cfeacd5aead252fcb97a158416d296 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:01:20 +0100 Subject: [PATCH 06/17] fix streaming text --- .../conversation_rounds/round_response/streaming_text.tsx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/streaming_text.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/streaming_text.tsx index b0199b217e185..402408652fe3e 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/streaming_text.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/streaming_text.tsx @@ -31,10 +31,12 @@ export const StreamingText = ({ attachmentRefs, conversationId, }: StreamingTextProps) => { - const [displayedText, setDisplayedText] = useState(''); + // Initial state derives from the content already in the cache so navigating away and back + // mid-stream doesn't replay the full text. Only chunks arriving AFTER mount get animated. + const [displayedText, setDisplayedText] = useState(content); const tokenQueueRef = useRef([]); const intervalRef = useRef(null); - const previousContentLengthRef = useRef(0); + const previousContentLengthRef = useRef(content.length); useEffect(() => { const previousContentLength = previousContentLengthRef.current; From e563894a3d6c84d7ddee072875a9776eef38245a Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:12:34 +0100 Subject: [PATCH 07/17] remove connector logic from stream provider --- .../connector_selector.test.tsx | 42 +++++++++---------- .../connector_selector/connector_selector.tsx | 14 +++---- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.test.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.test.tsx index 2cc6bc00cce9a..06d770684399e 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.test.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.test.tsx @@ -18,8 +18,8 @@ jest.mock('../../../../../hooks/use_kibana', () => ({ useKibana: jest.fn(), })); -jest.mock('../../../../../context/send_message/send_message_context', () => ({ - useSendMessage: jest.fn(), +jest.mock('../../../../../hooks/chat/use_connector_selection', () => ({ + useConnectorSelection: jest.fn(), })); jest.mock('../../../../../hooks/chat/use_default_connector', () => ({ @@ -74,13 +74,15 @@ jest.mock('./connector_icon', () => ({ import { useLoadConnectors } from '@kbn/inference-connectors'; import { useKibana } from '../../../../../hooks/use_kibana'; -import { useSendMessage } from '../../../../../context/send_message/send_message_context'; +import { useConnectorSelection } from '../../../../../hooks/chat/use_connector_selection'; import { useDefaultConnector } from '../../../../../hooks/chat/use_default_connector'; import { ConnectorSelector } from './connector_selector'; const mockUseLoadConnectors = useLoadConnectors as jest.MockedFunction; const mockUseKibana = useKibana as jest.MockedFunction; -const mockUseSendMessage = useSendMessage as jest.MockedFunction; +const mockUseConnectorSelection = useConnectorSelection as jest.MockedFunction< + typeof useConnectorSelection +>; const mockUseDefaultConnector = useDefaultConnector as jest.MockedFunction< typeof useDefaultConnector >; @@ -137,14 +139,12 @@ const setup = ({ const selectConnector = jest.fn(); - mockUseSendMessage.mockReturnValue({ - connectorSelection: { - selectedConnector, - selectConnector, - defaultConnectorId, - defaultConnectorOnly, - }, - } as any); + mockUseConnectorSelection.mockReturnValue({ + selectedConnector, + selectConnector, + defaultConnectorId, + defaultConnectorOnly, + }); const utils = render( @@ -154,17 +154,15 @@ const setup = ({ return { ...utils, selectConnector, - // Helper to re-render with a new send-message context (simulates admin changing a setting). + // Helper to re-render with a new connector selection (simulates admin changing a setting). updateContext: (next: Partial) => { - mockUseSendMessage.mockReturnValue({ - connectorSelection: { - selectedConnector: next.selectedConnector ?? selectedConnector, - selectConnector, - defaultConnectorId: - 'defaultConnectorId' in next ? next.defaultConnectorId : defaultConnectorId, - defaultConnectorOnly: next.defaultConnectorOnly ?? defaultConnectorOnly, - }, - } as any); + mockUseConnectorSelection.mockReturnValue({ + selectedConnector: next.selectedConnector ?? selectedConnector, + selectConnector, + defaultConnectorId: + 'defaultConnectorId' in next ? next.defaultConnectorId : defaultConnectorId, + defaultConnectorOnly: next.defaultConnectorOnly ?? defaultConnectorOnly, + }); act(() => { utils.rerender( diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.tsx index 057a9f2a7c5aa..ca88da0923cb7 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.tsx @@ -21,7 +21,7 @@ import { FormattedMessage } from '@kbn/i18n-react'; import React, { useEffect, useMemo, useRef, useState } from 'react'; import { useUiPrivileges } from '../../../../../hooks/use_ui_privileges'; import { useNavigation } from '../../../../../hooks/use_navigation'; -import { useSendMessage } from '../../../../../context/send_message/send_message_context'; +import { useConnectorSelection } from '../../../../../hooks/chat/use_connector_selection'; import { useDefaultConnector } from '../../../../../hooks/chat/use_default_connector'; import { useKibana } from '../../../../../hooks/use_kibana'; import { @@ -165,13 +165,11 @@ export const ConnectorSelector: React.FC<{}> = () => { services: { http, settings }, } = useKibana(); const { - connectorSelection: { - selectConnector: onSelectConnector, - selectedConnector: selectedConnectorId, - defaultConnectorId, - defaultConnectorOnly, - }, - } = useSendMessage(); + selectConnector: onSelectConnector, + selectedConnector: selectedConnectorId, + defaultConnectorId, + defaultConnectorOnly, + } = useConnectorSelection(); const [isPopoverOpen, setIsPopoverOpen] = useState(false); const { data: aiConnectors, isLoading } = useLoadConnectors({ From 27fc300c23afe28824a84ab02384ed6b3687b5c9 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:13:32 +0100 Subject: [PATCH 08/17] create global stream provider --- .../send_message/send_message_context.tsx | 274 ++++++++++++------ .../application/context/send_message/types.ts | 22 ++ .../send_message/use_pending_message_state.ts | 47 --- 3 files changed, 213 insertions(+), 130 deletions(-) create mode 100644 x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/types.ts delete mode 100644 x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx index b0ad90aa7ffd1..61f3820642f32 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx @@ -5,103 +5,211 @@ * 2.0. */ -import React, { createContext, useContext } from 'react'; +/** + * Lifted streaming state. + * + * `SendMessageProvider` is mounted ONCE above the routes/sidebar (in `mount.tsx` for the + * routed app, in `embeddable_conversations_provider.tsx` for the embeddable). All streaming + * state lives here so the sidebar can observe it. + * + * State: + * - `activeStream`: which conversation is currently streaming, the stream type + * (send / regenerate / resume), and the latest agent-reasoning text. Set + * synchronously when each mutation kicks off; cleared in the mutation's `finally`. + * - `byConversationId`: per-conversation pending message, error, and errorSteps. + * Persists across stream end so a user can hit Retry after a failure. + */ + +import React, { createContext, useCallback, useContext, useMemo, useState } from 'react'; +import produce from 'immer'; import type { ConversationRoundStep } from '@kbn/agent-builder-common'; import { useSendMessageMutation } from './use_send_message_mutation'; +import type { SendMessageVars } from './use_send_message_mutation'; import { useResumeRoundMutation } from './use_resume_round_mutation'; -import { useConnectorSelection } from '../../hooks/chat/use_connector_selection'; - -interface SendMessageState { - sendMessage: (params: { message: string; conversationId: string }) => void; - isResponseLoading: boolean; - pendingMessage: string | undefined; - error: unknown; - errorSteps: ConversationRoundStep[]; - agentReasoning: string | null; - retry: () => void; - canCancel: boolean; - cancel: () => void; - cleanConversation: () => void; - removeError: () => void; - resumeRound: (opts: { prompts: Record }) => void; - isResuming: boolean; - regenerate: () => void; - isRegenerating: boolean; - connectorSelection: { - selectedConnector: string | undefined; - selectConnector: (connectorId: string) => void; - defaultConnectorId?: string; - defaultConnectorOnly: boolean; - }; +import type { ResumeRoundVars } from './use_resume_round_mutation'; +import type { ActiveStream, StreamRecord } from './types'; + +export interface SendMessageContextValue { + activeStream: ActiveStream | undefined; + byConversationId: Record; + mutateSendMessage: (vars: SendMessageVars) => void; + mutateResumeRound: (vars: ResumeRoundVars) => void; + cancelActiveStream: () => void; + cleanConversation: (params: { conversationId?: string; hasError: boolean }) => void; + removeError: (conversationId: string) => void; + removeAllErrors: () => void; } -const SendMessageContext = createContext(null); +const SendMessageContext = createContext(null); + +const emptyRecord: StreamRecord = { errorSteps: [] }; export const SendMessageProvider = ({ children }: { children: React.ReactNode }) => { - const connectorSelection = useConnectorSelection(); - - const { - sendMessage, - isResponseLoading, - pendingMessage, - error, - errorSteps, - agentReasoning: sendAgentReasoning, - retry, - canCancel, - cancel, - cleanConversation, - removeError, - regenerate, - isRegenerating, - } = useSendMessageMutation({ connectorId: connectorSelection.selectedConnector }); - - const { - resumeRound, - isResuming, - agentReasoning: resumeAgentReasoning, - } = useResumeRoundMutation({ - connectorId: connectorSelection.selectedConnector, + const [activeStream, setActiveStream] = useState(undefined); + const [byConversationId, setByConversationId] = useState>({}); + + const updateActiveReasoning = useCallback((reasoning: string) => { + setActiveStream((prev) => (prev ? { ...prev, agentReasoning: reasoning } : prev)); + }, []); + + const setPendingMessage = useCallback((conversationId: string, message: string) => { + setByConversationId( + produce((draft) => { + if (!draft[conversationId]) draft[conversationId] = { errorSteps: [] }; + draft[conversationId].pendingMessage = message; + }) + ); + }, []); + + const clearPendingMessage = useCallback((conversationId: string) => { + setByConversationId( + produce((draft) => { + if (draft[conversationId]) { + delete draft[conversationId].pendingMessage; + } + }) + ); + }, []); + + const setError = useCallback( + (conversationId: string, error: unknown, errorSteps: ConversationRoundStep[]) => { + setByConversationId( + produce((draft) => { + if (!draft[conversationId]) draft[conversationId] = { errorSteps: [] }; + draft[conversationId].error = error; + draft[conversationId].errorSteps = errorSteps; + }) + ); + }, + [] + ); + + const removeError = useCallback((conversationId: string) => { + setByConversationId( + produce((draft) => { + const record = draft[conversationId]; + if (record) { + delete record.error; + record.errorSteps = []; + } + }) + ); + }, []); + + const removeAllErrors = useCallback(() => { + setByConversationId( + produce((draft) => { + for (const id of Object.keys(draft)) { + delete draft[id].error; + draft[id].errorSteps = []; + } + }) + ); + }, []); + + const clearActiveStream = useCallback(() => { + setActiveStream(undefined); + }, []); + + const sendMutation = useSendMessageMutation({ + updateActiveReasoning, + setPendingMessage, + clearPendingMessage, + setError, + clearActiveStream, + }); + + const resumeMutation = useResumeRoundMutation({ + updateActiveReasoning, + setError, + clearActiveStream, }); - // Combine agentReasoning from mutations - use the one that's currently active - const agentReasoning = isResuming ? resumeAgentReasoning : sendAgentReasoning; - - return ( - - {children} - + // Pull stable references out of the mutation result objects. The result object itself is + // a NEW reference each render (React Query rebuilds it), so anything that depends on the + // whole object would re-evaluate every render. The individual fields below are stable. + const sendMutate = sendMutation.mutate; + const sendCancel = sendMutation.cancel; + const sendCleanConversation = sendMutation.cleanConversation; + const resumeMutate = resumeMutation.mutate; + const resumeCancel = resumeMutation.cancel; + + const cancelActiveStream = useCallback(() => { + sendCancel(); + resumeCancel(); + }, [sendCancel, resumeCancel]); + + // Wrappers around `mutate` that set `activeStream` SYNCHRONOUSLY before queueing the + // mutation. Without this, callers like `useSubmitMessage` (which call `mutate` and then + // immediately navigate to `/conversations/`) would render the new URL with + // `activeStream` still undefined — the `useConversation` gate would open, fire a GET + // for the not-yet-persisted conversation, and 404. The mutation's `mutationFn` runs + // asynchronously, so setting `activeStream` from inside `mutationFn` is too late. + const mutateSendMessage = useCallback( + (vars: SendMessageVars) => { + setActiveStream({ + conversationId: vars.conversationId, + type: vars.action === 'regenerate' ? 'regenerate' : 'send', + agentReasoning: null, + }); + sendMutate(vars); + }, + [sendMutate] + ); + const mutateResumeRound = useCallback( + (vars: ResumeRoundVars) => { + setActiveStream({ + conversationId: vars.conversationId, + type: 'resume', + agentReasoning: null, + }); + resumeMutate(vars); + }, + [resumeMutate] + ); + + const value = useMemo( + () => ({ + activeStream, + byConversationId, + mutateSendMessage, + mutateResumeRound, + cancelActiveStream, + cleanConversation: sendCleanConversation, + removeError, + removeAllErrors, + }), + [ + activeStream, + byConversationId, + mutateSendMessage, + mutateResumeRound, + cancelActiveStream, + sendCleanConversation, + removeError, + removeAllErrors, + ] ); + + return {children}; }; -export const useSendMessage = () => { +export const useSendMessageContext = () => { const context = useContext(SendMessageContext); if (!context) { - throw new Error('useSendMessage must be used within a SendMessageProvider'); + throw new Error('useSendMessageContext must be used within a SendMessageProvider'); } return context; }; + +export const useStreamRecord = (conversationId: string | undefined): StreamRecord => { + const { byConversationId } = useSendMessageContext(); + if (!conversationId) return emptyRecord; + return byConversationId[conversationId] ?? emptyRecord; +}; + +// Re-exported so consumers can keep importing `useSendMessage` from this file. The actual +// implementation lives in `./use_send_message` to avoid a circular import with +// `use_conversation.ts` (the per-conversation scoped hook reads from `useConversation()`, +// while `useConversation()` reads from `useSendMessageContext` here). +export { useSendMessage } from './use_send_message'; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/types.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/types.ts new file mode 100644 index 0000000000000..ae2d253e93585 --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/types.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ConversationRoundStep } from '@kbn/agent-builder-common'; + +export type StreamType = 'send' | 'regenerate' | 'resume'; + +export interface ActiveStream { + conversationId: string; + type: StreamType; + agentReasoning: string | null; +} + +export interface StreamRecord { + pendingMessage?: string; + error?: unknown; + errorSteps: ConversationRoundStep[]; +} diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts deleted file mode 100644 index 6992e0290c45d..0000000000000 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_pending_message_state.ts +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import produce from 'immer'; -import { useState } from 'react'; - -interface PendingMessageState { - pendingMessage?: string; -} - -export const usePendingMessageState = ({ conversationId }: { conversationId?: string }) => { - const [conversationIdToPendingMessageState, setConversationIdToPendingMessageState] = useState< - Record - >({}); - - const updateStateFor = (id: string | undefined, updater: (c: PendingMessageState) => void) => { - if (!id) return; - setConversationIdToPendingMessageState( - produce((draft) => { - draft[id] ??= {}; - updater(draft[id]); - }) - ); - }; - - const pendingMessageState = conversationId - ? conversationIdToPendingMessageState[conversationId] ?? {} - : {}; - - return { - pendingMessageState, - setPendingMessage: (pendingMessage: string, id: string | undefined = conversationId) => { - updateStateFor(id, (state) => { - state.pendingMessage = pendingMessage; - }); - }, - removePendingMessage: (id: string | undefined = conversationId) => { - updateStateFor(id, (state) => { - delete state.pendingMessage; - }); - }, - }; -}; From 604ee61fd6317e95059676c1392788901383ae16 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:18:24 +0100 Subject: [PATCH 09/17] stream mutations --- .../send_message/use_resume_round_mutation.ts | 141 ++++--- .../context/send_message/use_send_message.ts | 196 ++++++++++ .../send_message/use_send_message_mutation.ts | 361 ++++++++---------- 3 files changed, 426 insertions(+), 272 deletions(-) create mode 100644 x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts index 9b59db5f8bcb0..daee6238637e5 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts @@ -5,91 +5,108 @@ * 2.0. */ -import { useMutation } from '@kbn/react-query'; -import { useRef, useState, useMemo } from 'react'; +import { useMutation, useQueryClient } from '@kbn/react-query'; +import { useCallback, useMemo, useRef } from 'react'; import { toToolMetadata } from '@kbn/agent-builder-browser/tools/browser_api_tool'; +import type { BrowserApiToolDefinition } from '@kbn/agent-builder-browser/tools/browser_api_tool'; import { useKibana } from '@kbn/kibana-react-plugin/public'; -import { useAgentId } from '../../hooks/use_conversation'; -import { useConversationContext } from '../conversation/conversation_context'; -import { useConversationId } from '../conversation/use_conversation_id'; +import type { ConversationRoundStep } from '@kbn/agent-builder-common'; import { useAgentBuilderServices } from '../../hooks/use_agent_builder_service'; import { mutationKeys } from '../../mutation_keys'; -import { useSubscribeToChatEvents } from './use_subscribe_to_chat_events'; +import { subscribeToChatEvents } from './use_subscribe_to_chat_events'; import { BrowserToolExecutor } from '../../services/browser_tool_executor'; +import { createConversationActions } from '../conversation/use_conversation_actions'; -interface UseResumeRoundMutationProps { +export interface ResumeRoundVars { + prompts: Record; + conversationId: string; + agentId: string; connectorId?: string; + lastRoundSteps?: ConversationRoundStep[]; + browserApiTools?: Array>; } -export const useResumeRoundMutation = ({ connectorId }: UseResumeRoundMutationProps = {}) => { - const { chatService } = useAgentBuilderServices(); - const { services } = useKibana(); - const { conversationActions, browserApiTools } = useConversationContext(); - const [isResuming, setIsResuming] = useState(false); - const [agentReasoning, setAgentReasoning] = useState(null); - const conversationId = useConversationId(); - const agentId = useAgentId(); - const resumeControllerRef = useRef(null); +export interface ResumeRoundMutationBindings { + updateActiveReasoning: (reasoning: string) => void; + setError: (conversationId: string, error: unknown, errorSteps: ConversationRoundStep[]) => void; + clearActiveStream: () => void; +} - const browserApiToolsMetadata = useMemo(() => { - if (!browserApiTools) return undefined; - return browserApiTools.map(toToolMetadata); - }, [browserApiTools]); +type UseResumeRoundMutationProps = ResumeRoundMutationBindings; + +/** + * Resume mutation, used after a HITL pause when the user clicks Approve / Cancel on a + * `ConfirmationPrompt`. Same single-scope `mutationFn` shape as the send mutation. + */ +export const useResumeRoundMutation = ({ + updateActiveReasoning, + setError, + clearActiveStream, +}: UseResumeRoundMutationProps) => { + const { chatService, conversationsService } = useAgentBuilderServices(); + const { services } = useKibana(); + const queryClient = useQueryClient(); + const abortControllerRef = useRef(null); const browserToolExecutor = useMemo(() => { return new BrowserToolExecutor(services.notifications?.toasts); }, [services.notifications?.toasts]); - const { subscribeToChatEvents } = useSubscribeToChatEvents({ - setAgentReasoning, - setIsResponseLoading: setIsResuming, - isAborted: () => Boolean(resumeControllerRef?.current?.signal?.aborted), - browserToolExecutor, - }); + const { mutate, isLoading } = useMutation({ + mutationKey: mutationKeys.resumeRound, + mutationFn: async (vars: ResumeRoundVars) => { + const streamActions = createConversationActions({ + conversationId: vars.conversationId, + queryClient, + conversationsService, + }); - const resumeRound = async ({ prompts }: { prompts: Record }) => { - const signal = resumeControllerRef.current?.signal; - if (!signal) { - return Promise.reject(new Error('Abort signal not present')); - } + const controller = new AbortController(); + abortControllerRef.current = controller; - if (!conversationId) { - return Promise.reject(new Error('Conversation ID is required to resume a round')); - } + // Drop pending prompts from the round — the user has answered, the round is back in progress. + streamActions.clearPendingPrompts(); - const events$ = chatService.resume({ - signal, - prompts, - conversationId, - agentId, - connectorId, - browserApiTools: browserApiToolsMetadata, - }); + try { + const browserApiToolsMetadata = vars.browserApiTools?.map(toToolMetadata); - return subscribeToChatEvents(events$, conversationActions); - }; + const events$ = chatService.resume({ + signal: controller.signal, + prompts: vars.prompts, + conversationId: vars.conversationId, + agentId: vars.agentId, + connectorId: vars.connectorId, + browserApiTools: browserApiToolsMetadata, + }); - const { mutate, isLoading } = useMutation({ - mutationKey: mutationKeys.resumeRound, - mutationFn: resumeRound, - onMutate: () => { - resumeControllerRef.current = new AbortController(); - // Clear the pending prompts from the round - we're now processing - conversationActions.clearPendingPrompts(); - setIsResuming(true); - }, - onSettled: () => { - conversationActions.invalidateConversation(); - resumeControllerRef.current = null; - setAgentReasoning(null); - setIsResuming(false); + await subscribeToChatEvents({ + events$, + conversationActions: streamActions, + browserApiTools: vars.browserApiTools, + browserToolExecutor, + isAborted: () => controller.signal.aborted, + setAgentReasoning: updateActiveReasoning, + }); + } catch (err) { + setError(vars.conversationId, err, vars.lastRoundSteps ?? []); + throw err; + } finally { + streamActions.invalidateConversation(); + clearActiveStream(); + if (abortControllerRef.current === controller) { + abortControllerRef.current = null; + } + } }, - onError: (err) => {}, }); + const cancel = useCallback(() => { + abortControllerRef.current?.abort(); + }, []); + return { - resumeRound: mutate, - isResuming: isLoading || isResuming, - agentReasoning, + mutate, + isLoading, + cancel, }; }; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts new file mode 100644 index 0000000000000..eee8bc7a5b6f9 --- /dev/null +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts @@ -0,0 +1,196 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { useCallback, useMemo } from 'react'; +import { ConversationRoundStatus } from '@kbn/agent-builder-common'; +import { useConversationContext } from '../conversation/conversation_context'; +import { useConversationId } from '../conversation/use_conversation_id'; +import { useAgentId, useConversation } from '../../hooks/use_conversation'; +import { useConnectorSelection } from '../../hooks/chat/use_connector_selection'; +import { useSendMessageContext, useStreamRecord } from './send_message_context'; + +/** + * Per-conversation scoped hook. Use INSIDE a conversation tree — it reads `conversationId` + * and `agentId` from context. Components asking "am I streaming?" get an answer about + * their own conversation, not the global app. + * + * Outside a conversation tree (e.g. the global sidebar), use `useSendMessageContext()` + * directly. + */ +export const useSendMessage = () => { + const conversationId = useConversationId(); + const agentId = useAgentId(); + const { conversation } = useConversation(); + const { attachments, resetAttachments, browserApiTools } = useConversationContext(); + const { selectedConnector: connectorId } = useConnectorSelection(); + + const { + activeStream, + mutateSendMessage, + mutateResumeRound, + cancelActiveStream, + cleanConversation: cleanConversationCtx, + removeError: removeErrorCtx, + } = useSendMessageContext(); + + const record = useStreamRecord(conversationId); + + const isMyStreamActive = Boolean( + activeStream && conversationId && activeStream.conversationId === conversationId + ); + + const lastRound = conversation?.rounds?.at(-1); + const isLastRoundInProgress = lastRound?.status === ConversationRoundStatus.inProgress; + + const isResponseLoading = + isMyStreamActive && (isLastRoundInProgress || activeStream?.type === 'resume'); + const isResuming = isMyStreamActive && activeStream?.type === 'resume'; + const isRegenerating = isMyStreamActive && activeStream?.type === 'regenerate'; + + const sendMessage = useCallback( + ({ + message, + conversationId: targetConversationId, + }: { + message: string; + conversationId: string; + }) => { + if (!agentId) { + throw new Error('agentId is required to send a message'); + } + mutateSendMessage({ + message, + conversationId: targetConversationId, + agentId, + connectorId, + attachments, + conversationAttachments: conversation?.attachments, + lastRoundSteps: lastRound?.steps, + resetAttachments, + browserApiTools, + }); + }, + [ + mutateSendMessage, + agentId, + connectorId, + attachments, + conversation?.attachments, + lastRound?.steps, + resetAttachments, + browserApiTools, + ] + ); + + const regenerate = useCallback(() => { + if (!conversationId) { + throw new Error('Cannot regenerate without a conversation id'); + } + if (!agentId) { + throw new Error('agentId is required to regenerate'); + } + mutateSendMessage({ + action: 'regenerate', + conversationId, + agentId, + connectorId, + conversationAttachments: conversation?.attachments, + lastRoundSteps: lastRound?.steps, + browserApiTools, + }); + }, [ + mutateSendMessage, + conversationId, + agentId, + connectorId, + conversation?.attachments, + lastRound?.steps, + browserApiTools, + ]); + + const resumeRound = useCallback( + ({ prompts }: { prompts: Record }) => { + if (!conversationId) { + throw new Error('Cannot resume without a conversation id'); + } + if (!agentId) { + throw new Error('agentId is required to resume'); + } + mutateResumeRound({ + prompts, + conversationId, + agentId, + connectorId, + lastRoundSteps: lastRound?.steps, + browserApiTools, + }); + }, + [mutateResumeRound, conversationId, agentId, connectorId, lastRound?.steps, browserApiTools] + ); + + const retry = useCallback(() => { + if (isResponseLoading || !record.error) return; + if (!record.pendingMessage) { + throw new Error('Pending message is not present'); + } + if (!conversationId) { + throw new Error('Cannot retry without a conversation id'); + } + sendMessage({ message: record.pendingMessage, conversationId }); + }, [isResponseLoading, record.error, record.pendingMessage, conversationId, sendMessage]); + + const cancel = useCallback(() => { + cancelActiveStream(); + }, [cancelActiveStream]); + + const cleanConversation = useCallback(() => { + cleanConversationCtx({ conversationId, hasError: Boolean(record.error) }); + }, [cleanConversationCtx, conversationId, record.error]); + + const removeError = useCallback(() => { + if (conversationId) { + removeErrorCtx(conversationId); + } + }, [removeErrorCtx, conversationId]); + + return useMemo( + () => ({ + sendMessage, + regenerate, + resumeRound, + retry, + cancel, + cleanConversation, + removeError, + isResponseLoading, + isResuming, + isRegenerating, + pendingMessage: record.pendingMessage, + error: record.error, + errorSteps: record.errorSteps, + agentReasoning: isMyStreamActive ? activeStream?.agentReasoning ?? null : null, + canCancel: isMyStreamActive, + }), + [ + sendMessage, + regenerate, + resumeRound, + retry, + cancel, + cleanConversation, + removeError, + isResponseLoading, + isResuming, + isRegenerating, + record.pendingMessage, + record.error, + record.errorSteps, + isMyStreamActive, + activeStream?.agentReasoning, + ] + ); +}; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts index f54823f65330f..d21ced973d3a1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts @@ -6,43 +6,57 @@ */ import { useMutation, useQueryClient } from '@kbn/react-query'; -import { useRef, useState, useMemo, useCallback } from 'react'; +import { useCallback, useMemo, useRef } from 'react'; import { toToolMetadata } from '@kbn/agent-builder-browser/tools/browser_api_tool'; +import type { BrowserApiToolDefinition } from '@kbn/agent-builder-browser/tools/browser_api_tool'; import { firstValueFrom } from 'rxjs'; import { isEqual } from 'lodash'; import type { ConversationAction, ConversationRoundStep, -} from '@kbn/agent-builder-common/chat/conversation'; + Conversation, +} from '@kbn/agent-builder-common'; +import { ConversationRoundStatus } from '@kbn/agent-builder-common'; import type { Attachment, + AttachmentInput, ScreenContextAttachmentData, VersionedAttachment, } from '@kbn/agent-builder-common/attachments'; import { AttachmentType, getLatestVersion } from '@kbn/agent-builder-common/attachments'; +import { queryKeys } from '../../query_keys'; import { useKibana } from '../../hooks/use_kibana'; import type { StartServices } from '../../hooks/use_kibana'; -import { useAgentId, useConversation } from '../../hooks/use_conversation'; -import { useConversationContext } from '../conversation/conversation_context'; -import { useConversationId } from '../conversation/use_conversation_id'; import { useAgentBuilderServices } from '../../hooks/use_agent_builder_service'; import { mutationKeys } from '../../mutation_keys'; -import { usePendingMessageState } from './use_pending_message_state'; -import { useSubscribeToChatEvents } from './use_subscribe_to_chat_events'; +import { subscribeToChatEvents } from './use_subscribe_to_chat_events'; import { BrowserToolExecutor } from '../../services/browser_tool_executor'; import { createConversationActions } from '../conversation/use_conversation_actions'; -interface UseSendMessageMutationProps { - connectorId?: string; -} +const SCREEN_CONTEXT_ATTACHMENT_ID = 'screen-context'; -interface SendMessageParams { +export interface SendMessageVars { message?: string; action?: ConversationAction; conversationId: string; + agentId: string; + connectorId?: string; + attachments?: AttachmentInput[]; + conversationAttachments?: VersionedAttachment[]; + lastRoundSteps?: ConversationRoundStep[]; + resetAttachments?: () => void; + browserApiTools?: Array>; } -const SCREEN_CONTEXT_ATTACHMENT_ID = 'screen-context'; +export interface SendMessageMutationBindings { + updateActiveReasoning: (reasoning: string) => void; + setPendingMessage: (conversationId: string, message: string) => void; + clearPendingMessage: (conversationId: string) => void; + setError: (conversationId: string, error: unknown, errorSteps: ConversationRoundStep[]) => void; + clearActiveStream: () => void; +} + +type UseSendMessageMutationProps = SendMessageMutationBindings; const buildScreenContextData = async ({ services, @@ -100,229 +114,156 @@ const withScreenContextAttachment = async ({ ]; }; -export const useSendMessageMutation = ({ connectorId }: UseSendMessageMutationProps = {}) => { +/** + * Send and regenerate-round mutation. Lives in the lifted SendMessageProvider so streaming + * state is visible to the whole app (sidebar included). + * + * Single-scope `mutationFn` (setup → try → catch → finally) — no `onMutate` / `onSettled` + * lifecycle methods, no refs to bridge phases. Each invocation builds its own + * `streamActions` instance targeting `vars.conversationId`, so stream events keep writing + * to the right cache regardless of where the user has navigated. + */ +export const useSendMessageMutation = ({ + updateActiveReasoning, + setPendingMessage, + clearPendingMessage, + setError, + clearActiveStream, +}: UseSendMessageMutationProps) => { const { chatService, conversationsService } = useAgentBuilderServices(); const { services } = useKibana(); - const { attachments, resetAttachments, browserApiTools } = useConversationContext(); const queryClient = useQueryClient(); - const [isResponseLoading, setIsResponseLoading] = useState(false); - const [agentReasoning, setAgentReasoning] = useState(null); - const conversationId = useConversationId(); - const { conversation } = useConversation(); - const isRegeneratingRef = useRef(false); - const agentId = useAgentId(); - const messageControllerRef = useRef(null); - - // Build actions bound to a specific conversation id. Mutation callbacks pass the id from vars - // so cache writes target the right key regardless of context. - // - // TODO: this per-call factory exists because one `useSendMessageMutation` instance serves - // multiple conversations during its lifetime (the SendMessageProvider doesn't unmount on - // navigation between /conversations/new and /conversations/). When the streaming state - // is lifted to an app-level provider keyed by conversation id (concurrent-streams PR), each - // conversation will own its own actions instance built once, and this builder goes away. - const buildActionsFor = useCallback( - (id: string) => - createConversationActions({ - conversationId: id, - queryClient, - conversationsService, - }), - [queryClient, conversationsService] - ); - - const [error, setError] = useState(null); - const [errorSteps, setErrorSteps] = useState([]); - - const removeError = useCallback(() => { - setError(null); - setErrorSteps((prev) => (prev.length === 0 ? prev : [])); - }, []); - - const browserApiToolsMetadata = useMemo(() => { - if (!browserApiTools) return undefined; - return browserApiTools.map(toToolMetadata); - }, [browserApiTools]); + const abortControllerRef = useRef(null); const browserToolExecutor = useMemo(() => { return new BrowserToolExecutor(services.notifications?.toasts); }, [services.notifications?.toasts]); - const { - pendingMessageState: { pendingMessage }, - setPendingMessage, - removePendingMessage, - } = usePendingMessageState({ conversationId }); - const { subscribeToChatEvents, unsubscribeFromChatEvents } = useSubscribeToChatEvents({ - setAgentReasoning, - setIsResponseLoading, - isAborted: () => Boolean(messageControllerRef?.current?.signal?.aborted), - browserToolExecutor, - }); - - const sendMessage = async ({ - message, - action, - conversationId: targetConversationId, - }: SendMessageParams) => { - const signal = messageControllerRef.current?.signal; - const isRegenerate = action === 'regenerate'; - if (!signal) { - return Promise.reject(new Error('Abort signal not present')); - } - - if (isRegenerate) { - const events$ = chatService.regenerate({ - signal, - conversationId: targetConversationId, - agentId, - connectorId, - browserApiTools: browserApiToolsMetadata, - }); - - return subscribeToChatEvents(events$, buildActionsFor(targetConversationId)); - } - - // Normal send: requires a message - if (!message) { - return Promise.reject(new Error('Message is required')); - } - - const contextAttachments = await withScreenContextAttachment({ - services, - conversationAttachments: conversation?.attachments, - }); - - const events$ = chatService.chat({ - signal, - input: message, - conversationId: targetConversationId, - agentId, - connectorId, - attachments: [...(attachments || []), ...contextAttachments], - browserApiTools: browserApiToolsMetadata, - }); - - return subscribeToChatEvents(events$, buildActionsFor(targetConversationId)); - }; - const { mutate, isLoading } = useMutation({ mutationKey: mutationKeys.sendMessage, - mutationFn: sendMessage, - onMutate: ({ message, action, conversationId: targetConversationId }) => { - const isRegenerate = action === 'regenerate'; - removeError(); - messageControllerRef.current = new AbortController(); - isRegeneratingRef.current = isRegenerate; + mutationFn: async (vars: SendMessageVars) => { + const isRegenerate = vars.action === 'regenerate'; + + // Each conversation owns its streaming lifecycle. The streamActions instance built + // here is closure-bound to vars.conversationId for the duration of this mutation — + // stream events target that conversation regardless of navigation. + const streamActions = createConversationActions({ + conversationId: vars.conversationId, + queryClient, + conversationsService, + }); - const conversationActions = buildActionsFor(targetConversationId); + const controller = new AbortController(); + abortControllerRef.current = controller; if (isRegenerate) { - // Clear the existing response immediately so UI shows empty state - // This must happen before setIsResponseLoading triggers the streaming UI - conversationActions.clearLastRoundResponse(); - } else if (message) { - if (!agentId) { - throw new Error('Agent id must be defined to send a message'); + // Clear the existing response immediately so UI shows empty state. + streamActions.clearLastRoundResponse(); + } else { + if (!vars.message) { + throw new Error('Message is required'); } - setPendingMessage(message, targetConversationId); - conversationActions.addOptimisticRound({ - userMessage: message, - attachments: attachments ?? [], - agentId, + setPendingMessage(vars.conversationId, vars.message); + streamActions.addOptimisticRound({ + userMessage: vars.message, + attachments: vars.attachments ?? [], + agentId: vars.agentId, }); - } else { - throw new Error('Message is required'); - } - setIsResponseLoading(true); - }, - onSettled: (_data, _err, vars) => { - buildActionsFor(vars.conversationId).invalidateConversation(); - messageControllerRef.current = null; - setAgentReasoning(null); - if (isResponseLoading) { - setIsResponseLoading(false); } - isRegeneratingRef.current = false; - }, - onSuccess: (_data, vars) => { - if (isRegeneratingRef.current) return; - removePendingMessage(vars.conversationId); - resetAttachments?.(); - }, - onError: (err, vars) => { - setError(err); - const steps = conversation?.rounds?.at(-1)?.steps; - if (steps) { - setErrorSteps(steps); + + try { + const browserApiToolsMetadata = vars.browserApiTools?.map(toToolMetadata); + + const events$ = isRegenerate + ? chatService.regenerate({ + signal: controller.signal, + conversationId: vars.conversationId, + agentId: vars.agentId, + connectorId: vars.connectorId, + browserApiTools: browserApiToolsMetadata, + }) + : chatService.chat({ + signal: controller.signal, + input: vars.message!, + conversationId: vars.conversationId, + agentId: vars.agentId, + connectorId: vars.connectorId, + attachments: [ + ...(vars.attachments ?? []), + ...(await withScreenContextAttachment({ + services, + conversationAttachments: vars.conversationAttachments, + })), + ], + browserApiTools: browserApiToolsMetadata, + }); + + await subscribeToChatEvents({ + events$, + conversationActions: streamActions, + browserApiTools: vars.browserApiTools, + browserToolExecutor, + isAborted: () => controller.signal.aborted, + setAgentReasoning: updateActiveReasoning, + }); + + if (!isRegenerate) { + clearPendingMessage(vars.conversationId); + vars.resetAttachments?.(); + } + } catch (err) { + setError(vars.conversationId, err, vars.lastRoundSteps ?? []); + if (!isRegenerate) { + // Remove the optimistic round immediately so the error round and the optimistic + // round are not both visible. + streamActions.removeOptimisticRound(); + } + throw err; + } finally { + // Skip invalidation when the round paused on a HITL prompt. The cache is the + // canonical source of truth in that state (server has the same pending prompt), + // and a stale-marked cache would race with the resume mutation's optimistic + // updates if anything reopened the `useConversation` gate. + const cached = queryClient.getQueryData( + queryKeys.conversations.byId(vars.conversationId) + ); + const endedInAwaitingPrompt = + cached?.rounds?.at(-1)?.status === ConversationRoundStatus.awaitingPrompt; + if (!endedInAwaitingPrompt) { + streamActions.invalidateConversation(); + } + clearActiveStream(); + if (abortControllerRef.current === controller) { + abortControllerRef.current = null; + } } - if (isRegeneratingRef.current) return; - // When we error, we should immediately remove the round rather than waiting for a refetch after invalidation - // Otherwise, the error round and the optimistic round will be visible together. - buildActionsFor(vars.conversationId).removeOptimisticRound(); }, }); - const canCancel = isLoading; - const cancel = () => { - if (!canCancel) { - return; - } - removePendingMessage(); - messageControllerRef.current?.abort(); - }; + const cancel = useCallback(() => { + abortControllerRef.current?.abort(); + }, []); - return { - sendMessage: mutate, - isResponseLoading, - error, - errorSteps, - pendingMessage, - agentReasoning, - retry: () => { - if ( - // Retrying should not be allowed if a response is still being fetched - // or if we're not in an error state - isResponseLoading || - !error - ) { + const cleanConversation = useCallback( + ({ conversationId, hasError }: { conversationId?: string; hasError: boolean }) => { + // Cleaning the conversation is only invoked when the user wants to back out of a + // pending or errored state. If a stream is in flight, abort it; otherwise just clear + // any leftover error/pending state. + if (isLoading) { + abortControllerRef.current?.abort(); return; } - - if (!pendingMessage) { - // Should never happen - // If we are in an error state, pending message will be present - throw new Error('Pending message is not present'); + if (hasError && conversationId) { + clearPendingMessage(conversationId); } - if (!conversationId) { - throw new Error('Cannot retry without a conversation id'); - } - - mutate({ message: pendingMessage, conversationId }); }, - canCancel, + [isLoading, clearPendingMessage] + ); + + return { + mutate, + isLoading, cancel, - cleanConversation: () => { - // Cleaning the conversation only happens when we are on "/new" and the user wants to back out of a pending or errored conversation and return to an empty conversation state - if (isLoading) { - // Conversation round is pending, unsubscribe from chat events and resolve mutation - unsubscribeFromChatEvents(); - } else if (Boolean(error)) { - removeError(); - removePendingMessage(); - } - }, - /** - * Regenerate the last conversation round. - * Uses the same mutation flow but with action=regenerate. - */ - regenerate: () => { - if (!conversationId) { - throw new Error('Cannot regenerate without a conversation id'); - } - mutate({ action: 'regenerate', conversationId }); - }, - isRegenerating: isLoading && isRegeneratingRef.current, - removeError, + cleanConversation, }; }; From 7c654420e639e3c364f013b4a8e40e411333edf4 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:25:13 +0100 Subject: [PATCH 10/17] updated chat events --- .../use_subscribe_to_chat_events.ts | 316 ++++++++---------- 1 file changed, 148 insertions(+), 168 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts index 806003f09c8d8..326b5fcf396f1 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_subscribe_to_chat_events.ts @@ -27,183 +27,163 @@ import { createToolCallStep, } from '@kbn/agent-builder-common/chat/conversation'; import { i18n } from '@kbn/i18n'; -import { finalize, type Observable, type Subscription } from 'rxjs'; +import { finalize, type Observable } from 'rxjs'; import { isBrowserToolCallEvent } from '@kbn/agent-builder-common/chat/events'; -import { useRef } from 'react'; -import { useConversationContext } from '../conversation/conversation_context'; +import type { BrowserApiToolDefinition } from '@kbn/agent-builder-browser/tools/browser_api_tool'; import type { ConversationActions } from '../conversation/use_conversation_actions'; import type { BrowserToolExecutor } from '../../services/browser_tool_executor'; -export const useSubscribeToChatEvents = ({ - setAgentReasoning, - setIsResponseLoading, - isAborted, - browserToolExecutor, -}: { - setAgentReasoning: (agentReasoning: string) => void; - setIsResponseLoading: (isResponseLoading: boolean) => void; - isAborted: () => boolean; +interface SubscribeOptions { + events$: Observable; + conversationActions: ConversationActions; + browserApiTools?: Array>; browserToolExecutor?: BrowserToolExecutor; -}) => { - const { browserApiTools } = useConversationContext(); - const unsubscribedRef = useRef(false); - const subscriptionRef = useRef(null); - - const unsubscribeFromChatEvents = () => { - unsubscribedRef.current = true; - subscriptionRef.current?.unsubscribe(); - }; + isAborted: () => boolean; + setAgentReasoning: (agentReasoning: string) => void; +} - const subscribeToChatEvents = ( - events$: Observable, - conversationActions: ConversationActions - ) => { - const nextChatEvent = (event: ChatEvent) => { - // chunk received, we append it to the chunk buffer - if (isMessageChunkEvent(event)) { - conversationActions.addAssistantMessageChunk({ messageChunk: event.data.text_chunk }); +/** + * Subscribe to a chat event stream and dispatch every event to the conversation cache via + * `conversationActions`. Returns a Promise that resolves when the stream completes (success + * or abort) and rejects on a real error. + * + * Plain function (not a hook) so mutation `mutationFn` can call it inline. Takes + * `conversationActions` as a parameter rather than reading from React context — each mutation + * builds its own actions targeting the mutation-owned conversation id, so events keep writing + * to the right cache regardless of where the user has navigated. + */ +export const subscribeToChatEvents = ({ + events$, + conversationActions, + browserApiTools, + browserToolExecutor, + isAborted, + setAgentReasoning, +}: SubscribeOptions): Promise => { + const nextChatEvent = (event: ChatEvent) => { + if (isMessageChunkEvent(event)) { + conversationActions.addAssistantMessageChunk({ messageChunk: event.data.text_chunk }); + } else if (isMessageCompleteEvent(event)) { + conversationActions.setAssistantMessage({ + assistantMessage: event.data.message_content, + }); + } else if (isToolProgressEvent(event)) { + const isInternalProgress = event.data.metadata?.internal === 'true'; + conversationActions.setToolCallProgress({ + progress: { + message: event.data.message, + metadata: event.data.metadata ?? {}, + }, + toolCallId: event.data.tool_call_id, + }); + if (!isInternalProgress) { + setAgentReasoning(event.data.message); } - // full message received, override chunk buffer - else if (isMessageCompleteEvent(event)) { - conversationActions.setAssistantMessage({ - assistantMessage: event.data.message_content, - }); - } else if (isToolProgressEvent(event)) { - const isInternalProgress = event.data.metadata?.internal === 'true'; - conversationActions.setToolCallProgress({ - progress: { - message: event.data.message, - metadata: event.data.metadata ?? {}, - }, - toolCallId: event.data.tool_call_id, - }); - // Individual tool progression message should also be displayed as reasoning - // (but skip internal progress messages) - if (!isInternalProgress) { - setAgentReasoning(event.data.message); - } - } else if (isReasoningEvent(event)) { - conversationActions.addReasoningStep({ - step: createReasoningStep({ - reasoning: event.data.reasoning, - transient: event.data.transient, - tool_call_id: event.data.tool_call_id, - tool_call_group_id: event.data.tool_call_group_id, - }), - }); - setAgentReasoning(event.data.reasoning); - } else if (isToolCallEvent(event)) { - conversationActions.addToolCall({ - step: createToolCallStep({ - params: event.data.params, - results: [], - tool_call_id: event.data.tool_call_id, - tool_id: event.data.tool_id, - tool_call_group_id: event.data.tool_call_group_id, - tool_origin: event.data.tool_origin, - }), - }); - } else if (isBrowserToolCallEvent(event)) { - // Check if this is a browser tool call and execute it immediately - const toolId = event.data.tool_id; - if (toolId && browserToolExecutor && browserApiTools) { - const toolDef = browserApiTools.find((tool) => tool.id === toolId); - if (toolDef) { - const toolsMap = new Map([[toolId, toolDef]]); - browserToolExecutor - .executeToolCalls( - [ - { - tool_id: toolId, - call_id: event.data.tool_call_id, - params: event.data.params, - timestamp: Date.now(), - }, - ], - toolsMap - ) - .catch((error) => { - // eslint-disable-next-line no-console - console.error('Failed to execute browser tool:', error); - }); - } + } else if (isReasoningEvent(event)) { + conversationActions.addReasoningStep({ + step: createReasoningStep({ + reasoning: event.data.reasoning, + transient: event.data.transient, + tool_call_id: event.data.tool_call_id, + tool_call_group_id: event.data.tool_call_group_id, + }), + }); + setAgentReasoning(event.data.reasoning); + } else if (isToolCallEvent(event)) { + conversationActions.addToolCall({ + step: createToolCallStep({ + params: event.data.params, + results: [], + tool_call_id: event.data.tool_call_id, + tool_id: event.data.tool_id, + tool_call_group_id: event.data.tool_call_group_id, + tool_origin: event.data.tool_origin, + }), + }); + } else if (isBrowserToolCallEvent(event)) { + const toolId = event.data.tool_id; + if (toolId && browserToolExecutor && browserApiTools) { + const toolDef = browserApiTools.find((tool) => tool.id === toolId); + if (toolDef) { + const toolsMap = new Map([[toolId, toolDef]]); + browserToolExecutor + .executeToolCalls( + [ + { + tool_id: toolId, + call_id: event.data.tool_call_id, + params: event.data.params, + timestamp: Date.now(), + }, + ], + toolsMap + ) + .catch((error) => { + // eslint-disable-next-line no-console + console.error('Failed to execute browser tool:', error); + }); } - } else if (isToolResultEvent(event)) { - const { tool_call_id: toolCallId, results } = event.data; - conversationActions.setToolCallResult({ results, toolCallId }); - } else if (isRoundCompleteEvent(event)) { - // Now we have the full response and can stop the loading indicators - setIsResponseLoading(false); - } else if (isConversationCreatedEvent(event)) { - conversationActions.onConversationCreated({ title: event.data.title }); - } else if (isThinkingCompleteEvent(event)) { - conversationActions.setTimeToFirstToken({ - timeToFirstToken: event.data.time_to_first_token, - }); - } else if (isPromptRequestEvent(event)) { - conversationActions.addPendingPrompt({ - prompt: event.data.prompt, - }); - // Stop loading when a prompt is requested - the round is now awaiting user input - setIsResponseLoading(false); - } else if (isCompactionStartedEvent(event)) { - conversationActions.addCompactionStep({ - tokenCountBefore: event.data.token_count_before, - }); - setAgentReasoning( - i18n.translate('xpack.agentBuilder.chatEvents.compactionStarted', { - defaultMessage: 'Compacting conversation context', - }) - ); - } else if (isCompactionCompletedEvent(event)) { - conversationActions.setCompactionStepComplete({ - tokenCountAfter: event.data.token_count_after, - summarizedRoundCount: event.data.summarized_round_count, - }); - } else if (isBackgroundAgentCompleteEvent(event)) { - conversationActions.addBackgroundExecutionCompleteStep({ - step: { - type: ConversationRoundStepType.backgroundAgentComplete, - ...event.data.execution, - }, - }); } - }; - - return new Promise((resolve, reject) => { - if (unsubscribedRef.current) { - resolve(); - return; - } - subscriptionRef.current = events$ - .pipe( - finalize(() => { - // When the subscription is unsubscribed from, `complete` will not be called, but the `finalize` callback will be - if (unsubscribedRef.current) { - resolve(); - } - }) - ) - .subscribe({ - next: nextChatEvent, - complete: () => { - resolve(); - }, - error: (err) => { - // If the request is aborted, we don't want to show an error - if (isAborted()) { - resolve(); - return; - } - reject(err); - }, - }); - }).finally(() => { - if (unsubscribedRef.current) { - unsubscribedRef.current = false; - } - }); + } else if (isToolResultEvent(event)) { + const { tool_call_id: toolCallId, results } = event.data; + conversationActions.setToolCallResult({ results, toolCallId }); + } else if (isRoundCompleteEvent(event)) { + // No-op. `isResponseLoading` is derived in `useSendMessage` from `activeStream`, + // and `activeStream` is cleared in the mutation's `finally` when the stream ends — + // so we don't need an explicit signal here. + } else if (isConversationCreatedEvent(event)) { + conversationActions.onConversationCreated({ title: event.data.title }); + } else if (isThinkingCompleteEvent(event)) { + conversationActions.setTimeToFirstToken({ + timeToFirstToken: event.data.time_to_first_token, + }); + } else if (isPromptRequestEvent(event)) { + conversationActions.addPendingPrompt({ + prompt: event.data.prompt, + }); + } else if (isCompactionStartedEvent(event)) { + conversationActions.addCompactionStep({ + tokenCountBefore: event.data.token_count_before, + }); + setAgentReasoning( + i18n.translate('xpack.agentBuilder.chatEvents.compactionStarted', { + defaultMessage: 'Compacting conversation context', + }) + ); + } else if (isCompactionCompletedEvent(event)) { + conversationActions.setCompactionStepComplete({ + tokenCountAfter: event.data.token_count_after, + summarizedRoundCount: event.data.summarized_round_count, + }); + } else if (isBackgroundAgentCompleteEvent(event)) { + conversationActions.addBackgroundExecutionCompleteStep({ + step: { + type: ConversationRoundStepType.backgroundAgentComplete, + ...event.data.execution, + }, + }); + } }; - return { subscribeToChatEvents, unsubscribeFromChatEvents }; + return new Promise((resolve, reject) => { + events$ + .pipe( + finalize(() => { + if (isAborted()) { + resolve(); + } + }) + ) + .subscribe({ + next: nextChatEvent, + complete: () => resolve(), + error: (err) => { + if (isAborted()) { + resolve(); + return; + } + reject(err); + }, + }); + }); }; From c8416f9031b2d1c9c0f53461d6ba171834904f2d Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:28:19 +0100 Subject: [PATCH 11/17] update use conversation gating condition - allows users to change conversations mid stream --- .../application/hooks/use_conversation.ts | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts index 88c382b2832dd..a2d113b777c6c 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts @@ -5,27 +5,45 @@ * 2.0. */ -import { useQuery } from '@kbn/react-query'; +import { useQuery, useQueryClient } from '@kbn/react-query'; import { useMemo } from 'react'; import useLocalStorage from 'react-use/lib/useLocalStorage'; -import { agentBuilderDefaultAgentId, ConversationRoundStatus } from '@kbn/agent-builder-common'; +import { + agentBuilderDefaultAgentId, + ConversationRoundStatus, + type Conversation, +} from '@kbn/agent-builder-common'; import type { IHttpFetchError } from '@kbn/core-http-browser'; import type { ErrorPromptType } from '../components/common/prompt/error_prompt'; import { queryKeys } from '../query_keys'; import { createNewRound } from '../utils/new_conversation'; import { useConversationId } from '../context/conversation/use_conversation_id'; -import { useIsSendingMessage } from './use_is_sending_message'; import { useAgentBuilderServices } from './use_agent_builder_service'; import { storageKeys } from '../storage_keys'; -import { useSendMessage } from '../context/send_message/send_message_context'; +import { + useSendMessageContext, + useStreamRecord, +} from '../context/send_message/send_message_context'; import { useValidateAgentId } from './agents/use_validate_agent_id'; import { useConversationContext } from '../context/conversation/conversation_context'; export const useConversation = () => { const conversationId = useConversationId(); const { conversationsService } = useAgentBuilderServices(); + const queryClient = useQueryClient(); const queryKey = queryKeys.conversations.byId(conversationId ?? ''); - const isSendingMessage = useIsSendingMessage(); + const { activeStream } = useSendMessageContext(); + + // Disable the query when this conversation is being written to by a stream, OR when + // its cached state shows a HITL pause. The cache is authoritative in both states; a + // refetch would race with optimistic chunks (streaming) or with the resume mutation + // about to fire (HITL). Reading the round status from the cache rather than relying + // on mutation timing makes the handoff between send and resume safe. + const isAwaitingPrompt = + queryClient.getQueryData(queryKey)?.rounds?.at(-1)?.status === + ConversationRoundStatus.awaitingPrompt; + + const isThisConversationStreaming = activeStream?.conversationId === conversationId; const { data: conversation, @@ -36,10 +54,7 @@ export const useConversation = () => { error, } = useQuery({ queryKey, - // Disable query when there's no conversationId yet (entry "new" state) or while a message is - // streaming (a refetch would overwrite optimistic updates). When the mutation finishes, - // `onSettled` invalidates this query and the gate flips open, triggering the GET. - enabled: Boolean(conversationId) && !isSendingMessage, + enabled: Boolean(conversationId) && !isThisConversationStreaming && !isAwaitingPrompt, queryFn: () => { if (!conversationId) { return Promise.reject(new Error('Invalid conversation id')); @@ -53,6 +68,9 @@ export const useConversation = () => { } return failureCount < 3; }, + // Refetching an errored query (no cached success) resets status `error` → `loading`, + // which would clear `errorType` and flip `Conversation`'s conditional rendering. Resulting in a loop of unmounts/remounts. + retryOnMount: false, }); return { conversation, isLoading, isFetching, isFetched, isError, error }; @@ -127,7 +145,8 @@ export const useConversationTitle = () => { export const useConversationRounds = () => { const { conversation } = useConversation(); - const { pendingMessage, error, errorSteps } = useSendMessage(); + const conversationId = useConversationId(); + const { pendingMessage, error, errorSteps } = useStreamRecord(conversationId); const conversationRounds = useMemo(() => { const rounds = conversation?.rounds ?? []; From 7f8c14ac4a657c471868eff593e0130ba6192988 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:28:36 +0100 Subject: [PATCH 12/17] fix test --- .../layout/unified_sidebar/unified_sidebar.test.tsx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/unified_sidebar.test.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/unified_sidebar.test.tsx index 1cd50af26bb50..7f41066ceb577 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/unified_sidebar.test.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/layout/unified_sidebar/unified_sidebar.test.tsx @@ -48,6 +48,10 @@ jest.mock('react-use/lib/useLocalStorage', () => ({ default: () => [undefined, jest.fn()], })); +jest.mock('../../../context/send_message/send_message_context', () => ({ + useSendMessageContext: () => ({ removeAllErrors: jest.fn() }), +})); + import { UnifiedSidebar } from './unified_sidebar'; const renderSidebar = (path: string) => From 1efe9864a8332ee04c38db1bec5eb2ac14d2187e Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 1 May 2026 17:38:41 +0100 Subject: [PATCH 13/17] disable hitl buttons if any other stream is in flight in another conversation --- .../round_prompt/confirmation_prompt.tsx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_prompt/confirmation_prompt.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_prompt/confirmation_prompt.tsx index 74c1b18d215a4..43064f1c05929 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_prompt/confirmation_prompt.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_prompt/confirmation_prompt.tsx @@ -50,6 +50,7 @@ export interface ConfirmationPromptProps { onConfirm: () => void; onCancel: () => void; isLoading?: boolean; + isDisabled?: boolean; isAnswered?: boolean; answeredValue?: boolean; } @@ -59,6 +60,7 @@ export const ConfirmationPrompt: React.FC = ({ onConfirm, onCancel, isLoading = false, + isDisabled = false, isAnswered = false, answeredValue, }) => { @@ -120,7 +122,7 @@ export const ConfirmationPrompt: React.FC = ({ = ({ Date: Fri, 1 May 2026 17:46:40 +0100 Subject: [PATCH 14/17] update contributor guide --- .../shared/agent_builder/CONTRIBUTOR_GUIDE.md | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/x-pack/platform/plugins/shared/agent_builder/CONTRIBUTOR_GUIDE.md b/x-pack/platform/plugins/shared/agent_builder/CONTRIBUTOR_GUIDE.md index 5a6d92f685c87..4494d4abe77d9 100644 --- a/x-pack/platform/plugins/shared/agent_builder/CONTRIBUTOR_GUIDE.md +++ b/x-pack/platform/plugins/shared/agent_builder/CONTRIBUTOR_GUIDE.md @@ -1261,3 +1261,98 @@ setupDeps.agentBuilder.sml.registerType(visualizationSmlType); The full implementation is ~130 lines and serves as the reference for new types. +## Streams lifecycle (frontend) + +The chat streaming layer lives in `public/application/context/send_message/`. This section +documents how mutations, lifted state, and the React Query cache fit together. Read this +before touching any of: +`send_message_context.tsx`, `use_send_message.ts`, `use_send_message_mutation.ts`, +`use_resume_round_mutation.ts`, `use_subscribe_to_chat_events.ts`, +`use_is_any_conversation_streaming.ts`. + +### The lift + +`` is mounted **once** above the routes/sidebar: + +- Routed app: in `mount.tsx`, above ``. The sidebar is part of the + routes, so it can read streaming state directly. +- Embeddable: in `embeddable_conversations_provider.tsx`, one provider per embeddable + instance because each instance has its own `QueryClient`. + +The sidebar uses `useIsAnyConversationStreaming()` and `useSendMessageContext()` directly. +Anything inside the conversation tree should use the per-conversation scoped hook, +`useSendMessage()` (in `use_send_message.ts`). + +### Lifted state + +`SendMessageProvider` owns: + +- `activeStream: { conversationId, type, agentReasoning } | undefined` — points at the + conversation that is currently streaming. Set synchronously when each mutation kicks + off; cleared in the mutation's `finally`. +- `byConversationId: Record` — per-conversation `pendingMessage`, + `error`, `errorSteps`. Persists across stream end so a user can hit Retry after a + failure. + +### Mutations: single-scope `mutationFn` + +`useSendMessageMutation` and `useResumeRoundMutation` use a **single-scope `mutationFn` +with `try / catch / finally`**, not React Query's lifecycle methods (`onMutate`, +`onSuccess`, `onError`, `onSettled`). The shape is: + +```ts +mutationFn: async (vars) => { + // setup phase (sync, before any await): seed the optimistic round, set pending message. + // Note: `activeStream` is set by the provider's `mutateSendMessage` wrapper *before* + // `mutate()` returns — not here. + const streamActions = createConversationActions({ conversationId: vars.conversationId, ... }); + + try { + await subscribeToChatEvents({ events$, conversationActions: streamActions, ... }); + // success cleanup + } catch (err) { + // error cleanup + throw err; + } finally { + // cleanup: invalidate cache (skipped if round paused on HITL), + // clear `activeStream`, clear abort ref. + } +} +``` + +**Why not lifecycle methods?** Streams aren't typical mutations — the bulk of the work +happens *during* `mutationFn`, with state mutations flowing for many seconds. Splitting +the work across lifecycle callbacks forces you to bridge state between scopes via refs +or React Query's `context` return — neither is clean for streaming. With single-scope, +`streamActions` and `vars` are visible throughout. No refs to bridge phases. Reads +top-to-bottom. + +### Each conversation owns its streaming lifecycle + +Every `mutationFn` invocation builds its **own** `streamActions` instance via +`createConversationActions({ conversationId: vars.conversationId, ... })`. That instance +is closure-bound to the mutation's conversation id. **Stream events target the +conversation the mutation was started for, regardless of where the user has navigated.** + +If the user submits on conversation A and immediately switches to B, the stream events +keep writing to A's cache. B loads cleanly from the server. + +### Per-conversation `useConversation` gate + +`useConversation` is disabled for a conversation when (a) a stream is currently writing +to its cache, or (b) the cache shows it's paused on a HITL prompt. The cache is +authoritative in both cases, so a refetch would race with optimistic chunks (streaming) +or with the resume mutation about to fire on Approve (HITL). Other conversations stay +free to refetch — switch to conversation B while A streams and B loads cleanly. See the +inline comment on the `enabled` predicate for details. + +### Single-stream vs concurrent streams + +Today the app enforces single-stream-at-a-time. The global gates (HITL Approve, +submit button, page-leave guard) all read `useIsAnyConversationStreaming()`. + +The architecture supports concurrent streams in principle — per-conversation cache, +mutation-scoped `streamActions`, lifted `byConversationId`. The follow-up PR removes +the global gates, moves the abort controller into a per-conversation slot so each +stream can be cancelled independently, and enables concurrent streams. + From 7fbbb2d16a342b45f8eddd40c145c22cbf2e3661 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 5 May 2026 18:07:37 +0100 Subject: [PATCH 15/17] fix error handling --- .../send_message/send_message_context.tsx | 1 + .../send_message/use_resume_round_mutation.ts | 7 ++++++- .../send_message/use_send_message_mutation.ts | 21 ++++++++++++++----- .../application/hooks/use_conversation.ts | 21 +++++++++++++------ 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx index 61f3820642f32..ec924eac747a4 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx @@ -116,6 +116,7 @@ export const SendMessageProvider = ({ children }: { children: React.ReactNode }) setPendingMessage, clearPendingMessage, setError, + clearError: removeError, clearActiveStream, }); diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts index daee6238637e5..cdebd5f5173ed 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_resume_round_mutation.ts @@ -67,6 +67,7 @@ export const useResumeRoundMutation = ({ // Drop pending prompts from the round — the user has answered, the round is back in progress. streamActions.clearPendingPrompts(); + let succeeded = false; try { const browserApiToolsMetadata = vars.browserApiTools?.map(toToolMetadata); @@ -87,11 +88,15 @@ export const useResumeRoundMutation = ({ isAborted: () => controller.signal.aborted, setAgentReasoning: updateActiveReasoning, }); + succeeded = true; } catch (err) { setError(vars.conversationId, err, vars.lastRoundSteps ?? []); throw err; } finally { - streamActions.invalidateConversation(); + // Only invalidate on success — see use_send_message_mutation.ts for rationale. + if (succeeded) { + streamActions.invalidateConversation(); + } clearActiveStream(); if (abortControllerRef.current === controller) { abortControllerRef.current = null; diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts index d21ced973d3a1..cfdba52327ca8 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts @@ -53,6 +53,7 @@ export interface SendMessageMutationBindings { setPendingMessage: (conversationId: string, message: string) => void; clearPendingMessage: (conversationId: string) => void; setError: (conversationId: string, error: unknown, errorSteps: ConversationRoundStep[]) => void; + clearError: (conversationId: string) => void; clearActiveStream: () => void; } @@ -128,6 +129,7 @@ export const useSendMessageMutation = ({ setPendingMessage, clearPendingMessage, setError, + clearError, clearActiveStream, }: UseSendMessageMutationProps) => { const { chatService, conversationsService } = useAgentBuilderServices(); @@ -144,6 +146,12 @@ export const useSendMessageMutation = ({ mutationFn: async (vars: SendMessageVars) => { const isRegenerate = vars.action === 'regenerate'; + // Clear any previous error for this conversation before starting the new mutation. + // Covers retry, fresh-send-after-error, and regenerate-after-error uniformly — + // otherwise `useConversationRounds` would render the stale error round alongside + // the new optimistic round. + clearError(vars.conversationId); + // Each conversation owns its streaming lifecycle. The streamActions instance built // here is closure-bound to vars.conversationId for the duration of this mutation — // stream events target that conversation regardless of navigation. @@ -171,6 +179,7 @@ export const useSendMessageMutation = ({ }); } + let succeeded = false; try { const browserApiToolsMetadata = vars.browserApiTools?.map(toToolMetadata); @@ -211,6 +220,7 @@ export const useSendMessageMutation = ({ clearPendingMessage(vars.conversationId); vars.resetAttachments?.(); } + succeeded = true; } catch (err) { setError(vars.conversationId, err, vars.lastRoundSteps ?? []); if (!isRegenerate) { @@ -220,16 +230,17 @@ export const useSendMessageMutation = ({ } throw err; } finally { - // Skip invalidation when the round paused on a HITL prompt. The cache is the - // canonical source of truth in that state (server has the same pending prompt), - // and a stale-marked cache would race with the resume mutation's optimistic - // updates if anything reopened the `useConversation` gate. + // Only invalidate on success. On error: refetching a fresh conversation that + // never persisted server-side would 404 and replace the in-round error UI with + // the "Conversation not found" page. The cache already holds the right state + // for `useConversationRounds` to render the synthetic error round. + // Also skip when paused on a HITL prompt (cache is canonical there too). const cached = queryClient.getQueryData( queryKeys.conversations.byId(vars.conversationId) ); const endedInAwaitingPrompt = cached?.rounds?.at(-1)?.status === ConversationRoundStatus.awaitingPrompt; - if (!endedInAwaitingPrompt) { + if (succeeded && !endedInAwaitingPrompt) { streamActions.invalidateConversation(); } clearActiveStream(); diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts index ed1dd2a52184b..7ba2c54dde884 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/hooks/use_conversation.ts @@ -33,19 +33,24 @@ export const useConversation = () => { const { conversationsService } = useAgentBuilderServices(); const queryClient = useQueryClient(); const queryKey = queryKeys.conversations.byId(conversationId ?? ''); - const { activeStream } = useSendMessageContext(); + const { activeStream, byConversationId } = useSendMessageContext(); // Disable the query when this conversation is being written to by a stream, OR when - // its cached state shows a HITL pause. The cache is authoritative in both states; a - // refetch would race with optimistic chunks (streaming) or with the resume mutation - // about to fire (HITL). Reading the round status from the cache rather than relying - // on mutation timing makes the handoff between send and resume safe. + // its cached state shows a HITL pause, OR when there's an unpersisted error in the + // per-conversation error map. The cache is authoritative in all three cases; a + // refetch would race with optimistic chunks (streaming), or with the resume mutation + // about to fire (HITL), or 404 a fresh conversation that errored before the backend + // persisted it (overriding the in-round error UI with "Conversation not found"). const isAwaitingPrompt = queryClient.getQueryData(queryKey)?.rounds?.at(-1)?.status === ConversationRoundStatus.awaitingPrompt; const isThisConversationStreaming = activeStream?.conversationId === conversationId; + const hasUnpersistedError = conversationId + ? Boolean(byConversationId[conversationId]?.error) + : false; + const { data: conversation, isLoading, @@ -55,7 +60,11 @@ export const useConversation = () => { error, } = useQuery({ queryKey, - enabled: Boolean(conversationId) && !isThisConversationStreaming && !isAwaitingPrompt, + enabled: + Boolean(conversationId) && + !isThisConversationStreaming && + !isAwaitingPrompt && + !hasUnpersistedError, queryFn: () => { if (!conversationId) { return Promise.reject(new Error('Invalid conversation id')); From 59813f748f8a53cf647f52b66760bf439bb3a2fa Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 5 May 2026 19:37:55 +0100 Subject: [PATCH 16/17] remove clean conversation dead code --- .../send_message/send_message_context.tsx | 4 ---- .../context/send_message/use_send_message.ts | 7 ------- .../send_message/use_send_message_mutation.ts | 17 ----------------- 3 files changed, 28 deletions(-) diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx index ec924eac747a4..c237d0e171360 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/send_message_context.tsx @@ -35,7 +35,6 @@ export interface SendMessageContextValue { mutateSendMessage: (vars: SendMessageVars) => void; mutateResumeRound: (vars: ResumeRoundVars) => void; cancelActiveStream: () => void; - cleanConversation: (params: { conversationId?: string; hasError: boolean }) => void; removeError: (conversationId: string) => void; removeAllErrors: () => void; } @@ -131,7 +130,6 @@ export const SendMessageProvider = ({ children }: { children: React.ReactNode }) // whole object would re-evaluate every render. The individual fields below are stable. const sendMutate = sendMutation.mutate; const sendCancel = sendMutation.cancel; - const sendCleanConversation = sendMutation.cleanConversation; const resumeMutate = resumeMutation.mutate; const resumeCancel = resumeMutation.cancel; @@ -176,7 +174,6 @@ export const SendMessageProvider = ({ children }: { children: React.ReactNode }) mutateSendMessage, mutateResumeRound, cancelActiveStream, - cleanConversation: sendCleanConversation, removeError, removeAllErrors, }), @@ -186,7 +183,6 @@ export const SendMessageProvider = ({ children }: { children: React.ReactNode }) mutateSendMessage, mutateResumeRound, cancelActiveStream, - sendCleanConversation, removeError, removeAllErrors, ] diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts index eee8bc7a5b6f9..6d888df2ec253 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message.ts @@ -33,7 +33,6 @@ export const useSendMessage = () => { mutateSendMessage, mutateResumeRound, cancelActiveStream, - cleanConversation: cleanConversationCtx, removeError: removeErrorCtx, } = useSendMessageContext(); @@ -147,10 +146,6 @@ export const useSendMessage = () => { cancelActiveStream(); }, [cancelActiveStream]); - const cleanConversation = useCallback(() => { - cleanConversationCtx({ conversationId, hasError: Boolean(record.error) }); - }, [cleanConversationCtx, conversationId, record.error]); - const removeError = useCallback(() => { if (conversationId) { removeErrorCtx(conversationId); @@ -164,7 +159,6 @@ export const useSendMessage = () => { resumeRound, retry, cancel, - cleanConversation, removeError, isResponseLoading, isResuming, @@ -181,7 +175,6 @@ export const useSendMessage = () => { resumeRound, retry, cancel, - cleanConversation, removeError, isResponseLoading, isResuming, diff --git a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts index cfdba52327ca8..57154c5f8c4be 100644 --- a/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts +++ b/x-pack/platform/plugins/shared/agent_builder/public/application/context/send_message/use_send_message_mutation.ts @@ -255,26 +255,9 @@ export const useSendMessageMutation = ({ abortControllerRef.current?.abort(); }, []); - const cleanConversation = useCallback( - ({ conversationId, hasError }: { conversationId?: string; hasError: boolean }) => { - // Cleaning the conversation is only invoked when the user wants to back out of a - // pending or errored state. If a stream is in flight, abort it; otherwise just clear - // any leftover error/pending state. - if (isLoading) { - abortControllerRef.current?.abort(); - return; - } - if (hasError && conversationId) { - clearPendingMessage(conversationId); - } - }, - [isLoading, clearPendingMessage] - ); - return { mutate, isLoading, cancel, - cleanConversation, }; }; From a25733310ec29d64acb040351f15cb10ee1494d6 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Wed, 6 May 2026 08:01:00 +0100 Subject: [PATCH 17/17] fix error handling test just add retry to wait --- .../converse/conversation_error_handling.ts | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/x-pack/platform/test/agent_builder_functional/tests/converse/conversation_error_handling.ts b/x-pack/platform/test/agent_builder_functional/tests/converse/conversation_error_handling.ts index a3e23ea281499..f73509a409ddb 100644 --- a/x-pack/platform/test/agent_builder_functional/tests/converse/conversation_error_handling.ts +++ b/x-pack/platform/test/agent_builder_functional/tests/converse/conversation_error_handling.ts @@ -296,10 +296,14 @@ export default function ({ getPageObjects, getService }: FtrProviderContext) { llmProxy ); - // Assert the first round is visible - const firstResponseElement = await testSubjects.find('agentBuilderRoundResponse'); - const firstResponseText = await firstResponseElement.getVisibleText(); - expect(firstResponseText).to.contain(FIRST_RESPONSE); + // Assert the first round is visible. Wrapped in retry.try because the streaming + // text component animates tokens (~17ms each) and the test can read the DOM before + // the last token has painted on resource-constrained CI runs. + await retry.try(async () => { + const firstResponseElement = await testSubjects.find('agentBuilderRoundResponse'); + const firstResponseText = await firstResponseElement.getVisibleText(); + expect(firstResponseText).to.contain(FIRST_RESPONSE); + }); // setup interceptors to return 400 error await setupAgentDirectError({ @@ -320,10 +324,12 @@ export default function ({ getPageObjects, getService }: FtrProviderContext) { await testSubjects.find('agentBuilderRoundError'); await testSubjects.existOrFail('agentBuilderRoundErrorRetryButton'); - // Assert the previous round is still visible - const previousResponseElement = await testSubjects.find('agentBuilderRoundResponse'); - const previousResponseText = await previousResponseElement.getVisibleText(); - expect(previousResponseText).to.contain(FIRST_RESPONSE); + // Assert the previous round is still visible. Same retry rationale as above. + await retry.try(async () => { + const previousResponseElement = await testSubjects.find('agentBuilderRoundResponse'); + const previousResponseText = await previousResponseElement.getVisibleText(); + expect(previousResponseText).to.contain(FIRST_RESPONSE); + }); }); }); }