Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/forty-cobras-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@lynx-js/web-worker-rpc": patch
"@lynx-js/web-constants": patch
---

feat: `createRpcEndpoint` adds a new parameter: `hasReturnTransfer`.

When `isSync`: false, `hasReturn`: true, you can add `transfer` to the callback postMessage created.

At this time, the return value structure of register-handler is changed: `{ data: unknown; transfer: Transferable[]; } | Promise<{ data: unknown; transfer: Transferable[];}>`.
2 changes: 1 addition & 1 deletion packages/web-platform/web-constants/src/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ export const nativeModulesCallEndpoint = createRpcEndpoint<
export const napiModulesCallEndpoint = createRpcEndpoint<
[name: string, data: Cloneable, moduleName: string],
any
>('napiModulesCall', false, true);
>('napiModulesCall', false, true, true);

export const getCustomSectionsEndpoint = createRpcEndpoint<
[string],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,59 @@ import { createRpcEndpoint } from '@lynx-js/web-worker-rpc';
export const addAsync = createRpcEndpoint<[number, number], number>(
'add_async',
false,
true,
false,
);

export const addSync = createRpcEndpoint<[number, number], number>(
'add_sync',
true,
true,
false,
16,
);

export const consoleLog = createRpcEndpoint<[string]>('console_log', false);
export const consoleLog = createRpcEndpoint<[string]>(
'console_log',
false,
true,
false,
);

export const consoleLogSync = createRpcEndpoint<[string]>(
'console_log_sync',
true,
false,
);

export const throwError = createRpcEndpoint<[]>('throw_async', false);
export const throwError = createRpcEndpoint<[]>(
'throw_async',
false,
true,
false,
);

export const throwErrorSync = createRpcEndpoint<[]>('throw_sync', true, false);

export const wait = createRpcEndpoint<[number]>('wait_async', false);
export const wait = createRpcEndpoint<[number]>(
'wait_async',
false,
true,
false,
);

export const waitSync = createRpcEndpoint<[number]>('wait_async', true, false);

export const testLazy = createRpcEndpoint<[number, number], number>(
'add_lazy',
false,
true,
false,
);

export const addAsyncWithTransfer = createRpcEndpoint(
'add_async_with_transfer',
false,
true,
true,
);
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
throwErrorSync,
wait,
waitSync,
addAsyncWithTransfer,
} from './endpoints.ts';

const channel = new MessageChannel();
Expand Down Expand Up @@ -45,5 +46,11 @@ const emptyObj: any = {};
Object.assign(globalThis, { emptyObj });
rpc.registerHandlerLazy(testLazy, emptyObj, 'testLazy');
emptyObj.testLazy = (a, b) => a + b;
rpc.registerHandler(addAsyncWithTransfer, () => {
const ele = document.createElement('canvas') as HTMLCanvasElement;
document.body.appendChild(ele);
const offscreen = ele.transferControlToOffscreen();
return { data: offscreen, transfer: [offscreen] };
});

