Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions packages/workflows/src/dag-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5490,6 +5490,71 @@ describe('executeDagWorkflow -- Claude SDK advanced options', () => {
expect(failedData.error).toContain('permission denied');
});

it('uses rejected Claude usage-limit details instead of SDK success subtype', async () => {
let callCount = 0;
mockSendQueryDag.mockImplementation(function* () {
callCount++;
if (callCount === 1) {
yield { type: 'assistant', content: 'ok' };
yield { type: 'result', sessionId: 'sid-ok' };
return;
}
yield {
type: 'rate_limit',
rateLimitInfo: {
status: 'rejected',
resetsAt: Math.floor(Date.now() / 1000) + 3600,
rateLimitType: 'weekly',
overageStatus: 'rejected',
},
};
yield {
type: 'result',
isError: true,
errorSubtype: 'success',
sessionId: 'sid-rate-limit',
};
});

const store = createMockStore();
const mockDeps = createMockDeps(store);
const platform = createMockPlatform();
const workflowRun = makeWorkflowRun();

await executeDagWorkflow(
mockDeps,
platform,
'conv-dag',
testDir,
{
name: 'rate-limit-test',
nodes: [
{ id: 'ok', prompt: 'first step' },
{ id: 'step1', command: 'my-cmd', depends_on: ['ok'] },
],
},
workflowRun,
'claude',
undefined,
join(testDir, 'artifacts'),
join(testDir, 'logs'),
'main',
'docs/',
minimalConfig
);

const failCalls = (store.failWorkflowRun as Mock<(id: string, msg: string) => Promise<void>>)
.mock.calls;
expect(failCalls.length).toBeGreaterThan(0);

const failureMessage = failCalls[0][1];
expect(failureMessage).toContain('Claude usage limit hit');
expect(failureMessage).toContain('weekly');
expect(failureMessage).toContain('resets at');
expect(failureMessage).toContain('overage rejected');
expect(failureMessage).not.toContain('SDK returned success');
});

