): React.R
)}
+ {data.currentIteration !== undefined && data.maxIterations !== undefined && (
+
+ {data.currentIteration}/{data.maxIterations} iterations
+
+ )}
{data.error && (
{data.error.slice(0, 60)}
diff --git a/packages/web/src/components/workflows/WorkflowDagViewer.tsx b/packages/web/src/components/workflows/WorkflowDagViewer.tsx
index 2857d9771b..2eecf911d3 100644
--- a/packages/web/src/components/workflows/WorkflowDagViewer.tsx
+++ b/packages/web/src/components/workflows/WorkflowDagViewer.tsx
@@ -89,6 +89,8 @@ export function WorkflowDagViewer({
duration: live?.duration,
error: live?.error,
selected: node.id === selectedNodeId,
+ currentIteration: live?.currentIteration,
+ maxIterations: live?.maxIterations,
},
} as ExecutionFlowNode;
});
diff --git a/packages/web/src/components/workflows/WorkflowExecution.tsx b/packages/web/src/components/workflows/WorkflowExecution.tsx
index 6cfe0ac6bd..424281eb32 100644
--- a/packages/web/src/components/workflows/WorkflowExecution.tsx
+++ b/packages/web/src/components/workflows/WorkflowExecution.tsx
@@ -21,6 +21,7 @@ import type {
WorkflowRunStatus,
DagNodeState,
WorkflowStepStatus,
+ LoopIterationInfo,
} from '@/lib/types';
import type { WorkflowEventResponse } from '@/lib/api';
@@ -133,6 +134,49 @@ export function WorkflowExecution({ runId }: WorkflowExecutionProps): React.Reac
});
}
}
+
+ // Second pass: enrich loop nodes with iteration data
+ for (const e of data.events.filter(ev => ev.event_type.startsWith('loop_iteration_'))) {
+ const nodeId = e.step_name ?? '';
+ if (!nodeId) continue;
+ const existing = nodeMap.get(nodeId);
+ if (!existing) continue; // No node_started event yet — skip (events ordered in DB)
+
+ const iteration = e.data.iteration as number | undefined;
+ const maxIter = e.data.maxIterations as number | undefined;
+ if (iteration === undefined) continue;
+
+ let iterStatus: LoopIterationInfo['status'];
+ if (e.event_type === 'loop_iteration_started') {
+ iterStatus = 'running';
+ } else if (e.event_type === 'loop_iteration_completed') {
+ iterStatus = 'completed';
+ } else {
+ iterStatus = 'failed';
+ }
+
+ const existingIters: LoopIterationInfo[] = existing.iterations ?? [];
+ const iterIdx = existingIters.findIndex(it => it.iteration === iteration);
+ const iterState: LoopIterationInfo = {
+ iteration,
+ status: iterStatus,
+ duration: e.data.duration_ms as number | undefined,
+ };
+ const newIters = [...existingIters];
+ if (iterIdx >= 0) {
+ newIters[iterIdx] = iterState;
+ } else {
+ newIters.push(iterState);
+ }
+
+ nodeMap.set(nodeId, {
+ ...existing,
+ currentIteration: iteration,
+ maxIterations: maxIter ?? existing.maxIterations,
+ iterations: newIters,
+ });
+ }
+
return Array.from(nodeMap.values());
})(),
artifacts: data.events
diff --git a/packages/web/src/components/workflows/WorkflowLogs.tsx b/packages/web/src/components/workflows/WorkflowLogs.tsx
index cc1d1d6e2f..bf8dfda1ef 100644
--- a/packages/web/src/components/workflows/WorkflowLogs.tsx
+++ b/packages/web/src/components/workflows/WorkflowLogs.tsx
@@ -388,10 +388,31 @@ export function WorkflowLogs({
filteredDbMessages = dbMessages;
}
+ // Collect DB text content for dedup against SSE text messages.
+ // During live execution, the same text (e.g., "🚀 Starting workflow...") can appear
+ // in both DB (from REST fetch on mount) and SSE (from event buffer replay).
+ // Without dedup, the text shows up twice in the message list.
+ const dbTextContents = new Set();
+ for (const dm of filteredDbMessages) {
+ if (dm.role === 'assistant' && dm.content) {
+ dbTextContents.add(dm.content);
+ }
+ }
+
// Strip SSE tool calls that already appear in DB messages (completed).
+ // Also strip SSE text messages that are already in DB (prevents duplicate text).
const dedupedSse: ChatMessage[] = [];
for (const m of sseMessages) {
if (!m.toolCalls?.length) {
+ // Skip SSE text-only messages whose content already exists in DB.
+ if (m.content && dbTextContents.has(m.content)) {
+ continue;
+ }
+ // Also skip if DB has a message that starts with the SSE content
+ // (SSE text was flushed to DB before SSE finished accumulating).
+ if (m.content && [...dbTextContents].some(dc => dc.startsWith(m.content))) {
+ continue;
+ }
if (m.isStreaming || m.content) dedupedSse.push(m);
continue;
}
@@ -415,7 +436,32 @@ export function WorkflowLogs({
const onText = useCallback((content: string): void => {
setSseMessages(prev => {
const last = prev[prev.length - 1];
+ // Workflow status messages (🚀 start, ✅ complete) should be their own message,
+ // matching ChatInterface's behavior and persistence segmentation. Without this,
+ // all text concatenates into one giant streaming message, breaking text dedup
+ // against DB messages (which are stored as separate segments).
+ const isWorkflowStatus = /^[\u{1F680}\u{2705}]/u.test(content);
+
if (last?.role === 'assistant' && last.isStreaming) {
+ const lastIsWorkflowStatus = /^[\u{1F680}\u{2705}]/u.test(last.content);
+
+ if ((isWorkflowStatus && last.content) || (lastIsWorkflowStatus && !isWorkflowStatus)) {
+ // Close the current streaming message and start a new one when:
+ // 1. Incoming is a workflow status and current has content
+ // 2. Current is a workflow status and incoming is regular text
+ return [
+ ...prev.slice(0, -1),
+ { ...last, isStreaming: false },
+ {
+ id: `msg-${String(Date.now())}`,
+ role: 'assistant' as const,
+ content,
+ timestamp: Date.now(),
+ isStreaming: true,
+ toolCalls: [],
+ },
+ ];
+ }
return [...prev.slice(0, -1), { ...last, content: last.content + content }];
}
return [
diff --git a/packages/web/src/hooks/useDashboardSSE.ts b/packages/web/src/hooks/useDashboardSSE.ts
index cdf09daa0e..72380d4ce1 100644
--- a/packages/web/src/hooks/useDashboardSSE.ts
+++ b/packages/web/src/hooks/useDashboardSSE.ts
@@ -1,6 +1,11 @@
import { useEffect } from 'react';
import { workflowSSEHandlers } from '@/stores/workflow-store';
-import type { WorkflowStatusEvent, DagNodeEvent, WorkflowToolActivityEvent } from '@/lib/types';
+import type {
+ WorkflowStatusEvent,
+ DagNodeEvent,
+ WorkflowToolActivityEvent,
+ LoopIterationEvent,
+} from '@/lib/types';
/** Connects to the multiplexed dashboard SSE stream and routes events to the Zustand store. */
export function useDashboardSSE(): void {
@@ -25,6 +30,9 @@ export function useDashboardSSE(): void {
case 'workflow_tool_activity':
workflowSSEHandlers.onToolActivity(event as WorkflowToolActivityEvent);
break;
+ case 'workflow_step':
+ workflowSSEHandlers.onLoopIteration(event as LoopIterationEvent);
+ break;
// heartbeat — ignore
}
};
diff --git a/packages/web/src/hooks/useSSE.ts b/packages/web/src/hooks/useSSE.ts
index 8a9b152be9..390b779445 100644
--- a/packages/web/src/hooks/useSSE.ts
+++ b/packages/web/src/hooks/useSSE.ts
@@ -2,6 +2,7 @@ import { useEffect, useRef, useState, useCallback } from 'react';
import type {
SSEEvent,
ErrorDisplay,
+ LoopIterationEvent,
WorkflowStatusEvent,
WorkflowArtifactEvent,
WorkflowDispatchEvent,
@@ -37,6 +38,7 @@ interface SSEHandlers {
onWorkflowStatus?: (event: WorkflowStatusEvent) => void;
onWorkflowArtifact?: (event: WorkflowArtifactEvent) => void;
onDagNode?: (event: DagNodeEvent) => void;
+ onLoopIteration?: (event: LoopIterationEvent) => void;
onWorkflowDispatch?: (event: WorkflowDispatchEvent) => void;
onWorkflowOutputPreview?: (event: WorkflowOutputPreviewEvent) => void;
onWarning?: (message: string) => void;
@@ -187,6 +189,9 @@ export function useSSE(
case 'dag_node':
h.onDagNode?.(data);
break;
+ case 'workflow_step':
+ h.onLoopIteration?.(data);
+ break;
case 'workflow_dispatch':
// Flush buffered text before dispatch events to ensure the dispatch
// message (🚀) is committed as an assistant message before
diff --git a/packages/web/src/lib/types.ts b/packages/web/src/lib/types.ts
index 2e5edc16cb..b5284efae7 100644
--- a/packages/web/src/lib/types.ts
+++ b/packages/web/src/lib/types.ts
@@ -89,6 +89,26 @@ export interface WorkflowStatusEvent extends BaseSSEEvent {
approval?: { nodeId: string; message: string };
}
+// Loop iteration info (per-iteration state stored in DagNodeState)
+export interface LoopIterationInfo {
+ iteration: number;
+ status: 'running' | 'completed' | 'failed';
+ duration?: number;
+}
+
+// Loop iteration SSE event (emitted as 'workflow_step' by the bridge)
+export interface LoopIterationEvent extends BaseSSEEvent {
+ type: 'workflow_step';
+ runId: string;
+ nodeId?: string;
+ step: number;
+ total: number;
+ name: string;
+ status: 'running' | 'completed' | 'failed';
+ iteration: number;
+ duration?: number;
+}
+
// DAG node status (emitted during DAG workflow execution)
export interface DagNodeEvent extends BaseSSEEvent {
type: 'dag_node';
@@ -161,6 +181,7 @@ export type SSEEvent =
| HeartbeatEvent
| WorkflowStatusEvent
| DagNodeEvent
+ | LoopIterationEvent
| WorkflowToolActivityEvent
| WorkflowArtifactEvent
| WorkflowDispatchEvent
@@ -226,6 +247,9 @@ export interface DagNodeState {
duration?: number;
error?: string;
reason?: 'when_condition' | 'trigger_rule';
+ currentIteration?: number;
+ maxIterations?: number;
+ iterations?: LoopIterationInfo[];
}
export interface WorkflowArtifact {
diff --git a/packages/web/src/stores/workflow-store.test.ts b/packages/web/src/stores/workflow-store.test.ts
index 1dcaf4782a..f8e420d6cf 100644
--- a/packages/web/src/stores/workflow-store.test.ts
+++ b/packages/web/src/stores/workflow-store.test.ts
@@ -4,6 +4,7 @@ import type {
WorkflowStatusEvent,
WorkflowArtifactEvent,
DagNodeEvent,
+ LoopIterationEvent,
WorkflowState,
} from '@/lib/types';
@@ -324,3 +325,186 @@ describe('selectActiveWorkflow / activeWorkflowId', () => {
expect(useWorkflowStore.getState().activeWorkflowId).toBe('a');
});
});
+
+function loopIterationEvent(
+ overrides: Partial & { runId: string; iteration: number }
+): LoopIterationEvent {
+ return {
+ type: 'workflow_step',
+ nodeId: 'loop-node',
+ step: overrides.iteration - 1,
+ total: 5,
+ name: `iteration-${String(overrides.iteration)}`,
+ status: 'running',
+ timestamp: 1000,
+ ...overrides,
+ };
+}
+
+describe('handleLoopIteration', () => {
+ test('no-ops when event has no nodeId (non-DAG loop)', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li0' }));
+ const before = useWorkflowStore.getState().workflows;
+ useWorkflowStore
+ .getState()
+ .handleLoopIteration(
+ loopIterationEvent({ runId: 'run-li0', iteration: 1, nodeId: undefined })
+ );
+ // Map reference must not change — no mutation
+ expect(useWorkflowStore.getState().workflows).toBe(before);
+ });
+
+ test('no-ops when nodeId not yet in dagNodes', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li1' }));
+ useWorkflowStore
+ .getState()
+ .handleLoopIteration(
+ loopIterationEvent({ runId: 'run-li1', iteration: 1, nodeId: 'ghost-node' })
+ );
+ // Node was not registered — dagNodes must remain empty
+ const wf = useWorkflowStore.getState().workflows.get('run-li1')!;
+ expect(wf.dagNodes).toHaveLength(0);
+ });
+
+ test('appends first iteration to existing node', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li2' }));
+ useWorkflowStore
+ .getState()
+ .handleDagNode(dagNodeEvent({ runId: 'run-li2', nodeId: 'loop-node', name: 'My Loop' }));
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li2',
+ nodeId: 'loop-node',
+ iteration: 1,
+ total: 3,
+ status: 'running',
+ })
+ );
+ const wf = useWorkflowStore.getState().workflows.get('run-li2')!;
+ const node = wf.dagNodes.find(n => n.nodeId === 'loop-node')!;
+ expect(node.iterations).toHaveLength(1);
+ expect(node.iterations![0]).toEqual({ iteration: 1, status: 'running', duration: undefined });
+ expect(node.currentIteration).toBe(1);
+ expect(node.maxIterations).toBe(3);
+ });
+
+ test('updates existing iteration entry (upsert by iteration number)', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li3' }));
+ useWorkflowStore
+ .getState()
+ .handleDagNode(dagNodeEvent({ runId: 'run-li3', nodeId: 'loop-node', name: 'My Loop' }));
+ // First: started
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li3',
+ nodeId: 'loop-node',
+ iteration: 1,
+ status: 'running',
+ })
+ );
+ // Then: completed with duration
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li3',
+ nodeId: 'loop-node',
+ iteration: 1,
+ status: 'completed',
+ total: 0,
+ duration: 1500,
+ })
+ );
+ const wf = useWorkflowStore.getState().workflows.get('run-li3')!;
+ const node = wf.dagNodes.find(n => n.nodeId === 'loop-node')!;
+ expect(node.iterations).toHaveLength(1); // no duplicate
+ expect(node.iterations![0].status).toBe('completed');
+ expect(node.iterations![0].duration).toBe(1500);
+ });
+
+ test('preserves prior maxIterations when total: 0 (completed/failed events)', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li4' }));
+ useWorkflowStore
+ .getState()
+ .handleDagNode(dagNodeEvent({ runId: 'run-li4', nodeId: 'loop-node', name: 'My Loop' }));
+ // started with known total
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li4',
+ nodeId: 'loop-node',
+ iteration: 1,
+ total: 4,
+ status: 'running',
+ })
+ );
+ // completed with total: 0 (intentional bridge omission)
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li4',
+ nodeId: 'loop-node',
+ iteration: 1,
+ total: 0,
+ status: 'completed',
+ })
+ );
+ const node = useWorkflowStore
+ .getState()
+ .workflows.get('run-li4')!
+ .dagNodes.find(n => n.nodeId === 'loop-node')!;
+ expect(node.maxIterations).toBe(4); // preserved, not overwritten to 0
+ });
+
+ test('accumulates multiple distinct iterations', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li5' }));
+ useWorkflowStore
+ .getState()
+ .handleDagNode(dagNodeEvent({ runId: 'run-li5', nodeId: 'loop-node', name: 'My Loop' }));
+ for (let i = 1; i <= 3; i++) {
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li5',
+ nodeId: 'loop-node',
+ iteration: i,
+ status: 'completed',
+ })
+ );
+ }
+ const node = useWorkflowStore
+ .getState()
+ .workflows.get('run-li5')!
+ .dagNodes.find(n => n.nodeId === 'loop-node')!;
+ expect(node.iterations).toHaveLength(3);
+ expect(node.currentIteration).toBe(3);
+ });
+
+ test('preserves iteration data after node_completed dag event overwrites node', () => {
+ useWorkflowStore.getState().handleWorkflowStatus(statusEvent({ runId: 'run-li6' }));
+ useWorkflowStore
+ .getState()
+ .handleDagNode(dagNodeEvent({ runId: 'run-li6', nodeId: 'loop-node', name: 'My Loop' }));
+ useWorkflowStore.getState().handleLoopIteration(
+ loopIterationEvent({
+ runId: 'run-li6',
+ nodeId: 'loop-node',
+ iteration: 1,
+ total: 2,
+ status: 'completed',
+ })
+ );
+ // Simulate the loop node completing — handleDagNode must preserve the iteration data
+ useWorkflowStore.getState().handleDagNode(
+ dagNodeEvent({
+ runId: 'run-li6',
+ nodeId: 'loop-node',
+ name: 'My Loop',
+ status: 'completed',
+ duration: 5000,
+ })
+ );
+ const node = useWorkflowStore
+ .getState()
+ .workflows.get('run-li6')!
+ .dagNodes.find(n => n.nodeId === 'loop-node')!;
+ expect(node.status).toBe('completed');
+ expect(node.iterations).toHaveLength(1); // iteration data preserved after node completion
+ expect(node.maxIterations).toBe(2);
+ });
+});
diff --git a/packages/web/src/stores/workflow-store.ts b/packages/web/src/stores/workflow-store.ts
index 2c0964a01a..77f3739211 100644
--- a/packages/web/src/stores/workflow-store.ts
+++ b/packages/web/src/stores/workflow-store.ts
@@ -10,6 +10,8 @@ import type {
WorkflowArtifactEvent,
DagNodeEvent,
WorkflowToolActivityEvent,
+ LoopIterationEvent,
+ LoopIterationInfo,
} from '@/lib/types';
interface WorkflowStoreState {
@@ -19,6 +21,7 @@ interface WorkflowStoreState {
handleWorkflowStatus: (event: WorkflowStatusEvent) => void;
handleWorkflowArtifact: (event: WorkflowArtifactEvent) => void;
handleDagNode: (event: DagNodeEvent) => void;
+ handleLoopIteration: (event: LoopIterationEvent) => void;
handleWorkflowToolActivity: (event: WorkflowToolActivityEvent) => void;
hydrateWorkflow: (state: WorkflowState) => void;
}
@@ -244,6 +247,7 @@ export const useWorkflowStore = create()(
const existingIdx = dagNodes.findIndex(n => n.nodeId === event.nodeId);
const nodeState: DagNodeState = {
+ ...(existingIdx >= 0 ? dagNodes[existingIdx] : {}), // preserve accumulated iteration state
nodeId: event.nodeId,
name: event.name,
status: event.status,
@@ -265,6 +269,42 @@ export const useWorkflowStore = create()(
);
},
+ handleLoopIteration: (event: LoopIterationEvent): void => {
+ if (!event.nodeId) return; // Non-DAG loops have no nodeId — skip
+ set(
+ state =>
+ updateWorkflow(state, event.runId, wf => {
+ const dagNodes = [...wf.dagNodes];
+ const existingIdx = dagNodes.findIndex(n => n.nodeId === event.nodeId);
+ if (existingIdx < 0) return wf; // Node not yet in store — loop iteration may arrive before dag_node event in SSE ordering. Intentional silent drop.
+
+ const existing = dagNodes[existingIdx];
+ const iterations: LoopIterationInfo[] = [...(existing.iterations ?? [])];
+ const iterIdx = iterations.findIndex(it => it.iteration === event.iteration);
+ const iterState: LoopIterationInfo = {
+ iteration: event.iteration,
+ status: event.status,
+ duration: event.duration,
+ };
+ if (iterIdx >= 0) {
+ iterations[iterIdx] = iterState;
+ } else {
+ iterations.push(iterState);
+ }
+
+ dagNodes[existingIdx] = {
+ ...existing,
+ currentIteration: event.iteration,
+ maxIterations: event.total > 0 ? event.total : existing.maxIterations,
+ iterations,
+ };
+ return { ...wf, dagNodes };
+ }),
+ undefined,
+ 'workflow/loopIteration'
+ );
+ },
+
handleWorkflowToolActivity: (event: WorkflowToolActivityEvent): void => {
set(
state =>
@@ -316,13 +356,19 @@ export function selectActiveWorkflow(state: WorkflowStoreState): WorkflowState |
// Stable SSE handler object — actions are defined once in create(), so references never change.
// Shared by ChatInterface and WorkflowLogs instead of per-component useShallow selectors.
-const { handleWorkflowStatus, handleWorkflowArtifact, handleDagNode, handleWorkflowToolActivity } =
- useWorkflowStore.getState();
+const {
+ handleWorkflowStatus,
+ handleWorkflowArtifact,
+ handleDagNode,
+ handleLoopIteration,
+ handleWorkflowToolActivity,
+} = useWorkflowStore.getState();
export const workflowSSEHandlers = {
onWorkflowStatus: handleWorkflowStatus,
onWorkflowArtifact: handleWorkflowArtifact,
onDagNode: handleDagNode,
+ onLoopIteration: handleLoopIteration,
onToolActivity: handleWorkflowToolActivity,
} as const;
diff --git a/packages/workflows/src/dag-executor.ts b/packages/workflows/src/dag-executor.ts
index 5427c1974f..facfbd1068 100644
--- a/packages/workflows/src/dag-executor.ts
+++ b/packages/workflows/src/dag-executor.ts
@@ -1915,7 +1915,9 @@ async function executeLoopNode(
if (platform.getStreamingMode() === 'stream') {
const toolMsg = formatToolCall(msg.toolName, msg.toolInput);
if (toolMsg) {
- await safeSendMessage(platform, conversationId, toolMsg, msgContext);
+ await safeSendMessage(platform, conversationId, toolMsg, msgContext, {
+ category: 'tool_call_formatted',
+ } as WorkflowMessageMetadata);
}
if (platform.sendStructuredEvent) {
await platform.sendStructuredEvent(conversationId, msg);