Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: enable transferring WASM modules #25314

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ using v8::String;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
using v8::WasmCompiledModule;

namespace node {
namespace worker {
Expand All @@ -43,13 +44,15 @@ namespace {
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
class DeserializerDelegate : public ValueDeserializer::Delegate {
public:
DeserializerDelegate(Message* m,
Environment* env,
const std::vector<MessagePort*>& message_ports,
const std::vector<Local<SharedArrayBuffer>>&
shared_array_buffers)
: message_ports_(message_ports),
shared_array_buffers_(shared_array_buffers) {}
DeserializerDelegate(
Message* m,
Environment* env,
const std::vector<MessagePort*>& message_ports,
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
: message_ports_(message_ports),
shared_array_buffers_(shared_array_buffers),
wasm_modules_(wasm_modules) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
Expand All @@ -67,11 +70,19 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
return shared_array_buffers_[clone_id];
}

MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
Isolate* isolate, uint32_t transfer_id) override {
CHECK_LE(transfer_id, wasm_modules_.size());
return WasmCompiledModule::FromTransferrableModule(
isolate, wasm_modules_[transfer_id]);
}

ValueDeserializer* deserializer = nullptr;

private:
const std::vector<MessagePort*>& message_ports_;
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
};

} // anonymous namespace
Expand Down Expand Up @@ -109,7 +120,8 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
}
shared_array_buffers_.clear();

DeserializerDelegate delegate(this, env, ports, shared_array_buffers);
DeserializerDelegate delegate(
this, env, ports, shared_array_buffers, wasm_modules_);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
Expand Down Expand Up @@ -143,6 +155,11 @@ void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
}

uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
wasm_modules_.emplace_back(std::move(mod));
return wasm_modules_.size() - 1;
}

namespace {

void ThrowDataCloneException(Environment* env, Local<String> message) {
Expand Down Expand Up @@ -202,6 +219,11 @@ class SerializerDelegate : public ValueSerializer::Delegate {
return Just(i);
}

Maybe<uint32_t> GetWasmModuleTransferId(
Isolate* isolate, Local<WasmCompiledModule> module) override {
return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
}

void Finish() {
// Only close the MessagePort handles and actually transfer them
// once we know that serialization succeeded.
Expand Down
4 changes: 4 additions & 0 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class Message : public MemoryRetainer {
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
// Internal method of Message that is called when a new WebAssembly.Module
// object is encountered in the incoming value's structure.
uint32_t AddWASMModule(v8::WasmCompiledModule::TransferrableModule&& mod);

// The MessagePorts that will be transferred, as recorded by Serialize().
// Used for warning user about posting the target MessagePort to itself,
Expand All @@ -65,6 +68,7 @@ class Message : public MemoryRetainer {
std::vector<MallocedBuffer<char>> array_buffer_contents_;
std::vector<SharedArrayBufferMetadataReference> shared_array_buffers_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;
std::vector<v8::WasmCompiledModule::TransferrableModule> wasm_modules_;

friend class MessagePort;
};
Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-worker-message-port-wasm-module.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');
const fixtures = require('../common/fixtures');

const { Worker } = require('worker_threads');
const wasmModule = new WebAssembly.Module(fixtures.readSync('test.wasm'));

const worker = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.once('message', ({ wasmModule }) => {
const instance = new WebAssembly.Instance(wasmModule);
parentPort.postMessage(instance.exports.addTwo(10, 20));
addaleax marked this conversation as resolved.
Show resolved Hide resolved
});
`, { eval: true });

worker.once('message', common.mustCall((num) => assert.strictEqual(num, 30)));
worker.postMessage({ wasmModule });
12 changes: 10 additions & 2 deletions test/parallel/test-worker-message-port-wasm-threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ assert(buffer instanceof SharedArrayBuffer);
// stopped when we exit.
const worker = new Worker(`
const { parentPort } = require('worker_threads');

// Compile the same WASM module from its source bytes.
const wasmSource = new Uint8Array([${wasmSource.join(',')}]);
const wasmModule = new WebAssembly.Module(wasmSource);
const instance = new WebAssembly.Instance(wasmModule);
parentPort.postMessage(instance.exports.memory);

// Do the same thing, except we receive the WASM module via transfer.
parentPort.once('message', ({ wasmModule }) => {
const instance = new WebAssembly.Instance(wasmModule);
parentPort.postMessage(instance.exports.memory);
});
`, { eval: true });
worker.once('message', common.mustCall(({ buffer }) => {
worker.on('message', common.mustCall(({ buffer }) => {
assert(buffer instanceof SharedArrayBuffer);
worker.buf = buffer; // Basically just keep the reference to buffer alive.
}));
}, 2));
worker.once('exit', common.mustCall());
worker.postMessage({ wasmModule });
}