Skip to content

Commit

Permalink
fix: add global header to schedule and enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Dec 10, 2024
1 parent 5da4e71 commit 11650e1
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 39 deletions.
118 changes: 101 additions & 17 deletions src/client/api/email.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,24 @@ import { nanoid } from "../utils";
describe("email", () => {
const qstashToken = nanoid();
const resendToken = nanoid();
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token: qstashToken });

const header = "my-header";
const headerValue = "my-header-value";
const globalHeader = "global-header";
const globalHeaderOverwritten = "global-header-overwritten";
const requestHeader = "request-header";

const globalHeaderValue = nanoid();
const overWrittenOldValue = nanoid();
const overWrittenNewValue = nanoid();
const requestHeaderValue = nanoid();

const client = new Client({
baseUrl: MOCK_QSTASH_SERVER_URL,
token: qstashToken,
headers: {
[globalHeader]: globalHeaderValue,
[globalHeaderOverwritten]: overWrittenOldValue,
},
});

test("should use resend", async () => {
await mockQStashServer({
Expand All @@ -20,15 +34,17 @@ describe("email", () => {
name: "email",
provider: resend({ token: resendToken }),
},
headers: {
[header]: headerValue,
},
body: {
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "hello world",
html: "<p>it works!</p>",
},
headers: {
"content-type": "application/json",
[globalHeaderOverwritten]: overWrittenNewValue,
[requestHeader]: requestHeaderValue,
},
});
},
responseFields: {
Expand All @@ -46,10 +62,12 @@ describe("email", () => {
html: "<p>it works!</p>",
},
headers: {
"content-type": "application/json",
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
[`upstash-forward-${header}`]: headerValue,
[`upstash-forward-${requestHeader}`]: requestHeaderValue,
[`upstash-forward-${globalHeader}`]: globalHeaderValue,
[`upstash-forward-${globalHeaderOverwritten}`]: overWrittenNewValue,
"upstash-method": "POST",
},
},
Expand All @@ -64,9 +82,6 @@ describe("email", () => {
name: "email",
provider: resend({ token: resendToken, batch: true }),
},
headers: {
[header]: headerValue,
},
body: [
{
from: "Acme <[email protected]>",
Expand All @@ -81,6 +96,11 @@ describe("email", () => {
html: "<p>it works!</p>",
},
],
headers: {
"content-type": "application/json",
[globalHeaderOverwritten]: overWrittenNewValue,
[requestHeader]: requestHeaderValue,
},
});
},
responseFields: {
Expand Down Expand Up @@ -108,9 +128,11 @@ describe("email", () => {
headers: {
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
[`upstash-forward-${header}`]: headerValue,
"upstash-method": "POST",
"content-type": "application/json",
[`upstash-forward-${requestHeader}`]: requestHeaderValue,
[`upstash-forward-${globalHeader}`]: globalHeaderValue,
[`upstash-forward-${globalHeaderOverwritten}`]: overWrittenNewValue,
},
},
});
Expand All @@ -124,9 +146,6 @@ describe("email", () => {
name: "email",
provider: resend({ token: resendToken, batch: true }),
},
headers: {
[header]: headerValue,
},
method: "PUT",
body: [
{
Expand Down Expand Up @@ -170,10 +189,75 @@ describe("email", () => {
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
[`upstash-forward-${header}`]: headerValue,
"upstash-method": "PUT",
},
},
});
});

test("should be able to enqueue", async () => {
const queueName = "resend-queue";
const queue = client.queue({ queueName });
await mockQStashServer({
execute: async () => {
await queue.enqueueJSON({
api: {
name: "email",
provider: resend({ token: resendToken, batch: true }),
},
body: [
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "hello world",
html: "<h1>it works!</h1>",
},
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "world hello",
html: "<p>it works!</p>",
},
],
headers: {
"content-type": "application/json",
[globalHeaderOverwritten]: overWrittenNewValue,
[requestHeader]: requestHeaderValue,
},
});
},
responseFields: {
body: { messageId: "msgId" },
status: 200,
},
receivesRequest: {
method: "POST",
token: qstashToken,
url: "http://localhost:8080/v2/enqueue/resend-queue/https://api.resend.com/emails/batch",
body: [
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "hello world",
html: "<h1>it works!</h1>",
},
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "world hello",
html: "<p>it works!</p>",
},
],
headers: {
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
"upstash-method": "POST",
[`upstash-forward-${requestHeader}`]: requestHeaderValue,
[`upstash-forward-${globalHeader}`]: globalHeaderValue,
[`upstash-forward-${globalHeaderOverwritten}`]: overWrittenNewValue,
},
},
});
});
});
25 changes: 8 additions & 17 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Queue } from "./queue";
import { Schedules } from "./schedules";
import type { BodyInit, Event, GetEventsPayload, HeadersInit, HTTPMethods, State } from "./types";
import { UrlGroups } from "./url-groups";
import { getRequestPath, prefixHeaders, processHeaders } from "./utils";
import { getRequestPath, prefixHeaders, processHeaders, wrapWithGlobalHeaders } from "./utils";
import { Workflow } from "./workflow";
import type { PublishEmailApi, PublishLLMApi } from "./api/types";
import { processApi } from "./api/utils";
Expand Down Expand Up @@ -271,28 +271,16 @@ export type QueueRequest = {
export class Client {
public http: Requester;
private token: string;
private headers: Headers;

public constructor(config: ClientConfig) {
this.http = new HttpClient({
retry: config.retry,
baseUrl: config.baseUrl ? config.baseUrl.replace(/\/$/, "") : "https://qstash.upstash.io",
authorization: `Bearer ${config.token}`,
//@ts-expect-error caused by undici and bunjs type overlap
headers: prefixHeaders(new Headers(config.headers)),
});
this.token = config.token;
//@ts-expect-error caused by undici and bunjs type overlap
this.headers = prefixHeaders(new Headers(config.headers));
}

private wrapWithGlobalHeaders(headers: Headers) {
const finalHeaders = new Headers(this.headers);

// eslint-disable-next-line unicorn/no-array-for-each
headers.forEach((value, key) => {
finalHeaders.set(key, value);
});

return finalHeaders;
}

/**
Expand Down Expand Up @@ -377,7 +365,10 @@ export class Client {
public async publish<TRequest extends PublishRequest>(
request: TRequest
): Promise<PublishResponse<TRequest>> {
const headers = this.wrapWithGlobalHeaders(processHeaders(request)) as HeadersInit;
const headers = wrapWithGlobalHeaders(
processHeaders(request),
this.http.headers
) as HeadersInit;
const response = await this.http.request<PublishResponse<TRequest>>({
path: ["v2", "publish", getRequestPath(request)],
body: request.body,
Expand Down Expand Up @@ -418,7 +409,7 @@ export class Client {
public async batch(request: PublishBatchRequest[]): Promise<PublishResponse<PublishRequest>[]> {
const messages = [];
for (const message of request) {
const headers = this.wrapWithGlobalHeaders(processHeaders(message));
const headers = wrapWithGlobalHeaders(processHeaders(message), this.http.headers);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore Type mismatch TODO: should be checked later
const headerEntries = Object.fromEntries(headers.entries());
Expand Down
6 changes: 6 additions & 0 deletions src/client/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export type UpstashResponse<TResult> = TResult & { error?: string };
export type Requester = {
request: <TResult = unknown>(request: UpstashRequest) => Promise<UpstashResponse<TResult>>;
requestStream: (request: UpstashRequest) => AsyncIterable<ChatCompletionChunk>;
headers?: Headers;
};

export type RetryConfig =
Expand All @@ -81,6 +82,7 @@ export type HttpClientConfig = {
baseUrl: string;
authorization: string;
retry?: RetryConfig;
headers?: Headers;
};

export class HttpClient implements Requester {
Expand All @@ -95,6 +97,8 @@ export class HttpClient implements Requester {
backoff: (retryCount: number) => number;
};

public readonly headers;

public constructor(config: HttpClientConfig) {
this.baseUrl = config.baseUrl.replace(/\/$/, "");

Expand All @@ -111,6 +115,8 @@ export class HttpClient implements Requester {
attempts: config.retry?.retries ?? 5,
backoff: config.retry?.backoff ?? ((retryCount) => Math.exp(retryCount) * 50),
};

this.headers = config.headers;
}

public async request<TResult>(request: UpstashRequest): Promise<UpstashResponse<TResult>> {
Expand Down
10 changes: 7 additions & 3 deletions src/client/queue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { processApi } from "./api/utils";
import type { PublishRequest, PublishResponse } from "./client";
import type { Requester } from "./http";
import { getRequestPath, prefixHeaders, processHeaders } from "./utils";
import type { HeadersInit } from "./types";
import { getRequestPath, prefixHeaders, processHeaders, wrapWithGlobalHeaders } from "./utils";

export type QueueResponse = {
createdAt: number;
Expand Down Expand Up @@ -112,7 +113,11 @@ export class Queue {
throw new Error("Please provide a queue name to the Queue constructor");
}

const headers = processHeaders(request);
const headers = wrapWithGlobalHeaders(
processHeaders(request),
this.http.headers
) as HeadersInit;

const destination = getRequestPath(request);
const response = await this.http.request<PublishResponse<TRequest>>({
path: ["v2", "enqueue", this.queueName, destination],
Expand Down Expand Up @@ -142,7 +147,6 @@ export class Queue {
const response = await this.enqueue({
...nonApiRequest,
body: JSON.stringify(nonApiRequest.body),
headers,
});

// @ts-expect-error can't assign union type to conditional
Expand Down
Loading

0 comments on commit 11650e1

Please sign in to comment.