Skip to content

Commit

Permalink
Merge pull request #114 from upstash/hotfix-qstash-missing-features
Browse files Browse the repository at this point in the history
Add Missing Fields in Requests & Add DLQ Filtering and Batch Cancel API
  • Loading branch information
CahidArda authored Jul 2, 2024
2 parents d5e9445 + dd93920 commit d82f064
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ type EventsRequestFilter = {
state?: State;
url?: string;
urlGroup?: string;
api?: string;
scheduleId?: string;
queueName?: string;
fromDate?: number; // unix timestamp (ms)
Expand Down
67 changes: 66 additions & 1 deletion src/client/dlq.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
/* eslint-disable @typescript-eslint/no-magic-numbers */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { sleep } from "bun";
import { afterAll, describe, expect, test } from "bun:test";
import { afterAll, beforeAll, describe, expect, test } from "bun:test";
import { Client } from "./client";

describe("DLQ", () => {
const client = new Client({ token: process.env.QSTASH_TOKEN! });
const urlGroup = "someUrlGroup";

beforeAll(async () => {
await client.urlGroups.addEndpoints({
name: urlGroup,
endpoints: [
{
name: "myEndpoint",
url: "https://example.com/789/?asdasd=ooo",
},
],
});
});

afterAll(async () => {
const dlqLogs = await client.dlq.listMessages();
await client.dlq.deleteMany({ dlqIds: dlqLogs.messages.map((dlq) => dlq.dlqId) });
await client.urlGroups.delete(urlGroup);
});

test(
Expand Down Expand Up @@ -50,4 +64,55 @@ describe("DLQ", () => {
},
{ timeout: 20_000 }
);

test(
"should filter requests",
async () => {
const message = await client.publish({
url: `https://example.com/123/?asdasd=ooo`, //Any broken link will work
retries: 0,
});

await sleep(10_000);

const result = await client.dlq.listMessages({
filter: {
messageId: message.messageId,
},
});

expect(result.messages.length).toBe(1);
expect(result.messages[0].messageId).toBe(message.messageId);

await client.dlq.delete(result.messages[0].dlqId);
},
{ timeout: 20_000 }
);

test(
"should filter requests with urlGroup",
async () => {
/**
* we pass both urlGroup and topicName in our request, which could
* fail in the backend. Adding this test to make sure that
*/
const message = await client.publish({
urlGroup: urlGroup,
retries: 0,
});

await sleep(10_000);

const result = await client.dlq.listMessages({
filter: {
urlGroup: urlGroup,
},
});

expect(result.messages.length).toBe(1);
expect(result.messages[0].messageId).toBe(message[0].messageId);
await client.dlq.delete(result.messages[0].dlqId);
},
{ timeout: 20_000 }
);
});
96 changes: 94 additions & 2 deletions src/client/dlq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,32 @@ import type { Requester } from "./http";
import type { Message } from "./messages";

type DlqMessage = Message & {
/**
* The unique id within the DLQ
*/
dlqId: string;

/**
* The HTTP status code of the last failed delivery attempt
*/
responseStatus?: number;

/**
* The response headers of the last failed delivery attempt
*/
responseHeader?: Record<string, string[]>;

/**
* The response body of the last failed delivery attempt if it is
* composed of UTF-8 characters only, `None` otherwise.
*/
responseBody?: string;

/**
* The base64 encoded response body of the last failed delivery attempt
* if the response body contains non-UTF-8 characters, `None` otherwise.
*/
responseBodyBase64?: string;
};

export type DlqMessagePayload = Omit<DlqMessage, "urlGroup"> & { topicName: string };
Expand All @@ -12,6 +37,60 @@ export type DlqMessageGetPayload = {
cursor?: string;
};

export type DLQFilter = {
/**
* Filter DLQ entries by message id
*/
messageId?: string;

/**
* Filter DLQ entries by url
*/
url?: string;

/**
* Filter DLQ entries by url group name
*/
urlGroup?: string;

/**
* Filter DLQ entries by api name
*/
api?: string;

/**
* Filter DLQ entries by queue name
*/
queueName?: string;

/**
* Filter DLQ entries by schedule id
*/
scheduleId?: string;

/**
* Filter DLQ entries by starting time, in milliseconds
*/
fromDate?: number;

/**
* Filter DLQ entries by ending time, in milliseconds
*/
toDate?: number;

/**
* Filter DLQ entries by HTTP status of the response
*/
responseStatus?: number;

/**
* Filter DLQ entries by IP address of the publisher of the message
*/
callerIp?: string;
};

export type DLQFilterPayload = Omit<DLQFilter, "urlGroup"> & { topicName?: string };

export class DLQ {
private readonly http: Requester;

Expand All @@ -22,14 +101,27 @@ export class DLQ {
/**
* List messages in the dlq
*/
public async listMessages(options?: { cursor?: string }): Promise<{
public async listMessages(options?: {
cursor?: string;
count?: number;
filter?: DLQFilter;
}): Promise<{
messages: DlqMessage[];
cursor?: string;
}> {
const filterPayload: DLQFilterPayload = {
...options?.filter,
topicName: options?.filter?.urlGroup,
};

const messagesPayload = await this.http.request<DlqMessageGetPayload>({
method: "GET",
path: ["v2", "dlq"],
query: { cursor: options?.cursor },
query: {
cursor: options?.cursor,
count: options?.count,
...filterPayload,
},
});
return {
messages: messagesPayload.messages.map((message) => {
Expand Down
47 changes: 46 additions & 1 deletion src/client/messages.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { describe, expect, test } from "bun:test";
import { beforeAll, describe, expect, test } from "bun:test";
import { Client } from "./client";

describe("Messages", () => {
const client = new Client({ token: process.env.QSTASH_TOKEN! });

beforeAll(async () => {
await client.messages.deleteAll();
});

test(
"should send message, cancel it then verify cancel",
async () => {
Expand Down Expand Up @@ -44,4 +48,45 @@ describe("Messages", () => {
},
{ timeout: 20_000 }
);

test(
"should delete many and all",
async () => {
const messages = await client.batchJSON([
{
url: `https://example.com`,
body: { hello: "world" },
timeout: 90,
delay: 10,
},
{
url: `https://example.com`,
body: { hello: "world" },
timeout: 90,
delay: 10,
},
{
url: `https://example.com`,
body: { hello: "world" },
timeout: 90,
delay: 10,
},
]);

// eslint-disable-next-line @typescript-eslint/no-magic-numbers
expect(messages.length).toBe(3);

const deleted = await client.messages.deleteMany([
messages[0].messageId,
messages[1].messageId,
]);

// eslint-disable-next-line @typescript-eslint/no-magic-numbers
expect(deleted).toBe(2);

const deletedAll = await client.messages.deleteAll();
expect(deletedAll).toBe(1);
},
{ timeout: 20_000 }
);
});
45 changes: 45 additions & 0 deletions src/client/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ export type Message = {
*/
url: string;

/**
* The endpoint name of the message if the endpoint is given a
* name within the url group.
*/
endpointName?: string;

/**
* The api name if this message was sent to an api
*/
api?: string;

/**
* The http method used to deliver the message
*/
Expand All @@ -31,6 +42,12 @@ export type Message = {
*/
body?: string;

/**
* The base64 encoded body if the body contains non-UTF-8 characters,
* `None` otherwise.
*/
bodyBase64?: string;

/**
* Maxmimum number of retries.
*/
Expand Down Expand Up @@ -60,6 +77,16 @@ export type Message = {
* The queue name if this message was sent to a queue.
*/
queueName?: string;

/**
* The scheduleId of the message if the message is triggered by a schedule
*/
scheduleId?: string;

/**
* IP address of the publisher of this message
*/
callerIp?: string;
};

export type MessagePayload = Omit<Message, "urlGroup"> & { topicName: string };
Expand Down Expand Up @@ -96,4 +123,22 @@ export class Messages {
parseResponseAsJson: false,
});
}

public async deleteMany(messageIds: string[]): Promise<number> {
const result = (await this.http.request({
method: "DELETE",
path: ["v2", "messages"],
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messageIds }),
})) as { cancelled: number };
return result.cancelled;
}

public async deleteAll(): Promise<number> {
const result = (await this.http.request({
method: "DELETE",
path: ["v2", "messages"],
})) as { cancelled: number };
return result.cancelled;
}
}
2 changes: 2 additions & 0 deletions src/client/schedules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ export type Schedule = {
method: string;
header?: Record<string, string[]>;
body?: string;
bodyBase64?: string;
retries: number;
delay?: number;
callback?: string;
failureCallback?: string;
callerIp?: string;
isPaused: true | undefined;
};

Expand Down
2 changes: 2 additions & 0 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export type Event = {
url: string;
urlGroup?: string;
endpointName?: string;
header?: Record<string, string>;
body?: string; // base64 encoded
};

export type EventPayload = Omit<Event, "urlGroup"> & { topicName: string };
Expand Down

0 comments on commit d82f064

Please sign in to comment.