worker.postMessage({ port: channel.port2 }, { transfer: [channel.port2] });
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
wait,
waitSync,
testLazy,
addAsyncWithTransfer,
} from './endpoints.ts';
globalThis.onmessage = (ev) => {
const port = ev.data.port as MessagePort;
Expand All @@ -23,5 +24,6 @@ globalThis.onmessage = (ev) => {
wait: rpc.createCall(wait),
waitSync: rpc.createCall(waitSync),
testLazy: rpc.createCall(testLazy),
addAsyncWithTransfer: rpc.createCall(addAsyncWithTransfer),
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async function run() {
};
lynxView.onNapiModulesCall = (name, data, moduleName) => {
if (name === 'getColor' && moduleName === 'color_methods') {
return data.color;
return { data: data.color };
}
};
lynxView.addEventListener('error', () => {
Expand Down
24 changes: 24 additions & 0 deletions packages/web-platform/web-tests/tests/rpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,28 @@ test.describe('rpc tests', () => {
});
expect(ret2).toBe(100);
});

test('async return with transfer', async ({ page }) => {
const worker = page.workers().pop()!;
await worker.evaluate(async () => {
// @ts-ignore
const offscreen = await globalThis.addAsyncWithTransfer();
offscreen.width = 100;
offscreen.height = 100;
const ctx = offscreen.getContext('2d');
ctx.fillStyle = 'red';
ctx.fillRect(0, 0, 100, 100);
});
await waitImpl(100);
expect(
await page.evaluate(() => {
return document.querySelector('canvas')?.width === 100;
}),
).toBeTruthy;
expect(
await page.evaluate(() => {
return document.querySelector('canvas')?.height === 100;
}),
).toBeTruthy;
});
});
59 changes: 52 additions & 7 deletions packages/web-platform/web-worker-rpc/src/Rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
RpcEndpoint,
RpcEndpointAsync,
RpcEndpointAsyncVoid,
RpcEndpointAsyncWithTransfer,
RpcEndpointBase,
RpcEndpointSync,
RpcEndpointSyncVoid,
Expand All @@ -14,6 +15,7 @@ interface RpcMessageData {
name: string;
data: unknown[];
sync: false;
hasTransfer?: boolean;
}
interface RpcMessageDataSync {
name: string;
Expand Down Expand Up @@ -44,7 +46,22 @@ export class Rpc {
#textDecoder = new TextDecoder();
#handlerMap = new Map<
string,
(...args: any[]) => unknown | Promise<unknown>
| ((
...args: any[]
) =>
| unknown
| Promise<unknown>)
| ((
...args: any[]
) =>
| {
data: unknown;
transfer: Transferable[];
}
| Promise<{
data: unknown;
transfer: Transferable[];
}>)
>();

/**
Expand All @@ -71,7 +88,9 @@ export class Rpc {
} as unknown as RetEndpoint<Return>;
}

#onMessage: (message: RpcMessageData | RpcMessageDataSync) => void = async (
#onMessage: (
message: RpcMessageData | RpcMessageDataSync,
) => void = async (
message,
) => {
console.warn(`[rpc] on ${this.name} received ${message.name}`, message);
Expand All @@ -84,7 +103,19 @@ export class Rpc {
? Rpc.createRetEndpoint(message.retId)
: undefined;
try {
const retData = await handler(...message.data);
const result = await handler(...message.data);
let retData = undefined, transfer: Transferable[] = [];
if (message.sync) {
retData = result;
} else if (message.hasTransfer) {
({ data: retData, transfer } = (result || {}) as {
data: unknown;
transfer: Transferable[];
});
} else {
retData = result;
}

if (message.sync) {
if (message.buf) {
const retStr = JSON.stringify(retData);
Expand All @@ -105,7 +136,7 @@ export class Rpc {
this.invoke<RetEndpoint<unknown>>(replyTempEndpoint!, [
retData,
false,
]);
], transfer || []);
}
}
} catch (e) {
Expand Down Expand Up @@ -169,11 +200,24 @@ export class Rpc {
| ((...args: T['_TypeParameters']) => T['_TypeReturn'])
| ((...args: T['_TypeParameters']) => Promise<T['_TypeReturn']>),
): void;
registerHandler<T extends RpcEndpoint<any[], any>>(
registerHandler<T extends RpcEndpointAsyncWithTransfer<any[], any>>(
endpoint: T,
handler:
| ((...args: T['_TypeParameters']) => T['_TypeReturn'])
| ((...args: T['_TypeParameters']) => Promise<T['_TypeReturn']>),
| ((
...args: T['_TypeParameters']
) => { data: T['_TypeReturn']; transfer: Transferable[] })
| ((
...args: T['_TypeParameters']
) => Promise<{ data: T['_TypeReturn']; transfer: Transferable[] }>),
): void;
registerHandler<T extends RpcEndpoint<any[], any>>(
endpoint: T,
handler: (
...args: T['_TypeParameters']
) => void | T['_TypeReturn'] | {
data: T['_TypeReturn'];
transfer: Transferable[];
} | Promise<{ data: T['_TypeReturn']; transfer: Transferable[] }>,
): void {
this.#handlerMap.set(endpoint.name, handler);
const currentCache = this.#messageCache[endpoint.name];
Expand Down Expand Up @@ -394,6 +438,7 @@ export class Rpc {
data: parameters,
sync: false,
retId: retHandler?.name,
hasTransfer: endpoint.hasReturnTransfer,
};
this.port.postMessage(message, { transfer });
return promise;
Expand Down
40 changes: 37 additions & 3 deletions packages/web-platform/web-worker-rpc/src/RpcEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,20 @@ export interface RpcEndpointSync<Parameters extends any[], Return>
readonly bufferSize: number;
}

export type RpcEndpointAsync<Parameters extends any[], Return> =
RpcEndpointBase<Parameters, Return, false, true>;
export interface RpcEndpointAsync<
Parameters extends any[],
Return,
> extends RpcEndpointBase<Parameters, Return, false, true> {
readonly hasReturnTransfer: false;
}

export interface RpcEndpointAsyncWithTransfer<
Parameters extends any[],
Return,
> extends RpcEndpointBase<Parameters, Return, true, true> {
readonly hasReturnTransfer: true;
}

export type RpcEndpointAsyncVoid<Parameters extends any[]> = RpcEndpointBase<
Parameters,
void,
Expand Down Expand Up @@ -64,13 +76,20 @@ export interface RpcEndpointBase<
* So you should ensure this size is enough for your stringified return value.
*/
readonly bufferSize: never | number;
/**
* @public
* Make the message invoke created by hasReturn support transfer.
* Only valid for async and hasReturn endpoints
*/
readonly hasReturnTransfer: never | boolean;
}

export type RpcEndpoint<Parameters extends any[], Return> =
| RpcEndpointSyncVoid<Parameters>
| RpcEndpointSync<Parameters, Return>
| RpcEndpointAsync<Parameters, Return>
| RpcEndpointAsyncVoid<Parameters>;
| RpcEndpointAsyncVoid<Parameters>
| RpcEndpointAsyncWithTransfer<Parameters, Return>;

export function createRpcEndpoint<Parameters extends any[], Return = void>(
name: string,
Expand All @@ -82,6 +101,18 @@ export function createRpcEndpoint<Parameters extends any[], Return = void>(
isSync: false,
hasReturn: true,
): RpcEndpointAsync<Parameters, Return>;
export function createRpcEndpoint<Parameters extends any[], Return = void>(
name: string,
isSync: false,
hasReturn: true,
hasReturnTransfer: false,
): RpcEndpointAsync<Parameters, Return>;
export function createRpcEndpoint<Parameters extends any[], Return = void>(
name: string,
isSync: false,
hasReturn: true,
hasReturnTransfer: true,
): RpcEndpointAsyncWithTransfer<Parameters, Return>;
export function createRpcEndpoint<Parameters extends any[]>(
name: string,
isSync: true,
Expand All @@ -91,18 +122,21 @@ export function createRpcEndpoint<Parameters extends any[], Return>(
name: string,
isSync: true,
hasReturn: true,
hasReturnTransfer: false,
bufferSize: number,
): RpcEndpointSync<Parameters, Return>;
export function createRpcEndpoint(
name: string,
isSync: boolean,
hasReturn: boolean = true,
hasReturnTransfer: boolean = false,
bufferSize?: number,
) {
return {
name,
isSync,
hasReturn,
hasReturnTransfer,
bufferSize,
} as any;
}
Loading