it('forwards workflow-level effort to node when no per-node override', async () => {
const mockDeps = createMockDeps();
const platform = createMockPlatform();
Expand Down
87 changes: 79 additions & 8 deletions packages/workflows/src/dag-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,47 @@ function getLog(): ReturnType<typeof createLogger> {

const MCP_FAILURE_PREFIX = 'MCP server connection failed: ';

function formatUtcTimestamp(seconds: number): string {
return new Date(seconds * 1000).toISOString().replace(/\.000Z$/, 'Z');
}

function formatRateLimitReset(rateLimitInfo: Record<string, unknown>): string {
const resetsAt = typeof rateLimitInfo.resetsAt === 'number' ? rateLimitInfo.resetsAt : undefined;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Normalize rate_limit_info keys before formatting

formatRateLimitReset/formatSdkErrorMessage read camelCase fields (resetsAt, rateLimitType, overageStatus), but the Claude provider passes through rate_limit_info from the SDK without key normalization. In the common case where that payload is snake_case, this branch will miss the reset/type/overage metadata and produce reset time unknown, so usage-limit failures still lose the actionable details this change is intended to surface.

Useful? React with 👍 / 👎.

if (resetsAt === undefined) return 'reset time unknown';

const remainingMs = resetsAt * 1000 - Date.now();
const remainingMinutes = Math.max(0, Math.ceil(remainingMs / 60000));
return `resets at ${formatUtcTimestamp(resetsAt)} (${remainingMinutes} min remaining)`;
}

function isRejectedRateLimit(
rateLimitInfo: Record<string, unknown> | undefined
): rateLimitInfo is Record<string, unknown> {
return rateLimitInfo?.status === 'rejected';
}

function formatSdkErrorMessage(
subject: string,
subtype: string,
errors: string[] | undefined,
lastRateLimitInfo: Record<string, unknown> | undefined
): string {
if (isRejectedRateLimit(lastRateLimitInfo)) {
const limitType =
typeof lastRateLimitInfo.rateLimitType === 'string'
? ` (${lastRateLimitInfo.rateLimitType})`
: '';
const overage =
typeof lastRateLimitInfo.overageStatus === 'string'
? `; overage ${lastRateLimitInfo.overageStatus}`
: '';
return `${subject} failed: Claude usage limit hit${limitType}; ${formatRateLimitReset(lastRateLimitInfo)}${overage}.`;
}

const errorsDetail = errors?.length ? ` — ${errors.join('; ')}` : '';
return `${subject} failed: SDK returned ${subtype}${errorsDetail}`;
}

/** A failed MCP server entry parsed from the SDK message. `segment` is the
* original substring (e.g. `"telegram (disconnected)"`) so callers can
* reconstruct a filtered message without losing the status detail. */
Expand Down Expand Up @@ -675,6 +716,7 @@ async function executeNodeInternal(
let nodeStopReason: string | undefined;
let nodeNumTurns: number | undefined;
let nodeModelUsage: Record<string, unknown> | undefined;
let lastRateLimitInfo: Record<string, unknown> | undefined;
const batchMessages: string[] = [];

// Create per-node abort controller for idle timeout cleanup
Expand Down Expand Up @@ -893,21 +935,35 @@ async function executeNodeInternal(
// failure — which let failed iterations masquerade as successes (#1208).
if (msg.isError) {
const subtype = msg.errorSubtype ?? 'unknown';
const errorsDetail = msg.errors?.length ? ` — ${msg.errors.join('; ')}` : '';
const errorMessage = formatSdkErrorMessage(
`Node '${node.id}'`,
subtype,
msg.errors,
lastRateLimitInfo
);
getLog().error(
{
nodeId: node.id,
errorSubtype: subtype,
errors: msg.errors,
rateLimitInfo: lastRateLimitInfo,
sessionId: msg.sessionId,
stopReason: msg.stopReason,
durationMs: Date.now() - nodeStartTime,
},
'dag.node_sdk_error_result'
);
throw new Error(`Node '${node.id}' failed: SDK returned ${subtype}${errorsDetail}`);
throw new Error(errorMessage);
}
break; // Result is the "I'm done" signal — don't wait for subprocess to exit
} else if (msg.type === 'rate_limit') {
lastRateLimitInfo = msg.rateLimitInfo;
if (isRejectedRateLimit(lastRateLimitInfo)) {
getLog().warn(
{ nodeId: node.id, rateLimitInfo: lastRateLimitInfo },
'dag.node_rate_limit_rejected'
);
}
} else if (msg.type === 'system' && msg.content) {
// Providers yield system chunks for user-actionable issues (missing env
// vars, Haiku+MCP, structured output failures, etc.). MCP-failure
Expand Down Expand Up @@ -969,7 +1025,8 @@ async function executeNodeInternal(
);
}
}
// rate_limit chunks: already log.warn'd in claude.ts; not surfaced to SSE per design
// rate_limit chunks are surfaced through SDK error formatting when they
// reject the request. Warnings remain provider-log-only.
}

// When output_format is set and the provider returned structured_output,
Expand Down Expand Up @@ -1764,6 +1821,7 @@ async function executeLoopNode(
// Stream AI response for this iteration
let fullOutput = ''; // raw, for signal detection
let cleanOutput = ''; // stripped, for platform display
let lastRateLimitInfo: Record<string, unknown> | undefined;
let iterationIdleTimedOut = false;
const iterationAbortController = new AbortController();

Expand Down Expand Up @@ -1855,23 +1913,35 @@ async function executeLoopNode(
// like a "5-second crash" that kept burning iterations (#1208).
if (msg.isError) {
const subtype = msg.errorSubtype ?? 'unknown';
const errorsDetail = msg.errors?.length ? ` — ${msg.errors.join('; ')}` : '';
const errorMessage = formatSdkErrorMessage(
`Loop '${node.id}' iteration ${String(i)}`,
subtype,
msg.errors,
lastRateLimitInfo
);
getLog().error(
{
nodeId: node.id,
iteration: i,
errorSubtype: subtype,
errors: msg.errors,
rateLimitInfo: lastRateLimitInfo,
sessionId: msg.sessionId,
stopReason: msg.stopReason,
},
'loop_node.iteration_sdk_error'
);
throw new Error(
`Loop '${node.id}' iteration ${String(i)} failed: SDK returned ${subtype}${errorsDetail}`
);
throw new Error(errorMessage);
}
break; // Result is the "I'm done" signal — don't wait for subprocess to exit
} else if (msg.type === 'rate_limit') {
lastRateLimitInfo = msg.rateLimitInfo;
if (isRejectedRateLimit(lastRateLimitInfo)) {
getLog().warn(
{ nodeId: node.id, iteration: i, rateLimitInfo: lastRateLimitInfo },
'loop_node.iteration_rate_limit_rejected'
);
}
} else if (msg.type === 'tool' && msg.toolName) {
const now = Date.now();

Expand Down Expand Up @@ -1941,7 +2011,8 @@ async function executeLoopNode(
} else if (msg.type === 'tool_result' && platform.sendStructuredEvent) {
await platform.sendStructuredEvent(conversationId, msg);
}
// rate_limit chunks: already log.warn'd in claude.ts; not surfaced to SSE per design
// rate_limit chunks are surfaced through SDK error formatting when they
// reject the request. Warnings remain provider-log-only.
}
} catch (error) {
const err = error as Error;
Expand Down