Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class RpcError extends Error {
export type RpcTransport = (
req: JsonRpcRequest,
abortSignal: AbortSignal
) => Promise<JsonRpcResponse>;
) => Promise<JsonRpcResponse | void>;

type RpcClientOptions =
| string
Expand Down Expand Up @@ -78,7 +78,7 @@ export function rpcClient<T extends object>(options: RpcClientOptions) {
args: any[],
signal: AbortSignal
) => {
const req = createRequest(method, args);
const req = createRequest(Date.now(), method, args);
const raw = await transport(serialize(req as any), signal);
const res = deserialize(raw);
if ("result" in res) {
Expand All @@ -90,6 +90,20 @@ export function rpcClient<T extends object>(options: RpcClientOptions) {
throw new TypeError("Invalid response");
};

async function sendNotification(method: string, ...args: any[]): Promise<Promise<void>> {
const req = createRequest(undefined, method, args);
const ac = new AbortController();
const promise = transport(serialize(req as any), ac.signal);
abortControllers.set(promise, ac);
promise
.finally(() => {
// Remove the
abortControllers.delete(promise);
})
.catch(() => {});
return promise as Promise<void>;
}

// Map of AbortControllers to abort pending requests
const abortControllers = new WeakMap<Promise<any>, AbortController>();

Expand All @@ -101,6 +115,7 @@ export function rpcClient<T extends object>(options: RpcClientOptions) {
const ac = abortControllers.get(promise);
ac?.abort();
},
$notify: sendNotification,
};

return new Proxy(target, {
Expand Down Expand Up @@ -130,13 +145,16 @@ export function rpcClient<T extends object>(options: RpcClientOptions) {
/**
* Create a JsonRpcRequest for the given method.
*/
export function createRequest(method: string, params?: any[]): JsonRpcRequest {
function createRequest(id: JsonRpcRequest['id'], method: string, params?: any[]): JsonRpcRequest {
const req: JsonRpcRequest = {
jsonrpc: "2.0",
id: Date.now(),
method,
};

if (id) {
req.id = id;
}

if (params?.length) {
req.params = removeTrailingUndefs(params);
}
Expand Down
25 changes: 25 additions & 0 deletions src/test/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,28 @@ tap.test("should not relay internal methods", async (t) => {
//@ts-expect-error
client[Symbol()];
});

tap.test("should not notify", async (t) => {
let client: ReturnType<typeof rpcClient<Service>>;

const request = new Promise((resolve) => {
client = rpcClient<Service>({
url: 'n/a',
transport: async (req) => {
resolve(req);
},
});
});

const res = await Promise.all([
client!.$notify("hello", "world"),
request,
]);
t.equal(res[0], undefined);
t.same(res[1], {
"jsonrpc": "2.0",
"method": "hello",
"params": ["world"],
// no id
});
});