Skip to content

Commit b78bb0d

Browse files
DX-2021: Add retryDelay option (#123)
* refactor: bump qstash * feat: add retryDelay parameter * feat: add retryDelay tests * refactor: add retries, retryDelay and flowControl to batch requests * fix: defaults and tests --------- Co-authored-by: CahidArda <[email protected]>
1 parent 6a9e0b7 commit b78bb0d

26 files changed

+290
-19
lines changed

bun.lockb

0 Bytes
Binary file not shown.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
},
104104
"dependencies": {
105105
"@ai-sdk/openai": "^1.2.1",
106-
"@upstash/qstash": "^2.8.1",
106+
"@upstash/qstash": "^2.8.2",
107107
"ai": "^4.1.54",
108108
"zod": "^3.24.1"
109109
},

src/agents/adapters.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export const fetchWithContextCall = async (
3535
body,
3636
timeout: agentCallParams?.timeout,
3737
retries: agentCallParams?.retries,
38+
retryDelay: agentCallParams?.retryDelay,
3839
flowControl: agentCallParams?.flowControl,
3940
});
4041

src/agents/agent.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ describe("agents", () => {
2525
url: WORKFLOW_ENDPOINT,
2626
workflowRunId,
2727
retries: 5,
28+
retryDelay: "1000",
2829
});
2930

3031
const agentsApi = new WorkflowAgents({ context });
@@ -43,6 +44,7 @@ describe("agents", () => {
4344
},
4445
retries: 5,
4546
timeout: 10,
47+
retryDelay: "1000",
4648
},
4749
});
4850

