diff --git a/packages/workflows/src/dag-executor.test.ts b/packages/workflows/src/dag-executor.test.ts index 6ae086b3bb..a02e2fba62 100644 --- a/packages/workflows/src/dag-executor.test.ts +++ b/packages/workflows/src/dag-executor.test.ts @@ -5490,6 +5490,71 @@ describe('executeDagWorkflow -- Claude SDK advanced options', () => { expect(failedData.error).toContain('permission denied'); }); + it('uses rejected Claude usage-limit details instead of SDK success subtype', async () => { + let callCount = 0; + mockSendQueryDag.mockImplementation(function* () { + callCount++; + if (callCount === 1) { + yield { type: 'assistant', content: 'ok' }; + yield { type: 'result', sessionId: 'sid-ok' }; + return; + } + yield { + type: 'rate_limit', + rateLimitInfo: { + status: 'rejected', + resetsAt: Math.floor(Date.now() / 1000) + 3600, + rateLimitType: 'weekly', + overageStatus: 'rejected', + }, + }; + yield { + type: 'result', + isError: true, + errorSubtype: 'success', + sessionId: 'sid-rate-limit', + }; + }); + + const store = createMockStore(); + const mockDeps = createMockDeps(store); + const platform = createMockPlatform(); + const workflowRun = makeWorkflowRun(); + + await executeDagWorkflow( + mockDeps, + platform, + 'conv-dag', + testDir, + { + name: 'rate-limit-test', + nodes: [ + { id: 'ok', prompt: 'first step' }, + { id: 'step1', command: 'my-cmd', depends_on: ['ok'] }, + ], + }, + workflowRun, + 'claude', + undefined, + join(testDir, 'artifacts'), + join(testDir, 'logs'), + 'main', + 'docs/', + minimalConfig + ); + + const failCalls = (store.failWorkflowRun as Mock<(id: string, msg: string) => Promise>) + .mock.calls; + expect(failCalls.length).toBeGreaterThan(0); + + const failureMessage = failCalls[0][1]; + expect(failureMessage).toContain('Claude usage limit hit'); + expect(failureMessage).toContain('weekly'); + expect(failureMessage).toContain('resets at'); + expect(failureMessage).toContain('overage rejected'); + expect(failureMessage).not.toContain('SDK returned success'); + }); + it('forwards workflow-level effort to node when no per-node override', async () => { const mockDeps = createMockDeps(); const platform = createMockPlatform(); diff --git a/packages/workflows/src/dag-executor.ts b/packages/workflows/src/dag-executor.ts index 3ba9824566..2755348934 100644 --- a/packages/workflows/src/dag-executor.ts +++ b/packages/workflows/src/dag-executor.ts @@ -81,6 +81,47 @@ function getLog(): ReturnType { const MCP_FAILURE_PREFIX = 'MCP server connection failed: '; +function formatUtcTimestamp(seconds: number): string { + return new Date(seconds * 1000).toISOString().replace(/\.000Z$/, 'Z'); +} + +function formatRateLimitReset(rateLimitInfo: Record): string { + const resetsAt = typeof rateLimitInfo.resetsAt === 'number' ? rateLimitInfo.resetsAt : undefined; + if (resetsAt === undefined) return 'reset time unknown'; + + const remainingMs = resetsAt * 1000 - Date.now(); + const remainingMinutes = Math.max(0, Math.ceil(remainingMs / 60000)); + return `resets at ${formatUtcTimestamp(resetsAt)} (${remainingMinutes} min remaining)`; +} + +function isRejectedRateLimit( + rateLimitInfo: Record | undefined +): rateLimitInfo is Record { + return rateLimitInfo?.status === 'rejected'; +} + +function formatSdkErrorMessage( + subject: string, + subtype: string, + errors: string[] | undefined, + lastRateLimitInfo: Record | undefined +): string { + if (isRejectedRateLimit(lastRateLimitInfo)) { + const limitType = + typeof lastRateLimitInfo.rateLimitType === 'string' + ? ` (${lastRateLimitInfo.rateLimitType})` + : ''; + const overage = + typeof lastRateLimitInfo.overageStatus === 'string' + ? `; overage ${lastRateLimitInfo.overageStatus}` + : ''; + return `${subject} failed: Claude usage limit hit${limitType}; ${formatRateLimitReset(lastRateLimitInfo)}${overage}.`; + } + + const errorsDetail = errors?.length ? ` — ${errors.join('; ')}` : ''; + return `${subject} failed: SDK returned ${subtype}${errorsDetail}`; +} + /** A failed MCP server entry parsed from the SDK message. `segment` is the * original substring (e.g. `"telegram (disconnected)"`) so callers can * reconstruct a filtered message without losing the status detail. */ @@ -675,6 +716,7 @@ async function executeNodeInternal( let nodeStopReason: string | undefined; let nodeNumTurns: number | undefined; let nodeModelUsage: Record | undefined; + let lastRateLimitInfo: Record | undefined; const batchMessages: string[] = []; // Create per-node abort controller for idle timeout cleanup @@ -893,21 +935,35 @@ async function executeNodeInternal( // failure — which let failed iterations masquerade as successes (#1208). if (msg.isError) { const subtype = msg.errorSubtype ?? 'unknown'; - const errorsDetail = msg.errors?.length ? ` — ${msg.errors.join('; ')}` : ''; + const errorMessage = formatSdkErrorMessage( + `Node '${node.id}'`, + subtype, + msg.errors, + lastRateLimitInfo + ); getLog().error( { nodeId: node.id, errorSubtype: subtype, errors: msg.errors, + rateLimitInfo: lastRateLimitInfo, sessionId: msg.sessionId, stopReason: msg.stopReason, durationMs: Date.now() - nodeStartTime, }, 'dag.node_sdk_error_result' ); - throw new Error(`Node '${node.id}' failed: SDK returned ${subtype}${errorsDetail}`); + throw new Error(errorMessage); } break; // Result is the "I'm done" signal — don't wait for subprocess to exit + } else if (msg.type === 'rate_limit') { + lastRateLimitInfo = msg.rateLimitInfo; + if (isRejectedRateLimit(lastRateLimitInfo)) { + getLog().warn( + { nodeId: node.id, rateLimitInfo: lastRateLimitInfo }, + 'dag.node_rate_limit_rejected' + ); + } } else if (msg.type === 'system' && msg.content) { // Providers yield system chunks for user-actionable issues (missing env // vars, Haiku+MCP, structured output failures, etc.). MCP-failure @@ -969,7 +1025,8 @@ async function executeNodeInternal( ); } } - // rate_limit chunks: already log.warn'd in claude.ts; not surfaced to SSE per design + // rate_limit chunks are surfaced through SDK error formatting when they + // reject the request. Warnings remain provider-log-only. } // When output_format is set and the provider returned structured_output, @@ -1764,6 +1821,7 @@ async function executeLoopNode( // Stream AI response for this iteration let fullOutput = ''; // raw, for signal detection let cleanOutput = ''; // stripped, for platform display + let lastRateLimitInfo: Record | undefined; let iterationIdleTimedOut = false; const iterationAbortController = new AbortController(); @@ -1855,23 +1913,35 @@ async function executeLoopNode( // like a "5-second crash" that kept burning iterations (#1208). if (msg.isError) { const subtype = msg.errorSubtype ?? 'unknown'; - const errorsDetail = msg.errors?.length ? ` — ${msg.errors.join('; ')}` : ''; + const errorMessage = formatSdkErrorMessage( + `Loop '${node.id}' iteration ${String(i)}`, + subtype, + msg.errors, + lastRateLimitInfo + ); getLog().error( { nodeId: node.id, iteration: i, errorSubtype: subtype, errors: msg.errors, + rateLimitInfo: lastRateLimitInfo, sessionId: msg.sessionId, stopReason: msg.stopReason, }, 'loop_node.iteration_sdk_error' ); - throw new Error( - `Loop '${node.id}' iteration ${String(i)} failed: SDK returned ${subtype}${errorsDetail}` - ); + throw new Error(errorMessage); } break; // Result is the "I'm done" signal — don't wait for subprocess to exit + } else if (msg.type === 'rate_limit') { + lastRateLimitInfo = msg.rateLimitInfo; + if (isRejectedRateLimit(lastRateLimitInfo)) { + getLog().warn( + { nodeId: node.id, iteration: i, rateLimitInfo: lastRateLimitInfo }, + 'loop_node.iteration_rate_limit_rejected' + ); + } } else if (msg.type === 'tool' && msg.toolName) { const now = Date.now(); @@ -1941,7 +2011,8 @@ async function executeLoopNode( } else if (msg.type === 'tool_result' && platform.sendStructuredEvent) { await platform.sendStructuredEvent(conversationId, msg); } - // rate_limit chunks: already log.warn'd in claude.ts; not surfaced to SSE per design + // rate_limit chunks are surfaced through SDK error formatting when they + // reject the request. Warnings remain provider-log-only. } } catch (error) { const err = error as Error;