diff --git a/.changeset/curvy-knives-wink.md b/.changeset/curvy-knives-wink.md new file mode 100644 index 0000000000..bcb2b25df9 --- /dev/null +++ b/.changeset/curvy-knives-wink.md @@ -0,0 +1,5 @@ +--- +"@lynx-js/web-worker-rpc": patch +--- + +feat: support lazy message port assigning in web-worker-rpc diff --git a/packages/web-platform/web-worker-rpc/src/Rpc.ts b/packages/web-platform/web-worker-rpc/src/Rpc.ts index 18cbf08ddc..e72e9d9d63 100644 --- a/packages/web-platform/web-worker-rpc/src/Rpc.ts +++ b/packages/web-platform/web-worker-rpc/src/Rpc.ts @@ -38,6 +38,11 @@ type RetEndpoint = RpcEndpointBase< export class Rpc { private incId = 0; + #messageQueue: { + message: RpcMessageData | RpcMessageDataSync; + detail?: { transfer: Transferable[] }; + }[] = []; + #messageCache: Record< string, (RpcMessageData | RpcMessageDataSync)[] | undefined @@ -68,9 +73,36 @@ export class Rpc { * @param port one size of a message channel * @param name instance name */ - constructor(private port: MessagePort, private name: string) { - port.onmessage = (ev) => this.#onMessage(ev.data); + constructor(private port: MessagePort | undefined, private name: string) { + if (port) { + port.onmessage = (ev) => this.#onMessage(ev.data); + } + } + + setMessagePort(port: MessagePort): void { + if (this.port) { + throw new Error('Rpc port already set'); + } else { + this.port = port; + for (const item of this.#messageQueue) { + this.postMessage(item.message, item.detail); + } + this.#messageQueue = []; + port.onmessage = (ev) => this.#onMessage(ev.data); + } } + + postMessage(message: unknown, detail?: { transfer: Transferable[] }): void { + if (this.port) { + this.port.postMessage(message, detail); + } else { + this.#messageQueue.push({ + message: message as (RpcMessageData | RpcMessageDataSync), + detail, + }); + } + } + get nextRetId() { return `ret_${this.name}_${this.incId++}`; } @@ -403,7 +435,7 @@ export class Rpc { lock: lock, buf: sharedBuffer, }; - this.port.postMessage(message, { transfer }); + this.postMessage(message, { transfer }); Atomics.wait(lockViewer, 0, 0); if (lockViewer[0] === 2) { // error @@ -425,9 +457,13 @@ export class Rpc { } } else { if (endpoint.hasReturn) { - const { promise, resolve, reject } = Promise.withResolvers< - E['_TypeReturn'] - >(); + let promise: Promise, + resolve: (value: E['_TypeReturn']) => void, + reject: () => void; + promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); const retHandler = Rpc.createRetEndpoint(this.nextRetId); this.registerHandler(retHandler!, (returnValue, error) => { if (error) reject(); @@ -440,7 +476,7 @@ export class Rpc { retId: retHandler?.name, hasTransfer: endpoint.hasReturnTransfer, }; - this.port.postMessage(message, { transfer }); + this.postMessage(message, { transfer }); return promise; } else { const message: RpcMessageData = { @@ -448,7 +484,7 @@ export class Rpc { data: parameters, sync: false, }; - this.port.postMessage(message, { transfer }); + this.postMessage(message, { transfer }); } } } diff --git a/packages/web-platform/web-worker-rpc/test/endpoints.js b/packages/web-platform/web-worker-rpc/test/endpoints.js index cd2ddf8c49..c7fce69ead 100644 --- a/packages/web-platform/web-worker-rpc/test/endpoints.js +++ b/packages/web-platform/web-worker-rpc/test/endpoints.js @@ -53,3 +53,10 @@ export const changeLazyHandler = createRpcEndpoint( true, false, ); + +export const callbackifyEndpoint = createRpcEndpoint( + 'callbackify', + false, + true, + false, +); diff --git a/packages/web-platform/web-worker-rpc/test/rpc.test.ts b/packages/web-platform/web-worker-rpc/test/rpc.test.ts index 700e735303..5233319479 100644 --- a/packages/web-platform/web-worker-rpc/test/rpc.test.ts +++ b/packages/web-platform/web-worker-rpc/test/rpc.test.ts @@ -10,6 +10,7 @@ import { testLazy, addAsyncWithTransfer, changeLazyHandler, + callbackifyEndpoint, } from './endpoints'; import { Worker } from 'node:worker_threads'; @@ -108,4 +109,38 @@ describe('rpc tests', () => { expect(ret).toBeInstanceOf(ArrayBuffer); expect(ret.byteLength).toBe(100); }); + + test('callbackify', async () => { + const fn = rpc.createCallbackify(callbackifyEndpoint, 2); + // (a, b, callback) + const promise = new Promise((resolve) => { + fn(2, 3, (ret: number) => { + resolve(ret); + }); + }); + const ret = await promise; + expect(ret).toBe(5); + }); + + test('set message port', async () => { + const channel = new MessageChannel(); + // buffer flush + const rpc = new Rpc(undefined, 'test'); + rpc.postMessage({ + type: 'test', + }); + // flush + rpc.setMessagePort(channel.port1); + expect(() => { + rpc.setMessagePort(channel.port1); + }).toThrow('Rpc port already set'); + channel.port1.close(); + channel.port2.close(); + }); + + test('remove handler', () => { + const rpc = new Rpc(undefined, 'test'); + rpc.registerHandler(addAsync, async (a, b) => a + b); + rpc.removeHandler(addAsync); + }); }); diff --git a/packages/web-platform/web-worker-rpc/test/worker.js b/packages/web-platform/web-worker-rpc/test/worker.js index 5caba17669..77f83b2452 100644 --- a/packages/web-platform/web-worker-rpc/test/worker.js +++ b/packages/web-platform/web-worker-rpc/test/worker.js @@ -10,6 +10,7 @@ import { testLazy, addAsyncWithTransfer, changeLazyHandler, + callbackifyEndpoint, } from './endpoints.js'; console.log('worker started'); @@ -50,6 +51,9 @@ parentPort.on('message', async (ev) => { rpc.registerHandler(changeLazyHandler, () => { emptyObj.testLazy = () => 100; }); + rpc.registerHandler(callbackifyEndpoint, async (a, b) => { + return a + b; + }); privatePort.postMessage({ type: 'ready' }); } });