@@ -119,9 +121,11 @@ describe("agents", () => {
119121
"upstash-workflow-runid": workflowRunId,
120122
"upstash-workflow-url": "https://requestcatcher.com/api",
121123
"upstash-callback-retries": "5",
124+
"upstash-callback-retry-delay": "1000",
122125
"upstash-flow-control-key": "flowControlKey",
123126
"upstash-flow-control-value": "parallelism=2",
124127
"upstash-retries": "5",
128+
"upstash-retry-delay": "1000",
125129
"upstash-timeout": "10",
126130
},
127131
},
@@ -173,6 +177,7 @@ describe("agents", () => {
173177
"upstash-callback-forward-upstash-workflow-stepname": "Call Agent my agent",
174178
"upstash-callback-forward-upstash-workflow-steptype": "Call",
175179
"upstash-callback-retries": "5",
180+
"upstash-callback-retry-delay": "1000",
176181
"upstash-callback-workflow-calltype": "fromCallback",
177182
"upstash-callback-workflow-init": "false",
178183
"upstash-callback-workflow-runid": workflowRunId,
@@ -189,6 +194,7 @@ describe("agents", () => {
189194
"upstash-flow-control-key": "flowControlKey",
190195
"upstash-flow-control-value": "parallelism=2",
191196
"upstash-retries": "5",
197+
"upstash-retry-delay": "1000",
192198
"upstash-timeout": "10",
193199
},
194200
},
@@ -239,6 +245,7 @@ describe("agents", () => {
239245
"upstash-callback-forward-upstash-workflow-stepname": "Call Agent manager llm",
240246
"upstash-callback-forward-upstash-workflow-steptype": "Call",
241247
"upstash-callback-retries": "5",
248+
"upstash-callback-retry-delay": "1000",
242249
"upstash-callback-workflow-calltype": "fromCallback",
243250
"upstash-callback-workflow-init": "false",
244251
"upstash-callback-workflow-runid": workflowRunId,
@@ -255,6 +262,7 @@ describe("agents", () => {
255262
"upstash-flow-control-key": "flowControlKey",
256263
"upstash-flow-control-value": "parallelism=2",
257264
"upstash-retries": "5",
265+
"upstash-retry-delay": "1000",
258266
"upstash-timeout": "10",
259267
},
260268
},

src/agents/types.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ export type ManagerAgentParameters = {
9393
Pick<AgentParameters, "maxSteps">;
9494

9595
type ModelParams = Parameters<ReturnType<typeof createOpenAI>>;
96-
export type AgentCallParams = Pick<CallSettings, "flowControl" | "retries" | "timeout">;
96+
export type AgentCallParams = Pick<
97+
CallSettings,
98+
"flowControl" | "retries" | "timeout" | "retryDelay"
99+
>;
97100
type CustomModelSettings = ModelParams["1"] & { baseURL?: string; apiKey?: string } & {
98101
callSettings: AgentCallParams;
99102
};

src/client/index.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ describe("workflow client", () => {
210210
headers: { "user-header": "user-header-value" },
211211
workflowRunId: myWorkflowRunId,
212212
retries: 15,
213+
retryDelay: "1000",
213214
delay: 1,
214215
});
215216
},
@@ -229,6 +230,7 @@ describe("workflow client", () => {
229230
"upstash-forward-user-header": "user-header-value",
230231
"upstash-method": "POST",
231232
"upstash-retries": "15",
233+
"upstash-retry-delay": "1000",
232234
"upstash-workflow-init": "true",
233235
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
234236
"upstash-workflow-url": "https://requestcatcher.com/api",
@@ -261,6 +263,7 @@ describe("workflow client", () => {
261263
headers: { "user-header": "user-header-value" },
262264
workflowRunId: myWorkflowRunId,
263265
retries: 15,
266+
retryDelay: "1000",
264267
delay: 1,
265268
},
266269
{
@@ -269,6 +272,7 @@ describe("workflow client", () => {
269272
headers: { "user-header": "user-header-value" },
270273
workflowRunId: myWorkflowRunId2,
271274
retries: 15,
275+
retryDelay: "2000",
272276
delay: 1,
273277
useFailureFunction: true,
274278
},
@@ -294,6 +298,7 @@ describe("workflow client", () => {
294298
"upstash-forward-user-header": "user-header-value",
295299
"upstash-method": "POST",
296300
"upstash-retries": "15",
301+
"upstash-retry-delay": "1000",
297302
"upstash-workflow-init": "true",
298303
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
299304
"upstash-workflow-url": "https://requestcatcher.com/api",
@@ -314,6 +319,7 @@ describe("workflow client", () => {
314319
"upstash-forward-user-header": "user-header-value",
315320
"upstash-method": "POST",
316321
"upstash-retries": "15",
322+
"upstash-retry-delay": "2000",
317323
"upstash-workflow-init": "true",
318324
"upstash-workflow-runid": `wfr_${myWorkflowRunId2}`,
319325
"upstash-workflow-url": "https://requestcatcher.com/api",
@@ -330,6 +336,7 @@ describe("workflow client", () => {
330336
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
331337
"upstash-failure-callback-forward-user-header": "user-header-value",
332338
"upstash-failure-callback-retries": "15",
339+
"upstash-failure-callback-retry-delay": "2000",
333340
"upstash-failure-callback-workflow-calltype": "failureCall",
334341
"upstash-failure-callback-workflow-init": "false",
335342
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId2}`,
@@ -353,6 +360,7 @@ describe("workflow client", () => {
353360
headers: { "user-header": "user-header-value" },
354361
workflowRunId: myWorkflowRunId,
355362
retries: 15,
363+
retryDelay: "1000",
356364
delay: 1,
357365
failureUrl: "https://requestcatcher.com/some-failure-callback",
358366
});
@@ -373,6 +381,7 @@ describe("workflow client", () => {
373381
"upstash-forward-user-header": "user-header-value",
374382
"upstash-method": "POST",
375383
"upstash-retries": "15",
384+
"upstash-retry-delay": "1000",
376385
"upstash-workflow-init": "true",
377386
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
378387
"upstash-workflow-url": "https://requestcatcher.com/api",
@@ -385,6 +394,7 @@ describe("workflow client", () => {
385394
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
386395
"upstash-failure-callback-forward-user-header": "user-header-value",
387396
"upstash-failure-callback-retries": "15",
397+
"upstash-failure-callback-retry-delay": "1000",
388398
"upstash-failure-callback-workflow-calltype": "failureCall",
389399
"upstash-failure-callback-workflow-init": "false",
390400
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`,

src/client/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ export class Client {
177177
* headers: { ... }, // Optional headers
178178
* workflowRunId: "my-workflow", // Optional workflow run ID
179179
* retries: 3 // Optional retries for the initial request
180+
* retryDelay: "1000" // Optional retry delay for the delay between retries
180181
* });
181182
*
182183
* console.log(workflowRunId)
@@ -191,13 +192,15 @@ export class Client {
191192
* headers: { ... }, // Optional headers
192193
* workflowRunId: "my-workflow", // Optional workflow run ID
193194
* retries: 3 // Optional retries for the initial request
195+
* retryDelay: "1000" // Optional retry delay for the delay between retries
194196
* },
195197
* {
196198
* url: "https://workflow-endpoint-2.com",
197199
* body: "hello world!", // Optional body
198200
* headers: { ... }, // Optional headers
199201
* workflowRunId: "my-workflow-2", // Optional workflow run ID
200202
* retries: 5 // Optional retries for the initial request
203+
* retryDelay: "1000" // Optional retry delay for the delay between retries
201204
* },
202205
* ]);
203206
*
@@ -218,6 +221,7 @@ export class Client {
218221
* with `wfr_`.
219222
* @param retries retry to use in the initial request. in the rest of
220223
* the workflow, `retries` option of the `serve` will be used.
224+
* @param retryDelay delay between retries.
221225
* @param flowControl Settings for controlling the number of active requests
222226
* and number of requests per second with the same key.
223227
* @param delay Delay for the workflow run. This is used to delay the
@@ -241,13 +245,14 @@ export class Client {
241245

242246
const context = new WorkflowContext({
243247
qstashClient: this.client,
244-
// @ts-expect-error headers type mismatch
245-
headers: new Headers(option.headers ?? {}),
248+
// @ts-expect-error header type mismatch because of bun
249+
headers: new Headers((option.headers ?? {})),
246250
initialPayload: option.body,
247251
steps: [],
248252
url: option.url,
249253
workflowRunId: finalWorkflowRunId,
250254
retries: option.retries,
255+
retryDelay: option.retryDelay,
251256
telemetry: { sdk: SDK_TELEMETRY },
252257
flowControl: option.flowControl,
253258
failureUrl,

src/client/types.ts

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type BaseStepLog = {
3535
* number of retries for the step
3636
*/
3737
retries: number;
38+
/**
39+
* retry delay parameter for the step if it was set
40+
*/
41+
retryDelay?: string;
3842
/**
3943
* number of parallel steps
4044
*
@@ -53,10 +57,6 @@ type BaseStepLog = {
5357
* headers
5458
*/
5559
headers: Record<string, string[]>;
56-
/**
57-
* retry delay parameter for the step if it was set
58-
*/
59-
retryDelay?: string;
6060
};
6161

6262
type CallUrlGroup = {
@@ -184,14 +184,14 @@ type StepLogGroup =
184184
* retries
185185
*/
186186
retries: number;
187-
/**
188-
* errors which occured in the step
189-
*/
190-
errors?: StepError[];
191187
/**
192188
* retry delay parameter for the step if it was set
193189
*/
194190
retryDelay?: string;
191+
/**
192+
* errors which occured in the step
193+
*/
194+
errors?: StepError[];
195195
}[];
196196
/**
197197
* Log which belongs to the next step
@@ -345,6 +345,38 @@ export type TriggerOptions = {
345345
* @default 3
346346
*/
347347
retries?: number;
348+
/**
349+
* Delay between retries.
350+
*
351+
* By default, the `retryDelay` is exponential backoff.
352+
* More details can be found in: https://upstash.com/docs/qstash/features/retry.
353+
*
354+
* The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.
355+
*
356+
* You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
357+
* The special variable `retried` represents the current retry attempt count (starting from 0).
358+
*
359+
* Supported functions:
360+
* - `pow`
361+
* - `sqrt`
362+
* - `abs`
363+
* - `exp`
364+
* - `floor`
365+
* - `ceil`
366+
* - `round`
367+
* - `min`
368+
* - `max`
369+
*
370+
* Examples of valid `retryDelay` values:
371+
* ```ts
372+
* 1000 // 1 second
373+
* 1000 * (1 + retried) // 1 second multiplied by the current retry attempt
374+
* pow(2, retried) // 2 to the power of the current retry attempt
375+
* max(10, pow(2, retried)) // The greater of 10 or 2^retried
376+
* ```
377+
*/
378+
retryDelay?: string;
379+
348380
/**
349381
* Flow control to use for the workflow run.
350382
* If not provided, no flow control will be used.

src/context/api/base.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export abstract class BaseWorkflowApi {
3232
>
3333
): Promise<CallResponse<TResult>> {
3434
const { url, appendHeaders, method } = getProviderInfo(settings.api);
35-
const { method: userMethod, body, headers = {}, retries = 0, timeout } = settings;
35+
const { method: userMethod, body, headers = {}, retries = 0, retryDelay, timeout } = settings;
3636

3737
return await this.context.call<TResult, TBody>(stepName, {
3838
url,
@@ -43,6 +43,7 @@ export abstract class BaseWorkflowApi {
4343
...headers,
4444
},
4545
retries,
46+
retryDelay,
4647
timeout,
4748
});
4849
}

src/context/auto-executor.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ describe("auto-executor", () => {
9292
steps,
9393
url: WORKFLOW_ENDPOINT,
9494
retries: 6,
95+
retryDelay: "1000",
9596
flowControl: {
9697
key: "key",
9798
parallelism: 10,
@@ -135,6 +136,7 @@ describe("auto-executor", () => {
135136
"upstash-workflow-init": "false",
136137
"upstash-workflow-url": WORKFLOW_ENDPOINT,
137138
"upstash-retries": "6",
139+
"upstash-retry-delay": "1000",
138140
"upstash-flow-control-key": "key",
139141
"upstash-flow-control-value": "parallelism=10",
140142
"upstash-forward-upstash-workflow-invoke-count": "7",
@@ -232,6 +234,7 @@ describe("auto-executor", () => {
232234
"upstash-workflow-init": "false",
233235
"upstash-workflow-url": WORKFLOW_ENDPOINT,
234236
"upstash-retries": "6",
237+
"upstash-retry-delay": "1000",
235238
"upstash-flow-control-key": "key",
236239
"upstash-flow-control-value": "parallelism=10",
237240
"upstash-forward-upstash-workflow-invoke-count": "7",
@@ -251,6 +254,7 @@ describe("auto-executor", () => {
251254
"upstash-workflow-init": "false",
252255
"upstash-workflow-url": WORKFLOW_ENDPOINT,
253256
"upstash-retries": "6",
257+
"upstash-retry-delay": "1000",
254258
"upstash-flow-control-key": "key",
255259
"upstash-flow-control-value": "parallelism=10",
256260
"upstash-forward-upstash-workflow-invoke-count": "7",
@@ -270,6 +274,7 @@ describe("auto-executor", () => {
270274
"upstash-workflow-init": "false",
271275
"upstash-workflow-url": WORKFLOW_ENDPOINT,
272276
"upstash-retries": "6",
277+
"upstash-retry-delay": "1000",
273278
"upstash-flow-control-key": "key",
274279
"upstash-flow-control-value": "parallelism=10",
275280
"upstash-forward-upstash-workflow-invoke-count": "7",
@@ -288,6 +293,7 @@ describe("auto-executor", () => {
288293
"upstash-workflow-init": "false",
289294
"upstash-workflow-url": WORKFLOW_ENDPOINT,
290295
"upstash-retries": "6",
296+
"upstash-retry-delay": "1000",
291297
"upstash-flow-control-key": "key",
292298
"upstash-flow-control-value": "parallelism=10",
293299
"upstash-forward-upstash-workflow-invoke-count": "7",
@@ -349,6 +355,7 @@ describe("auto-executor", () => {
349355
"upstash-workflow-init": "false",
350356
"upstash-workflow-url": WORKFLOW_ENDPOINT,
351357
"upstash-retries": "6",
358+
"upstash-retry-delay": "1000",
352359
"upstash-flow-control-key": "key",
353360
"upstash-flow-control-value": "parallelism=10",
354361
"upstash-forward-upstash-workflow-invoke-count": "7",
@@ -406,6 +413,7 @@ describe("auto-executor", () => {
406413
"upstash-workflow-init": "false",
407414
"upstash-workflow-url": WORKFLOW_ENDPOINT,
408415
"upstash-retries": "6",
416+
"upstash-retry-delay": "1000",
409417
"upstash-flow-control-key": "key",
410418
"upstash-flow-control-value": "parallelism=10",
411419
"upstash-forward-upstash-workflow-invoke-count": "7",

0 commit comments

Comments
 (0)