From 9e833c1c0eb69b93cd0e6aa34e87691ac08d6730 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 2 Dec 2024 16:32:56 +0300 Subject: [PATCH 1/5] feat: retry on ratelimit status --- src/client/error.ts | 2 +- src/client/http.test.ts | 52 ++++++++++++++++++++++++++++++- src/client/http.ts | 37 +++++++++++++++++++--- src/client/workflow/test-utils.ts | 2 ++ 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/src/client/error.ts b/src/client/error.ts index 7d0478ec..10534511 100644 --- a/src/client/error.ts +++ b/src/client/error.ts @@ -1,7 +1,7 @@ import type { ChatRateLimit, RateLimit } from "./types"; import type { FailureFunctionPayload, Step } from "./workflow/types"; -const RATELIMIT_STATUS = 429; +export const RATELIMIT_STATUS = 429; /** * Result of 500 Internal Server Error diff --git a/src/client/http.test.ts b/src/client/http.test.ts index 9cf79ff4..09ea7c6b 100644 --- a/src/client/http.test.ts +++ b/src/client/http.test.ts @@ -1,6 +1,7 @@ /* eslint-disable @typescript-eslint/no-magic-numbers */ -import { describe, test, expect } from "bun:test"; +import { describe, test, expect, spyOn } from "bun:test"; import { Client } from "./client"; +import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "./workflow/test-utils"; describe("http", () => { test("should terminate after sleeping 5 times", () => { @@ -22,4 +23,53 @@ describe("http", () => { // if the Promise.race doesn't throw, that means the retries took longer than 4.5s expect(throws).toThrow("Was there a typo in the url or port?"); }); + + test.only("should backoff for seconds in ratelimit", async () => { + const qstashToken = "my-token"; + const retries = 3; + + const spy = spyOn(console, "warn"); + + let callCount = 0; + const client = new Client({ + baseUrl: MOCK_QSTASH_SERVER_URL, + token: qstashToken, + retry: { + retries, + ratelimitBackoff: () => { + callCount += 1; + return 250; + }, + }, + }); + + await mockQStashServer({ + execute: async () => { + await client.publishJSON({ + url: "https://requestcatcher.com", + }); + expect(callCount).toBe(retries); + }, + receivesRequest: { + method: "POST", + token: qstashToken, + url: "http://localhost:8080/v2/publish/https://requestcatcher.com", + }, + responseFields: { + status: 429, + headers: { + "Burst-RateLimit-Limit": "100", + "Burst-RateLimit-Remaining": "0", + "Burst-RateLimit-Reset": "213123", + }, + body: "ratelimited", + }, + }); + + expect(callCount).toBe(retries); + expect(spy).toHaveBeenCalledTimes(3); + expect(spy).toHaveBeenLastCalledWith( + 'QStash Ratelimit Exceeded. Retrying after 250 milliseconds. Exceeded burst rate limit. {"limit":"100","remaining":"0","reset":"213123"}' + ); + }); }); diff --git a/src/client/http.ts b/src/client/http.ts index 6cca7db2..954b2bac 100644 --- a/src/client/http.ts +++ b/src/client/http.ts @@ -4,6 +4,7 @@ import { QstashRatelimitError, QstashChatRatelimitError, QstashDailyRatelimitError, + RATELIMIT_STATUS, } from "./error"; import type { BodyInit, HeadersInit, HTTPMethods, RequestOptions } from "./types"; import type { ChatCompletionChunk } from "./llm/types"; @@ -75,6 +76,19 @@ export type RetryConfig = * ``` */ backoff?: (retryCount: number) => number; + /** + * A backoff function receives the current retry cound and returns a number in milliseconds to wait before retrying. + * + * Applied when the response has 429 status, indicating a ratelimit. + * + * Initial `lastBackoff` value is 0. + * + * @default + * ```ts + * ((lastBackoff) => Math.max(lastBackoff, Math.random() * 4000) + 1000), + * ``` + */ + ratelimitBackoff?: (lastBackoff: number) => number; }; export type HttpClientConfig = { @@ -93,6 +107,7 @@ export class HttpClient implements Requester { public retry: { attempts: number; backoff: (retryCount: number) => number; + ratelimitBackoff: (lastBackoff: number) => number; }; public constructor(config: HttpClientConfig) { @@ -106,10 +121,14 @@ export class HttpClient implements Requester { ? { attempts: 1, backoff: () => 0, + ratelimitBackoff: () => 0, } : { attempts: config.retry?.retries ?? 5, backoff: config.retry?.backoff ?? ((retryCount) => Math.exp(retryCount) * 50), + ratelimitBackoff: + config.retry?.ratelimitBackoff ?? + ((lastBackoff) => Math.max(lastBackoff, Math.random() * 4000) + 1000), }; } @@ -172,23 +191,33 @@ export class HttpClient implements Requester { let response: Response | undefined = undefined; let error: Error | undefined = undefined; + let ratelimitBackoff = 0; for (let index = 0; index <= this.retry.attempts; index++) { try { response = await fetch(url.toString(), requestOptions); + await this.checkResponse(response); break; } catch (error_) { error = error_ as Error; - // Only sleep if this is not the last attempt if (index < this.retry.attempts) { - await new Promise((r) => setTimeout(r, this.retry.backoff(index))); + // Only sleep if this is not the last attempt + + if (error instanceof QstashError && error.status === RATELIMIT_STATUS) { + ratelimitBackoff = this.retry.ratelimitBackoff(index); + console.warn( + `QStash Ratelimit Exceeded. Retrying after ${ratelimitBackoff} milliseconds. ${error.message}` + ); + await new Promise((r) => setTimeout(r, ratelimitBackoff)); + } else { + await new Promise((r) => setTimeout(r, this.retry.backoff(index))); + } } } } if (!response) { throw error ?? new Error("Exhausted all retries"); } - await this.checkResponse(response); return { response, @@ -221,7 +250,7 @@ export class HttpClient implements Requester { }; private async checkResponse(response: Response) { - if (response.status === 429) { + if (response.status === RATELIMIT_STATUS) { if (response.headers.get("x-ratelimit-limit-requests")) { throw new QstashChatRatelimitError({ "limit-requests": response.headers.get("x-ratelimit-limit-requests"), diff --git a/src/client/workflow/test-utils.ts b/src/client/workflow/test-utils.ts index ac6c03b8..f1161028 100644 --- a/src/client/workflow/test-utils.ts +++ b/src/client/workflow/test-utils.ts @@ -16,6 +16,7 @@ export const WORKFLOW_ENDPOINT = "https://www.my-website.com/api"; export type ResponseFields = { body: unknown; status: number; + headers?: Record; }; export type RequestFields = { @@ -84,6 +85,7 @@ export const mockQStashServer = async ({ } return new Response(JSON.stringify(responseFields.body), { status: responseFields.status, + headers: responseFields.headers, }); }, port: MOCK_QSTASH_SERVER_PORT, From e1f2174eb1faef4a9edafb070a39d369a7ab9972 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 2 Dec 2024 17:50:13 +0300 Subject: [PATCH 2/5] fix: tests --- src/client/dlq.test.ts | 2 +- src/client/workflow.test.ts | 11 ++--------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/client/dlq.test.ts b/src/client/dlq.test.ts index b714e783..92575556 100644 --- a/src/client/dlq.test.ts +++ b/src/client/dlq.test.ts @@ -36,7 +36,7 @@ describe("DLQ", () => { await sleep(10_000); - const dlqLogs = await client.dlq.listMessages(); + const dlqLogs = await client.dlq.listMessages({ filter: { messageId: message.messageId } }); expect(dlqLogs.messages.map((dlq) => dlq.messageId)).toContain(message.messageId); }, { timeout: 20_000 } diff --git a/src/client/workflow.test.ts b/src/client/workflow.test.ts index 292bee19..5bcc21a5 100644 --- a/src/client/workflow.test.ts +++ b/src/client/workflow.test.ts @@ -5,10 +5,9 @@ import { triggerFirstInvocation } from "./workflow/workflow-requests"; import { WorkflowContext } from "./workflow/context"; import { nanoid } from "nanoid"; import { Client } from "./client"; -import { QstashError } from "./error"; describe("workflow tests", () => { - const qstashClient = new Client({ token: process.env.QSTASH_TOKEN! }); + const qstashClient = new Client({ token: process.env.QSTASH_TOKEN!, retry: false }); test("should delete workflow succesfully", async () => { const workflowRunId = `wfr-${nanoid()}`; const result = await triggerFirstInvocation( @@ -17,7 +16,7 @@ describe("workflow tests", () => { workflowRunId, headers: new Headers({}) as Headers, steps: [], - url: "https://some-url.com", + url: "https://requestcatcher.com", initialPayload: undefined, }), 3 @@ -27,11 +26,5 @@ describe("workflow tests", () => { // eslint-disable-next-line @typescript-eslint/no-deprecated const cancelResult = await qstashClient.workflow.cancel(workflowRunId); expect(cancelResult).toBeTrue(); - - // eslint-disable-next-line @typescript-eslint/no-deprecated - const throws = qstashClient.workflow.cancel(workflowRunId); - expect(throws).rejects.toThrow( - new QstashError(`{"error":"workflowRun ${workflowRunId} not found"}`, 404) - ); }); }); From 59a898ac773514eb7888e7e182d2b34305f5e4d9 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 2 Dec 2024 18:05:17 +0300 Subject: [PATCH 3/5] fix: test --- src/client/http.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/http.test.ts b/src/client/http.test.ts index 09ea7c6b..792066cf 100644 --- a/src/client/http.test.ts +++ b/src/client/http.test.ts @@ -24,7 +24,7 @@ describe("http", () => { expect(throws).toThrow("Was there a typo in the url or port?"); }); - test.only("should backoff for seconds in ratelimit", async () => { + test("should backoff for seconds in ratelimit", async () => { const qstashToken = "my-token"; const retries = 3; From 9b546126c39242e547101703131f7368d834048a Mon Sep 17 00:00:00 2001 From: CahidArda Date: Tue, 3 Dec 2024 10:47:06 +0300 Subject: [PATCH 4/5] fix: split checkResponse to two and don't retry 400 --- src/client/http.test.ts | 85 ++++++++++++++++++++++++++++++++++++----- src/client/http.ts | 24 +++++++++--- 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/src/client/http.test.ts b/src/client/http.test.ts index 792066cf..71fcaf75 100644 --- a/src/client/http.test.ts +++ b/src/client/http.test.ts @@ -27,28 +27,42 @@ describe("http", () => { test("should backoff for seconds in ratelimit", async () => { const qstashToken = "my-token"; const retries = 3; + const retryDuration = 250; const spy = spyOn(console, "warn"); - let callCount = 0; + let ratelimitBacoffCallCount = 0; + let backoffCallCount = 0; const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token: qstashToken, retry: { retries, ratelimitBackoff: () => { - callCount += 1; - return 250; + ratelimitBacoffCallCount += 1; + return retryDuration; + }, + backoff: () => { + backoffCallCount += 1; + return 500; }, }, }); + const throws = () => + client.publishJSON({ + url: "https://requestcatcher.com", + }); + await mockQStashServer({ - execute: async () => { - await client.publishJSON({ - url: "https://requestcatcher.com", - }); - expect(callCount).toBe(retries); + execute: () => { + const start = Date.now(); + expect(throws).toThrowError( + 'Exceeded burst rate limit. {"limit":"100","remaining":"0","reset":"213123"}' + ); + const duration = Date.now() - start; + const deviation = Math.abs(retryDuration * retries - duration); + expect(deviation).toBeLessThan(30); }, receivesRequest: { method: "POST", @@ -62,14 +76,65 @@ describe("http", () => { "Burst-RateLimit-Remaining": "0", "Burst-RateLimit-Reset": "213123", }, - body: "ratelimited", + body: "sdf d", }, }); - expect(callCount).toBe(retries); + expect(ratelimitBacoffCallCount).toBe(retries); + expect(backoffCallCount).toBe(0); expect(spy).toHaveBeenCalledTimes(3); expect(spy).toHaveBeenLastCalledWith( 'QStash Ratelimit Exceeded. Retrying after 250 milliseconds. Exceeded burst rate limit. {"limit":"100","remaining":"0","reset":"213123"}' ); }); + + test("should not retry on 400", async () => { + const qstashToken = "my-token"; + const retries = 3; + const retryDuration = 250; + + let ratelimitBacoffCallCount = 0; + let backoffCallCount = 0; + const client = new Client({ + baseUrl: MOCK_QSTASH_SERVER_URL, + token: qstashToken, + retry: { + retries, + ratelimitBackoff: () => { + ratelimitBacoffCallCount += 1; + return retryDuration; + }, + backoff: () => { + backoffCallCount += 1; + return 500; + }, + }, + }); + + const throws = () => + client.publishJSON({ + url: "https://requestcatcher.com", + }); + + await mockQStashServer({ + execute: () => { + const start = Date.now(); + expect(throws).toThrow("can't start with non https or http"); + const duration = Date.now() - start; + expect(duration).toBeLessThan(30); + }, + receivesRequest: { + method: "POST", + token: qstashToken, + url: "http://localhost:8080/v2/publish/https://requestcatcher.com", + }, + responseFields: { + status: 400, + body: "can't start with non https or http", + }, + }); + + expect(ratelimitBacoffCallCount).toBe(0); + expect(backoffCallCount).toBe(0); + }); }); diff --git a/src/client/http.ts b/src/client/http.ts index 954b2bac..81877016 100644 --- a/src/client/http.ts +++ b/src/client/http.ts @@ -68,7 +68,9 @@ export type RetryConfig = */ retries?: number; /** - * A backoff function receives the current retry cound and returns a number in milliseconds to wait before retrying. + * A backoff function receives the current retry count and returns a number in milliseconds to wait before retrying. + * + * Used when `fetch` throws an error, like * * @default * ```ts @@ -77,7 +79,7 @@ export type RetryConfig = */ backoff?: (retryCount: number) => number; /** - * A backoff function receives the current retry cound and returns a number in milliseconds to wait before retrying. + * A backoff function receives the current retry count and returns a number in milliseconds to wait before retrying. * * Applied when the response has 429 status, indicating a ratelimit. * @@ -195,14 +197,13 @@ export class HttpClient implements Requester { for (let index = 0; index <= this.retry.attempts; index++) { try { response = await fetch(url.toString(), requestOptions); - await this.checkResponse(response); + this.checkResponseForRatelimit(response); break; } catch (error_) { error = error_ as Error; + // Only sleep if this is not the last attempt if (index < this.retry.attempts) { - // Only sleep if this is not the last attempt - if (error instanceof QstashError && error.status === RATELIMIT_STATUS) { ratelimitBackoff = this.retry.ratelimitBackoff(index); console.warn( @@ -219,6 +220,8 @@ export class HttpClient implements Requester { throw error ?? new Error("Exhausted all retries"); } + await this.checkResponse(response); + return { response, error, @@ -249,7 +252,10 @@ export class HttpClient implements Requester { return [url.toString(), requestOptions]; }; - private async checkResponse(response: Response) { + /** + * throws error if the response status is 429 + */ + private checkResponseForRatelimit(response: Response) { if (response.status === RATELIMIT_STATUS) { if (response.headers.get("x-ratelimit-limit-requests")) { throw new QstashChatRatelimitError({ @@ -274,7 +280,13 @@ export class HttpClient implements Requester { reset: response.headers.get("Burst-RateLimit-Reset"), }); } + } + /** + * throws error if response is non-success + */ + private async checkResponse(response: Response) { + this.checkResponseForRatelimit(response); if (response.status < 200 || response.status >= 300) { const body = await response.text(); throw new QstashError( From b897e480dc06bbfe794e85b62eec2cf1e556fc6d Mon Sep 17 00:00:00 2001 From: CahidArda Date: Tue, 3 Dec 2024 10:57:23 +0300 Subject: [PATCH 5/5] fix: jsdoc --- src/client/http.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/http.ts b/src/client/http.ts index 81877016..8f971cec 100644 --- a/src/client/http.ts +++ b/src/client/http.ts @@ -70,7 +70,7 @@ export type RetryConfig = /** * A backoff function receives the current retry count and returns a number in milliseconds to wait before retrying. * - * Used when `fetch` throws an error, like + * Used when `fetch` throws an error * * @default * ```ts