Merged

22 commits:
- `d77023a` create conversation with uuid if does not exist (chrisbmar, May 1, 2026)
- `e07d494` remove new cache key (chrisbmar, May 1, 2026)
- `4619e63` lift streaming provider to global not just conversation scoped (chrisbmar, May 1, 2026)
- `aab405c` fix error handling from sidebar and everywhere (chrisbmar, May 1, 2026)
- `66f547a` introduce and name properly is any conversation streaming (chrisbmar, May 1, 2026)
- `f6ec749` fix streaming text (chrisbmar, May 1, 2026)
- `e563894` remove connector logic from stream provider (chrisbmar, May 1, 2026)
- `27fc300` create global stream provider (chrisbmar, May 1, 2026)
- `604ee61` stream mutations (chrisbmar, May 1, 2026)
- `7c65442` updated chat events (chrisbmar, May 1, 2026)
- `c8416f9` update use conversation gating condition - allows users to change con… (chrisbmar, May 1, 2026)
- `7f8c14a` fix test (chrisbmar, May 1, 2026)
- `1efe986` disable hitl buttons if any other stream is in flight in another conv… (chrisbmar, May 1, 2026)
- `fbbaec9` update contributor guide (chrisbmar, May 1, 2026)
- `c2128c0` Merge branch 'main' into ab-14293-remove-new-cache-key (chrisbmar, May 1, 2026)
- `ee476c8` Merge branch 'main' into ab-14293-remove-new-cache-key (chrisbmar, May 5, 2026)
- `7fbbb2d` fix error handling (chrisbmar, May 5, 2026)
- `bd6dfcb` Merge branch 'main' into ab-14293-remove-new-cache-key (chrisbmar, May 5, 2026)
- `59813f7` remove clean conversation dead code (chrisbmar, May 5, 2026)
- `a257333` fix error handling test just add retry to wait (chrisbmar, May 6, 2026)
- `f6cb46b` Merge branch 'main' into ab-14293-remove-new-cache-key (chrisbmar, May 6, 2026)
- `ce1d522` Merge branch 'main' into ab-14293-remove-new-cache-key (chrisbmar, May 6, 2026)
@@ -1280,3 +1280,98 @@ setupDeps.agentBuilder.sml.registerType(visualizationSmlType);

The full implementation is ~130 lines and serves as the reference for new types.

## Streams lifecycle (frontend)

The chat streaming layer lives in `public/application/context/send_message/`. This section
documents how mutations, lifted state, and the React Query cache fit together. Read this
before touching any of:
`send_message_context.tsx`, `use_send_message.ts`, `use_send_message_mutation.ts`,
`use_resume_round_mutation.ts`, `use_subscribe_to_chat_events.ts`,
`use_is_any_conversation_streaming.ts`.

### The lift

`<SendMessageProvider>` is mounted **once** above the routes/sidebar:

- Routed app: in `mount.tsx`, above `<AgentBuilderRoutes>`. The sidebar is part of the
routes, so it can read streaming state directly.
- Embeddable: in `embeddable_conversations_provider.tsx`, one provider per embeddable
instance because each instance has its own `QueryClient`.

The sidebar uses `useIsAnyConversationStreaming()` and `useSendMessageContext()` directly.
Anything inside the conversation tree should use the per-conversation scoped hook,
`useSendMessage()` (in `use_send_message.ts`).

### Lifted state

`SendMessageProvider` owns:

- `activeStream: { conversationId, type, agentReasoning } | undefined` — points at the
conversation that is currently streaming. Set synchronously when each mutation kicks
off; cleared in the mutation's `finally`.
- `byConversationId: Record<string, StreamRecord>` — per-conversation `pendingMessage`,
  `error`, `errorSteps`. These persist after a stream ends so a user can hit Retry after a
  failure.
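The two slots can be sketched as plain state transitions. This is a minimal sketch inferred from the descriptions above, not the real provider code: the type names and the `startStream`/`endStream` helpers are illustrative (in the real provider these transitions happen inside the mutation wrapper and the mutation's `finally`).

```ts
// Illustrative shapes for the lifted state described above.
type StreamType = 'send' | 'resume';

interface ActiveStream {
  conversationId: string;
  type: StreamType;
  agentReasoning?: string;
}

interface StreamRecord {
  pendingMessage?: string;
  error?: Error;
  errorSteps?: string[];
}

interface SendMessageState {
  activeStream: ActiveStream | undefined;
  byConversationId: Record<string, StreamRecord>;
}

// `activeStream` is set synchronously when a mutation kicks off, and the
// per-conversation record is seeded with the pending message.
function startStream(
  state: SendMessageState,
  conversationId: string,
  pendingMessage: string
): SendMessageState {
  return {
    activeStream: { conversationId, type: 'send' },
    byConversationId: {
      ...state.byConversationId,
      [conversationId]: { ...state.byConversationId[conversationId], pendingMessage },
    },
  };
}

// `activeStream` is cleared when the stream ends, but the per-conversation
// record is kept (including any error) so Retry remains possible.
function endStream(
  state: SendMessageState,
  conversationId: string,
  error?: Error
): SendMessageState {
  return {
    activeStream: undefined,
    byConversationId: {
      ...state.byConversationId,
      [conversationId]: { ...state.byConversationId[conversationId], error },
    },
  };
}
```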

### Mutations: single-scope `mutationFn`

`useSendMessageMutation` and `useResumeRoundMutation` use a **single-scope `mutationFn`
with `try / catch / finally`**, not React Query's lifecycle methods (`onMutate`,
`onSuccess`, `onError`, `onSettled`). The shape is:

```ts
mutationFn: async (vars) => {
  // setup phase (sync, before any await): seed the optimistic round, set pending message.
  // Note: `activeStream` is set by the provider's `mutateSendMessage` wrapper *before*
  // `mutate()` returns — not here.
  const streamActions = createConversationActions({ conversationId: vars.conversationId, ... });

  try {
    await subscribeToChatEvents({ events$, conversationActions: streamActions, ... });
    // success cleanup
  } catch (err) {
    // error cleanup
    throw err;
  } finally {
    // cleanup: invalidate cache (skipped if round paused on HITL),
    // clear `activeStream`, clear abort ref.
  }
}
```

**Why not lifecycle methods?** Streams aren't typical mutations — the bulk of the work
happens *during* `mutationFn`, with state mutations flowing for many seconds. Splitting
the work across lifecycle callbacks forces you to bridge state between scopes via refs
or React Query's `context` return — neither is clean for streaming. With single-scope,
`streamActions` and `vars` are visible throughout. No refs to bridge phases. Reads
top-to-bottom.

### Each conversation owns its streaming lifecycle

Every `mutationFn` invocation builds its **own** `streamActions` instance via
`createConversationActions({ conversationId: vars.conversationId, ... })`. That instance
is closure-bound to the mutation's conversation id. **Stream events target the
conversation the mutation was started for, regardless of where the user has navigated.**

If the user submits on conversation A and immediately switches to B, the stream events
keep writing to A's cache. B loads cleanly from the server.
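The closure binding above can be illustrated in isolation. This is a hedged sketch, not the real `createConversationActions`: the real factory writes to the React Query cache, so a plain `Map` stands in for the cache here and `appendChunk` stands in for the full set of stream actions.

```ts
// A Map keyed by conversation id stands in for the React Query cache.
type Cache = Map<string, string[]>;

function createConversationActions(cache: Cache, conversationId: string) {
  return {
    // Every action writes to the conversation the mutation was started for:
    // the id is captured in this closure, never read from current navigation state.
    appendChunk(chunk: string) {
      const chunks = cache.get(conversationId) ?? [];
      cache.set(conversationId, [...chunks, chunk]);
    },
  };
}
```

Because the id is closure-bound, "submit on A, switch to B" cannot misroute events: A's actions instance keeps targeting A no matter what the UI is showing.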

### Per-conversation `useConversation` gate

`useConversation` is disabled for a conversation when (a) a stream is currently writing
to its cache, or (b) the cache shows it's paused on a HITL prompt. The cache is
authoritative in both cases, so a refetch would race with optimistic chunks (streaming)
or with the resume mutation about to fire on Approve (HITL). Other conversations stay
free to refetch — switch to conversation B while A streams and B loads cleanly. See the
inline comment on the `enabled` predicate for details.
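The gate reduces to a small predicate. A hedged sketch of the logic described above — the real gate lives on `useConversation`'s query options and these names (`isConversationQueryEnabled`, `isPausedOnHitl`) are illustrative, not copied from the source:

```ts
interface GateInput {
  conversationId: string;
  // The lifted `activeStream` slot from the provider.
  activeStream: { conversationId: string } | undefined;
  // Derived from the cached conversation: is it paused on a HITL prompt?
  isPausedOnHitl: boolean;
}

function isConversationQueryEnabled({
  conversationId,
  activeStream,
  isPausedOnHitl,
}: GateInput): boolean {
  // (a) a stream is currently writing to this conversation's cache
  const isStreamingHere = activeStream?.conversationId === conversationId;
  // (b) the cache shows the conversation paused on a HITL prompt
  return !isStreamingHere && !isPausedOnHitl;
}
```

Note the predicate is per-conversation: only the streaming (or paused) conversation is disabled, which is what lets B refetch cleanly while A streams.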

### Single-stream vs concurrent streams

Today the app enforces single-stream-at-a-time. The global gates (HITL Approve,
submit button, page-leave guard) all read `useIsAnyConversationStreaming()`.

The architecture supports concurrent streams in principle — per-conversation cache,
mutation-scoped `streamActions`, lifted `byConversationId`. The follow-up PR removes
the global gates, moves the abort controller into a per-conversation slot so each
stream can be cancelled independently, and enables concurrent streams.
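In this single-stream model the global gate is just a presence check on the lifted `activeStream` slot, and the concurrent-streams follow-up turns the same question into a per-conversation check. A sketch under those assumptions (the function names mirror the hook names but are illustrative):

```ts
interface ProviderState {
  activeStream: { conversationId: string } | undefined;
}

// Today's global gate: is ANY conversation streaming?
function isAnyConversationStreaming(state: ProviderState): boolean {
  return state.activeStream !== undefined;
}

// The per-conversation form the follow-up PR moves toward.
function isConversationStreaming(state: ProviderState, conversationId: string): boolean {
  return state.activeStream?.conversationId === conversationId;
}
```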

@@ -26,6 +26,7 @@ import { NewConversationPrompt } from './new_conversation_prompt';
import { useConversationId } from '../../context/conversation/use_conversation_id';
import { useShouldStickToBottom } from '../../context/conversation/use_should_stick_to_bottom';
import { useSendMessage } from '../../context/send_message/send_message_context';
import { useIsAnyConversationStreaming } from '../../hooks/use_is_any_conversation_streaming';
import { useConversationScrollActions } from '../../hooks/use_conversation_scroll_actions';
import { useConversationStatus } from '../../hooks/use_conversation';
import { useSendPredefinedInitialMessage } from '../../hooks/use_initial_message';
@@ -53,6 +54,7 @@ export const Conversation: React.FC<{}> = () => {
const conversationId = useConversationId();
const hasActiveConversation = useHasActiveConversation();
const { isResponseLoading } = useSendMessage();
const isAnyStreaming = useIsAnyConversationStreaming();
const conversationRounds = useConversationRounds();
const lastRound = conversationRounds.at(-1);
const { isFetched } = useConversationStatus();
@@ -65,9 +67,10 @@
const [dismissStaleAttachments, setDismissStaleAttachments] = useState(false);
useSendPredefinedInitialMessage();

// Page-leave guard fires for any in-flight stream, not just this conversation's.
useNavigationAbort({
onAppLeave,
isResponseLoading,
isResponseLoading: isAnyStreaming,
});

const scrollContainerRef = useRef<HTMLDivElement | null>(null);
@@ -18,9 +18,12 @@ import type { PropsWithChildren } from 'react';
import React, { useEffect, useMemo } from 'react';
import { useConversationId } from '../../../context/conversation/use_conversation_id';
import { useSendMessage } from '../../../context/send_message/send_message_context';
import { useSubmitMessage } from '../../../hooks/use_submit_message';
import { useAgentBuilderAgents } from '../../../hooks/agents/use_agents';
import { useValidateAgentId } from '../../../hooks/agents/use_validate_agent_id';
import { useIsSendingMessage } from '../../../hooks/use_is_sending_message';
// Submit is gated globally on any-conversation streaming until concurrent streams are
// unblocked in a future PR — at which point it becomes a per-conversation check.
import { useIsAnyConversationStreaming } from '../../../hooks/use_is_any_conversation_streaming';
import {
useAgentId,
useConversationTitle,
@@ -144,8 +147,8 @@
onSubmit,
onEditorFocus,
}) => {
const isSendingMessage = useIsSendingMessage();
const { sendMessage, pendingMessage, error, isResuming } = useSendMessage();
const isSendingMessage = useIsAnyConversationStreaming();
const { pendingMessage, error, isResuming } = useSendMessage();
const { isFetched } = useAgentBuilderAgents();
const agentId = useAgentId();
const conversationId = useConversationId();
@@ -158,6 +161,7 @@ export const ConversationInput: React.FC<ConversationInputProps> = ({
const isAwaitingPrompt = useIsAwaitingPrompt();
const { attachments, initialMessage, autoSendInitialMessage, resetInitialMessage } =
useConversationContext();
const submitMessage = useSubmitMessage();

const validateAgentId = useValidateAgentId();
const isAgentIdValid = validateAgentId(agentId);
@@ -241,7 +245,7 @@
}
return;
}
sendMessage({ message: content });
submitMessage(content);
messageEditorController.clear();
onSubmit?.();
};
@@ -18,8 +18,8 @@ jest.mock('../../../../../hooks/use_kibana', () => ({
useKibana: jest.fn(),
}));

jest.mock('../../../../../context/send_message/send_message_context', () => ({
useSendMessage: jest.fn(),
jest.mock('../../../../../hooks/chat/use_connector_selection', () => ({
useConnectorSelection: jest.fn(),
}));

jest.mock('../../../../../hooks/chat/use_default_connector', () => ({
@@ -74,13 +74,15 @@ jest.mock('./connector_icon', () => ({

import { useLoadConnectors } from '@kbn/inference-connectors';
import { useKibana } from '../../../../../hooks/use_kibana';
import { useSendMessage } from '../../../../../context/send_message/send_message_context';
import { useConnectorSelection } from '../../../../../hooks/chat/use_connector_selection';
import { useDefaultConnector } from '../../../../../hooks/chat/use_default_connector';
import { ConnectorSelector } from './connector_selector';

const mockUseLoadConnectors = useLoadConnectors as jest.MockedFunction<typeof useLoadConnectors>;
const mockUseKibana = useKibana as jest.MockedFunction<typeof useKibana>;
const mockUseSendMessage = useSendMessage as jest.MockedFunction<typeof useSendMessage>;
const mockUseConnectorSelection = useConnectorSelection as jest.MockedFunction<
typeof useConnectorSelection
>;
const mockUseDefaultConnector = useDefaultConnector as jest.MockedFunction<
typeof useDefaultConnector
>;
@@ -137,14 +139,12 @@ const setup = ({

const selectConnector = jest.fn();

mockUseSendMessage.mockReturnValue({
connectorSelection: {
selectedConnector,
selectConnector,
defaultConnectorId,
defaultConnectorOnly,
},
} as any);
mockUseConnectorSelection.mockReturnValue({
selectedConnector,
selectConnector,
defaultConnectorId,
defaultConnectorOnly,
});

const utils = render(
<IntlProvider locale="en">
@@ -154,17 +154,15 @@
return {
...utils,
selectConnector,
// Helper to re-render with a new send-message context (simulates admin changing a setting).
// Helper to re-render with a new connector selection (simulates admin changing a setting).
updateContext: (next: Partial<RenderOptions>) => {
mockUseSendMessage.mockReturnValue({
connectorSelection: {
selectedConnector: next.selectedConnector ?? selectedConnector,
selectConnector,
defaultConnectorId:
'defaultConnectorId' in next ? next.defaultConnectorId : defaultConnectorId,
defaultConnectorOnly: next.defaultConnectorOnly ?? defaultConnectorOnly,
},
} as any);
mockUseConnectorSelection.mockReturnValue({
selectedConnector: next.selectedConnector ?? selectedConnector,
selectConnector,
defaultConnectorId:
'defaultConnectorId' in next ? next.defaultConnectorId : defaultConnectorId,
defaultConnectorOnly: next.defaultConnectorOnly ?? defaultConnectorOnly,
});
act(() => {
utils.rerender(
<IntlProvider locale="en">
@@ -21,7 +21,7 @@ import { FormattedMessage } from '@kbn/i18n-react';
import React, { useEffect, useMemo, useRef, useState } from 'react';
import { useUiPrivileges } from '../../../../../hooks/use_ui_privileges';
import { useNavigation } from '../../../../../hooks/use_navigation';
import { useSendMessage } from '../../../../../context/send_message/send_message_context';
import { useConnectorSelection } from '../../../../../hooks/chat/use_connector_selection';
import { useDefaultConnector } from '../../../../../hooks/chat/use_default_connector';
import { useKibana } from '../../../../../hooks/use_kibana';
import {
@@ -165,13 +165,11 @@ export const ConnectorSelector: React.FC<{}> = () => {
services: { http, settings },
} = useKibana();
const {
connectorSelection: {
selectConnector: onSelectConnector,
selectedConnector: selectedConnectorId,
defaultConnectorId,
defaultConnectorOnly,
},
} = useSendMessage();
selectConnector: onSelectConnector,
selectedConnector: selectedConnectorId,
defaultConnectorId,
defaultConnectorOnly,
} = useConnectorSelection();
const [isPopoverOpen, setIsPopoverOpen] = useState(false);

const { data: aiConnectors, isLoading } = useLoadConnectors({
@@ -21,6 +21,7 @@ import { RoundInput } from './round_input';
import { RoundThinking } from './round_thinking/round_thinking';
import { RoundResponse } from './round_response/round_response';
import { useSendMessage } from '../../../context/send_message/send_message_context';
import { useIsAnyConversationStreaming } from '../../../hooks/use_is_any_conversation_streaming';
import { RoundError } from './round_error/round_error';
import { ConfirmationPrompt } from './round_prompt';
import { RoundAttachmentReferences } from './round_attachment_references';
@@ -88,6 +89,12 @@
resumeRound,
isResuming,
} = useSendMessage();
// Approve / Cancel for HITL must be gated on global streaming state: while ANY other
// conversation is streaming, racing two mutations against the same single-stream
// backend would corrupt cache state. This becomes a per-conversation check in the
// concurrent-streams follow-up PR.
const isAnyStreaming = useIsAnyConversationStreaming();
const isHitlDisabled = isAnyStreaming && !isResuming;

const isLoadingCurrentRound = isResponseLoading && isCurrentRound;
const isErrorCurrentRound = Boolean(error) && isCurrentRound;
@@ -193,6 +200,7 @@
onConfirm={() => handlePromptResponse(prompt.id, true)}
onCancel={() => handlePromptResponse(prompt.id, false)}
isLoading={isResuming}
isDisabled={isHitlDisabled}
isAnswered={promptResponses[prompt.id] !== undefined}
answeredValue={promptResponses[prompt.id]?.allow}
/>
@@ -50,6 +50,7 @@ export interface ConfirmationPromptProps {
onConfirm: () => void;
onCancel: () => void;
isLoading?: boolean;
isDisabled?: boolean;
isAnswered?: boolean;
answeredValue?: boolean;
}
@@ -59,6 +60,7 @@ export const ConfirmationPrompt: React.FC<ConfirmationPromptProps> = ({
onConfirm,
onCancel,
isLoading = false,
isDisabled = false,
isAnswered = false,
answeredValue,
}) => {
@@ -120,7 +122,7 @@
<EuiFlexItem grow={false}>
<EuiButtonEmpty
onClick={onCancel}
disabled={isLoading || isAnswered}
disabled={isDisabled || isLoading || isAnswered}
size="s"
color={isAnswered && answeredValue === false ? 'danger' : 'text'}
data-test-subj="agentBuilderConfirmationPromptCancelButton"
@@ -132,7 +134,7 @@
<EuiButton
onClick={onConfirm}
isLoading={isLoading}
disabled={isAnswered}
disabled={isDisabled || isAnswered}
fill={!isAnswered || answeredValue === true}
size="s"
color={isAnswered && answeredValue === true ? 'success' : color}
@@ -112,11 +112,9 @@ describe('chat_message_text', () => {
isEmbeddedContext: false,
browserApiTools: undefined,
conversationActions: {
removeNewConversationQuery: jest.fn(),
invalidateConversation: jest.fn(),
addOptimisticRound: jest.fn(),
removeOptimisticRound: jest.fn(),
setAgentId: jest.fn(),
addReasoningStep: jest.fn(),
addToolCall: jest.fn(),
setToolCallProgress: jest.fn(),
@@ -31,10 +31,12 @@ export const StreamingText = ({
attachmentRefs,
conversationId,
}: StreamingTextProps) => {
const [displayedText, setDisplayedText] = useState('');
// Initial state derives from the content already in the cache so navigating away and back
// mid-stream doesn't replay the full text. Only chunks arriving AFTER mount get animated.
const [displayedText, setDisplayedText] = useState(content);
const tokenQueueRef = useRef<string[]>([]);
const intervalRef = useRef<NodeJS.Timeout | null>(null);
const previousContentLengthRef = useRef(0);
const previousContentLengthRef = useRef(content.length);

useEffect(() => {
const previousContentLength = previousContentLengthRef.current;