Skip to content

Commit

Permalink
worker,fs: make FileHandle transferable
Browse files Browse the repository at this point in the history
Allow passing `FileHandle` instances in the transfer list
of a `.postMessage()` call.

PR-URL: #33772
Backport-PR-URL: #33965
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
addaleax committed Sep 28, 2020
1 parent 0d35eaa commit dd51ba3
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 4 deletions.
12 changes: 10 additions & 2 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ are part of the channel.
### `port.postMessage(value[, transferList])`
<!-- YAML
added: v10.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/33772
description: Added `FileHandle` to the list of transferable types.
-->

* `value` {any}
Expand All @@ -335,7 +339,8 @@ In particular, the significant differences to `JSON` are:
* `value` may contain typed arrays, both using `ArrayBuffer`s
and `SharedArrayBuffer`s.
* `value` may contain [`WebAssembly.Module`][] instances.
* `value` may not contain native (C++-backed) objects other than `MessagePort`s.
* `value` may not contain native (C++-backed) objects other than `MessagePort`s
and [`FileHandle`][]s.

```js
const { MessageChannel } = require('worker_threads');
Expand All @@ -349,7 +354,8 @@ circularData.foo = circularData;
port2.postMessage(circularData);
```

`transferList` may be a list of `ArrayBuffer` and `MessagePort` objects.
`transferList` may be a list of [`ArrayBuffer`][], [`MessagePort`][] and
[`FileHandle`][] objects.
After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`). Unlike with
[child processes][], transferring handles such as network sockets is currently
Expand Down Expand Up @@ -813,13 +819,15 @@ active handle in the event system. If the worker is already `unref()`ed calling

[`'close'` event]: #worker_threads_event_close
[`'exit'` event]: #worker_threads_event_exit
[`ArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer
[`AsyncResource`]: async_hooks.html#async_hooks_class_asyncresource
[`Buffer`]: buffer.html
[`Buffer.allocUnsafe()`]: buffer.html#buffer_static_method_buffer_allocunsafe_size
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.html#errors_err_missing_message_port_in_transfer_list
[`ERR_WORKER_NOT_RUNNING`]: errors.html#ERR_WORKER_NOT_RUNNING
[`EventEmitter`]: events.html
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
[`FileHandle`]: fs.html#fs_class_filehandle
[`MessagePort`]: #worker_threads_class_messageport
[`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
Expand Down
28 changes: 26 additions & 2 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ const { promisify } = require('internal/util');
const kHandle = Symbol('kHandle');
const kFd = Symbol('kFd');
const { kUsePromises } = binding;
const {
JSTransferable, kDeserialize, kTransfer, kTransferList
} = require('internal/worker/js_transferable');

const getDirectoryEntriesPromise = promisify(getDirents);

class FileHandle {
class FileHandle extends JSTransferable {
constructor(filehandle) {
super();
this[kHandle] = filehandle;
this[kFd] = filehandle.fd;
this[kFd] = filehandle ? filehandle.fd : -1;
}

getAsyncId() {
Expand Down Expand Up @@ -131,6 +135,26 @@ class FileHandle {
this[kFd] = -1;
return this[kHandle].close();
}

[kTransfer]() {
const handle = this[kHandle];
this[kFd] = -1;
this[kHandle] = null;

return {
data: { handle },
deserializeInfo: 'internal/fs/promises:FileHandle'
};
}

[kTransferList]() {
return [ this[kHandle] ];
}

[kDeserialize]({ handle }) {
this[kHandle] = handle;
this[kFd] = handle.fd;
}
}

function validateFileHandle(handle) {
Expand Down
8 changes: 8 additions & 0 deletions lib/internal/worker/js_transferable.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
const { Error } = primordials;
const {
messaging_deserialize_symbol,
messaging_transfer_symbol,
Expand All @@ -17,6 +18,13 @@ function setup() {
setDeserializerCreateObjectFunction((deserializeInfo) => {
const [ module, ctor ] = deserializeInfo.split(':');
const Ctor = require(module)[ctor];
if (typeof Ctor !== 'function' ||
!(Ctor.prototype instanceof JSTransferable)) {
// Not one of the official errors because one should not be able to get
// here without messing with Node.js internals.
// eslint-disable-next-line no-restricted-syntax
throw new Error(`Unknown deserialize spec ${deserializeInfo}`);
}
return new Ctor();
});
}
Expand Down
34 changes: 34 additions & 0 deletions src/node_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,40 @@ void FileHandle::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("current_read", current_read_);
}

FileHandle::TransferMode FileHandle::GetTransferMode() const {
return reading_ || closing_ || closed_ ?
TransferMode::kUntransferable : TransferMode::kTransferable;
}

std::unique_ptr<worker::TransferData> FileHandle::TransferForMessaging() {
CHECK_NE(GetTransferMode(), TransferMode::kUntransferable);
auto ret = std::make_unique<TransferData>(fd_);
closed_ = true;
return std::move(ret);
}

FileHandle::TransferData::TransferData(int fd) : fd_(fd) {}

FileHandle::TransferData::~TransferData() {
if (fd_ > 0) {
uv_fs_t close_req;
CHECK_EQ(0, uv_fs_close(nullptr, &close_req, fd_, nullptr));
uv_fs_req_cleanup(&close_req);
}
}

BaseObjectPtr<BaseObject> FileHandle::TransferData::Deserialize(
Environment* env_,
v8::Local<v8::Context> context,
std::unique_ptr<worker::TransferData> self) {
Environment* env = Environment::GetCurrent(context);
if (env == nullptr) return {};

int fd = fd_;
fd_ = -1;
return BaseObjectPtr<BaseObject> { FileHandle::New(env, fd) };
}

// Close the file descriptor if it hasn't already been closed. A process
// warning will be emitted using a SetImmediate to avoid calling back to
// JS during GC. If closing the fd fails at this point, a fatal exception
Expand Down
22 changes: 22 additions & 0 deletions src/node_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "node.h"
#include "aliased_buffer.h"
#include "node_messaging.h"
#include "stream_base.h"
#include <iostream>

Expand Down Expand Up @@ -250,7 +251,28 @@ class FileHandle final : public AsyncWrap, public StreamBase {
FileHandle(const FileHandle&&) = delete;
FileHandle& operator=(const FileHandle&&) = delete;

TransferMode GetTransferMode() const override;
std::unique_ptr<worker::TransferData> TransferForMessaging() override;

private:
class TransferData : public worker::TransferData {
public:
explicit TransferData(int fd);
~TransferData();

BaseObjectPtr<BaseObject> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<worker::TransferData> self) override;

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(FileHandleTransferData)
SET_SELF_SIZE(TransferData)

private:
int fd_;
};

FileHandle(Environment* env, v8::Local<v8::Object> obj, int fd);

// Synchronous close that emits a warning
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fs = require('fs').promises;
const { MessageChannel } = require('worker_threads');
const { once } = require('events');

// Test that overriding the internal kTransfer method of a JSTransferable does
// not enable loading arbitrary code from internal Node.js core modules.

(async function() {
const fh = await fs.open(__filename);
assert.strictEqual(fh.constructor.name, 'FileHandle');

const kTransfer = Object.getOwnPropertySymbols(Object.getPrototypeOf(fh))
.filter((symbol) => symbol.description === 'messaging_transfer_symbol')[0];
assert.strictEqual(typeof kTransfer, 'symbol');
fh[kTransfer] = () => {
return {
data: '✨',
deserializeInfo: 'net:Socket'
};
};

const { port1, port2 } = new MessageChannel();
port1.postMessage(fh, [ fh ]);
port2.on('message', common.mustNotCall());

const [ exception ] = await once(process, 'uncaughtException');

assert.strictEqual(exception.message, 'Unknown deserialize spec net:Socket');
port2.close();
})().then(common.mustCall());
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fs = require('fs').promises;
const { MessageChannel } = require('worker_threads');
const { once } = require('events');

// Test that overriding the internal kTransfer method of a JSTransferable does
// not enable loading arbitrary code from the disk.

module.exports = {
NotARealClass: common.mustNotCall()
};

(async function() {
const fh = await fs.open(__filename);
assert.strictEqual(fh.constructor.name, 'FileHandle');

const kTransfer = Object.getOwnPropertySymbols(Object.getPrototypeOf(fh))
.filter((symbol) => symbol.description === 'messaging_transfer_symbol')[0];
assert.strictEqual(typeof kTransfer, 'symbol');
fh[kTransfer] = () => {
return {
data: '✨',
deserializeInfo: `${__filename}:NotARealClass`
};
};

const { port1, port2 } = new MessageChannel();
port1.postMessage(fh, [ fh ]);
port2.on('message', common.mustNotCall());

const [ exception ] = await once(process, 'uncaughtException');

assert.match(exception.message, /Missing internal module/);
port2.close();
})().then(common.mustCall());
65 changes: 65 additions & 0 deletions test/parallel/test-worker-message-port-transfer-filehandle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fs = require('fs').promises;
const vm = require('vm');
const { MessageChannel, moveMessagePortToContext } = require('worker_threads');
const { once } = require('events');

(async function() {
const fh = await fs.open(__filename);

const { port1, port2 } = new MessageChannel();

assert.throws(() => {
port1.postMessage(fh);
}, {
// See the TODO about error code in node_messaging.cc.
code: 'ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST'
});

// Check that transferring FileHandle instances works.
assert.notStrictEqual(fh.fd, -1);
port1.postMessage(fh, [ fh ]);
assert.strictEqual(fh.fd, -1);

const [ fh2 ] = await once(port2, 'message');
assert.strictEqual(Object.getPrototypeOf(fh2), Object.getPrototypeOf(fh));

assert.deepStrictEqual(await fh2.readFile(), await fs.readFile(__filename));
await fh2.close();

assert.rejects(() => fh.readFile(), { code: 'EBADF' });
})().then(common.mustCall());

(async function() {
// Check that there is no crash if the message is never read.
const fh = await fs.open(__filename);

const { port1 } = new MessageChannel();

assert.notStrictEqual(fh.fd, -1);
port1.postMessage(fh, [ fh ]);
assert.strictEqual(fh.fd, -1);
})().then(common.mustCall());

(async function() {
// Check that in the case of a context mismatch the message is discarded.
const fh = await fs.open(__filename);

const { port1, port2 } = new MessageChannel();

const ctx = vm.createContext();
const port2moved = moveMessagePortToContext(port2, ctx);
port2moved.onmessage = common.mustCall((msgEvent) => {
assert.strictEqual(msgEvent.data, 'second message');
port1.close();
});
port2moved.start();

assert.notStrictEqual(fh.fd, -1);
port1.postMessage(fh, [ fh ]);
assert.strictEqual(fh.fd, -1);

port1.postMessage('second message');
})().then(common.mustCall());

0 comments on commit dd51ba3

Please sign in to comment.