Skip to content

Commit 0ed74e0

Browse files
WIP: Sending commands from the worker back to the master
1 parent 5c62c60 commit 0ed74e0

File tree

8 files changed

+81
-15
lines changed

8 files changed

+81
-15
lines changed

examples/simple-url-call/src/flux/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ pub fn get_call_result_size() -> u32 {
99
}
1010

1111
pub fn call() {
12-
let mut result_data: Vec<u8> = Vec::with_capacity(get_call_result_size() as usize);
13-
1412
let call_data = "This is some call data";
15-
13+
1614
unsafe {
1715
raw::call(call_data.as_ptr(), call_data.len() as i32);
1816
}
17+
18+
let mut result_data: Vec<u8> = Vec::with_capacity(get_call_result_size() as usize);
1919
}

src/Process.worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { VmContext } from "./models/Context";
22
import { MessageType } from "./models/WorkerMessage";
3-
import { executeWasm } from "./services/WasmService";
3+
import { executeWasm } from "./services/VirtualMachineService";
44
import { extractMessageFromEvent, workerAddEventListener, selfPostMessage } from "./services/WorkerService";
55

66
workerAddEventListener('message', async (event: MessageEvent) => {

src/ProcessMethods.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,42 @@
1+
import fetch from 'node-fetch';
12
import { MethodCallMessage } from "./models/WorkerMessage";
3+
import { sendBufferLengthToWorker, storeAndNotify } from "./services/WorkerService";
24

35
class ProcessMethods {
6+
private result?: Buffer;
7+
48
constructor(private notifierBuffer: SharedArrayBuffer) {}
59

10+
private async call(args: any[]) {
11+
try {
12+
const response = await fetch('https://pokeapi.co/api/v2/pokemon/ditto');
13+
const text = await response.text();
14+
15+
const result = JSON.stringify({
16+
type: 'Result',
17+
status: response.status,
18+
result: text,
19+
});
20+
21+
this.result = Buffer.from(result);
22+
sendBufferLengthToWorker(this.notifierBuffer, this.result);
23+
} catch(error) {
24+
console.error('[call]', error);
25+
}
26+
}
27+
28+
private async writeResultToBuffer(args: any[]) {
29+
const u8WriteBuffer = new Uint8Array(args[0] as SharedArrayBuffer);
30+
u8WriteBuffer.set(this.result ?? Buffer.from([]));
31+
storeAndNotify(this.notifierBuffer, 0, 1);
32+
}
33+
634
executeMethodCallMessage(message: MethodCallMessage) {
7-
35+
if (message.value.methodName === 'call') {
36+
this.call(message.value.args);
37+
} else if (message.value.methodName === 'writeResultToBuffer') {
38+
this.writeResultToBuffer(message.value.args);
39+
}
840
}
941
}
1042

src/models/Context.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ export class VmContext {
2121
gasLimit: string;
2222
args: string[];
2323
env: ProcessEnvOptions['env'];
24+
notifierBuffer: SharedArrayBuffer;
2425

25-
constructor(context: Context) {
26+
constructor(context: WorkerContext) {
2627
this.gasUsed = new Big(context.startingGas ?? '0');
2728
this.binary = context.binary;
2829
this.randomSeed = context.randomSeed;
2930
this.gasLimit = context.gasLimit;
3031
this.args = context.args;
32+
this.notifierBuffer = context.notifierBuffer;
3133
this.env = {
3234
...context.env,
3335
GAS_LIMIT: this.gasLimit,

src/services/HexService.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Converts numbers to a hex format
3+
* Makes sure that small numbers are in 2 byte pairs
4+
*
5+
* @export
6+
* @param {number} num
7+
* @returns
8+
*/
9+
export function numberToHex(num: number, padBytes: number = 2): string {
10+
let result = num.toString(16);
11+
12+
if ((result.length % 2) > 0) {
13+
result = '0' + result;
14+
}
15+
16+
if (result.length < padBytes) {
17+
const missingAmountOfBytes = new Array(padBytes - result.length).fill('0');
18+
result = missingAmountOfBytes.join('') + result;
19+
}
20+
21+
return result;
22+
}

src/services/WasmService.ts renamed to src/services/VirtualMachineService.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { VmContext } from "../models/Context";
66
import { ExecuteResult } from '../models/ExecuteResult';
77
import { generateFixedBuffer } from './CryptoService';
88
import WasmImports from './WasmImports';
9-
import createWasmImports from './WasmImports';
109

1110
const metering = require('wasm-metering');
1211

@@ -23,10 +22,6 @@ export async function executeWasm(context: VmContext): Promise<ExecuteResult> {
2322

2423
try {
2524
const random = alea(context.randomSeed);
26-
const memory = new WebAssembly.Memory({
27-
initial: 5000,
28-
});
29-
3025
const wasi = new WASI({
3126
args: context.args,
3227
env: context.env,

src/services/WasmImports.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
import { VmContext } from '../models/Context';
2+
import { callMethodOnMainThread } from './WorkerService';
23

34
export default class WasmImports {
45
private memory?: WebAssembly.Memory;
56
private lastCallResult: {} = {};
67

78
constructor(
89
private vmContext: VmContext,
9-
private wasiImports: Record<string, Record<string, Function>>
10+
private wasiImports: Record<string, Record<string, Function>>,
1011
) {}
1112

1213
private call(callDataOffset: number, callDataLength: number) {
1314
if (!this.memory) throw new ReferenceError('ERR_NO_MEMORY');
1415
const callDataBytes = new Uint8Array(this.memory.buffer, callDataOffset, callDataLength);
15-
const callDataString = new TextDecoder().decode(callDataBytes);;
16-
16+
const callDataString = new TextDecoder().decode(callDataBytes);
17+
const resultBuffer = callMethodOnMainThread(this.vmContext.notifierBuffer, 'call', [callDataString]);
18+
19+
if (resultBuffer) {
20+
console.log('[] resultBuffer -> ', resultBuffer.toString());
21+
}
1722
}
1823

1924
public setMemory(memory: WebAssembly.Memory) {

src/services/WorkerService.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { MessageType, RequestMessage } from "../models/WorkerMessage";
22
import { isNodeJs } from "./EnvironmentService";
3+
import { numberToHex } from "./HexService";
34

45
export function createWorker(url: URL, options?: any): Worker {
56
let WorkerConstructor: any = null;
@@ -90,7 +91,8 @@ export function callMethodOnMainThread(notifierBuffer: SharedArrayBuffer, method
9091

9192
// The main thread only writes the length of the result inside the notifierbuffer
9293
// We create a big enough shared buffer so the main thread can write the full result
93-
const bufferLength = parseInt(Buffer.from(notifierBuffer).toString('hex'), 16);
94+
const writtenToBuffer = Buffer.from(notifierBuffer).slice(0); // slice 1 of since that is our notify index
95+
const bufferLength = parseInt(writtenToBuffer.toString('hex'), 16);
9496

9597
if (bufferLength === 0) {
9698
return null;
@@ -110,4 +112,12 @@ export function callMethodOnMainThread(notifierBuffer: SharedArrayBuffer, method
110112
resetSharedBuffer(notifierBuffer, 0);
111113

112114
return Buffer.from(valueBuffer);
115+
}
116+
117+
export function sendBufferLengthToWorker(notifierBuffer: SharedArrayBuffer, value?: Buffer) {
118+
const lengthInHex = numberToHex(value?.length ?? 0, (notifierBuffer.byteLength * 2));
119+
const u8NotifierBuffer = new Uint8Array(notifierBuffer);
120+
u8NotifierBuffer.set(Buffer.from(lengthInHex, 'hex'), 0);
121+
122+
storeAndNotify(notifierBuffer, 0, 1);
113123
}

0 commit comments

Comments
 (0)