DX-982: add api/llm support for publish, enqueue, and batch APIs #104

Merged · 1 commit merged on Jun 11, 2024
30 changes: 23 additions & 7 deletions src/client/client.ts
@@ -4,7 +4,7 @@ import { Messages } from "./messages";
import { Queue } from "./queue";
import { Schedules } from "./schedules";
import { Topics } from "./topics";
import { prefixHeaders, processHeaders } from "./utils";
import { getRequestPath, prefixHeaders, processHeaders } from "./utils";
import type { BodyInit, Event, HeadersInit, State } from "./types";
import { Chat } from "./llm/chat";

@@ -143,13 +143,23 @@ export type PublishRequest<TBody = BodyInit> = {
*/
url: string;
topic?: never;
api?: never;
}
| {
url?: never;
/**
* The url where the message should be sent to.
* The topic the message should be sent to.
*/
topic: string;
api?: never;
}
| {
url?: never;
topic?: never;
/**
* The api endpoint the request should be sent to.
*/
api: "llm";
}
);

@@ -257,7 +267,7 @@ export class Client {
): Promise<PublishResponse<TRequest>> {
const headers = processHeaders(request);
const response = await this.http.request<PublishResponse<TRequest>>({
path: ["v2", "publish", request.url ?? request.topic],
path: ["v2", "publish", getRequestPath(request)],
body: request.body,
headers,
method: "POST",
@@ -296,7 +306,7 @@ export class Client {
const headerEntries = Object.fromEntries(headers.entries());

messages.push({
destination: message.url ?? message.topic,
destination: getRequestPath(message),
headers: headerEntries,
body: message.body,
...(message.queueName && { queue: message.queueName }),
@@ -380,14 +390,20 @@ export class Client {
return response;
}
}
export type PublishToUrlResponse = {

export type PublishToApiResponse = {
messageId: string;
};

export type PublishToUrlResponse = PublishToApiResponse & {
url: string;
deduplicated?: boolean;
};

export type PublishToTopicResponse = PublishToUrlResponse[];

export type PublishResponse<R> = R extends { url: string }
export type PublishResponse<TRequest> = TRequest extends { url: string }
? PublishToUrlResponse
: PublishToTopicResponse;
: TRequest extends { topic: string }
? PublishToTopicResponse
: PublishToApiResponse;
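
For readers following the client.ts changes above: PublishResponse now resolves to one of three shapes depending on whether the request targets a url, a topic, or an api. A minimal usage sketch, assuming the SDK is consumed as @upstash/qstash with a QSTASH_TOKEN environment variable (both assumptions for illustration); the model name mirrors the new tests below:

import { Client } from "@upstash/qstash";

const client = new Client({ token: process.env.QSTASH_TOKEN! });

async function publishExamples() {
  // A url request resolves to PublishToUrlResponse: messageId, url, deduplicated?.
  const toUrl = await client.publishJSON({
    url: "https://example.com/endpoint", // illustrative destination
    body: { hello: "world" },
  });
  console.log(toUrl.url, toUrl.messageId);

  // An api: "llm" request resolves to PublishToApiResponse, which carries only messageId.
  const toLlm = await client.publishJSON({
    api: "llm",
    body: {
      model: "meta-llama/Meta-Llama-3-8B-Instruct",
      messages: [{ role: "user", content: "hello" }],
    },
    callback: "https://example.com", // the new tests also pass a callback for llm publishes
  });
  console.log(toLlm.messageId);
}

void publishExamples();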
56 changes: 56 additions & 0 deletions src/client/llm/chat.test.ts
@@ -105,4 +105,60 @@ describe("Test Qstash chat", () => {
},
{ timeout: 30_000, retry: 3 }
);

test("should publish with llm api", async () => {
const result = await client.publishJSON({
api: "llm",
body: {
model: "meta-llama/Meta-Llama-3-8B-Instruct",
messages: [
{
role: "user",
content: "hello",
},
],
},
callback: "https://example.com",
});
expect(result.messageId).toBeTruthy();
});

test("should batch with llm api", async () => {
const result = await client.batchJSON([
{
api: "llm",
body: {
model: "meta-llama/Meta-Llama-3-8B-Instruct",
messages: [
{
role: "user",
content: "hello",
},
],
},
callback: "https://example.com",
},
]);
expect(result.length).toBe(1);
expect(result[0].messageId).toBeTruthy();
});

test("should enqueue with llm api", async () => {
const queueName = "upstash-queue";
const queue = client.queue({ queueName });
const result = await queue.enqueueJSON({
api: "llm",
body: {
model: "meta-llama/Meta-Llama-3-8B-Instruct",
messages: [
{
role: "user",
content: "hello",
},
],
},
callback: "https://example.com/",
});
expect(result.messageId).toBeTruthy();
});
});
12 changes: 7 additions & 5 deletions src/client/queue.ts
@@ -1,6 +1,6 @@
import type { PublishRequest, PublishResponse } from "./client";
import type { Requester } from "./http";
import { prefixHeaders, processHeaders } from "./utils";
import { getRequestPath, prefixHeaders, processHeaders } from "./utils";

export type QueueResponse = {
createdAt: number;
@@ -97,7 +97,7 @@ export class Queue {
}

const headers = processHeaders(request);
const destination = request.url ?? request.topic;
const destination = getRequestPath(request);
const response = await this.http.request<PublishResponse<TRequest>>({
path: ["v2", "enqueue", this.queueName, destination],
body: request.body,
@@ -111,9 +111,10 @@
/**
* Enqueue a message to a queue, serializing the body to JSON.
*/
public async enqueueJSON<TBody = unknown>(
request: PublishRequest<TBody>
): Promise<PublishResponse<PublishRequest<TBody>>> {
public async enqueueJSON<
TBody = unknown,
TRequest extends PublishRequest<TBody> = PublishRequest<TBody>,
>(request: TRequest): Promise<PublishResponse<TRequest>> {
//@ts-expect-error caused by undici and bunjs type overlap
const headers = prefixHeaders(new Headers(request.headers));
headers.set("Content-Type", "application/json");
@@ -124,6 +125,7 @@
headers,
});

// @ts-expect-error can't assign union type to conditional
return response;
}
}
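
Because enqueueJSON is now generic over the concrete request type, the awaited result narrows per destination instead of staying a union. A minimal sketch under the same assumptions as above (@upstash/qstash package, QSTASH_TOKEN environment variable); the queue name mirrors the new enqueue test:

import { Client } from "@upstash/qstash";

const client = new Client({ token: process.env.QSTASH_TOKEN! });
const queue = client.queue({ queueName: "upstash-queue" });

async function enqueueExamples() {
  // A url request narrows to PublishToUrlResponse, so `url` is typed on the result.
  const urlResult = await queue.enqueueJSON({
    url: "https://example.com/endpoint",
    body: { hello: "world" },
  });
  console.log(urlResult.url);

  // An api: "llm" request narrows to PublishToApiResponse, which exposes only messageId
  // (see the "should enqueue with llm api" test above for the same payload).
  const llmResult = await queue.enqueueJSON({
    api: "llm",
    body: {
      model: "meta-llama/Meta-Llama-3-8B-Instruct",
      messages: [{ role: "user", content: "hello" }],
    },
    callback: "https://example.com",
  });
  console.log(llmResult.messageId);
}

void enqueueExamples();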
4 changes: 4 additions & 0 deletions src/client/utils.ts
@@ -57,3 +57,7 @@ export function processHeaders(request: PublishRequest) {

return headers;
}

export function getRequestPath(request: Pick<PublishRequest, "url" | "topic" | "api">): string {
return request.url ?? request.topic ?? `api/${request.api}`;
}
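
getRequestPath centralizes destination resolution for publish, enqueue, and batch. A quick sketch of how it resolves each request shape (the url and topic values are illustrative):

import { getRequestPath } from "./utils";

// url takes precedence when present.
getRequestPath({ url: "https://example.com" }); // "https://example.com"

// Otherwise the topic name is used.
getRequestPath({ topic: "my-topic" }); // "my-topic"

// api requests map to "api/<name>", so api: "llm" yields the "api/llm" segment
// used in paths like ["v2", "publish", "api/llm"].
getRequestPath({ api: "llm" }); // "api/llm"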