Skip to content

Commit

Permalink
lib: make AbortSignal cloneable/transferable
Browse files Browse the repository at this point in the history
Allows for using `AbortSignal` across worker threads and contexts.

```js
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = ({ data }) => {
  data.addEventListener('abort', () => {
    console.log('aborted!');
  });
};
mc.port2.postMessage(ac.signal, [ac.signal]);
```

Signed-off-by: James M Snell <[email protected]>
  • Loading branch information
jasnell committed Dec 2, 2021
1 parent 722f113 commit 75915a1
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 5 deletions.
96 changes: 91 additions & 5 deletions lib/internal/abort_controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,37 @@ const {
setTimeout,
} = require('timers');

const kAborted = Symbol('kAborted');
const kReason = Symbol('kReason');
const kTimeout = Symbol('kTimeout');
const {
messaging_deserialize_symbol: kDeserialize,
messaging_transfer_symbol: kTransfer,
messaging_transfer_list_symbol: kTransferList
} = internalBinding('symbols');

const timeOutSignals = new SafeSet();
let MessageChannel;
let makeTransferable;

// Loading the MessageChannel and makeTransferable have to be done lazily
// because otherwise we'll end up with a require cycle that ends up with
// an incomplete initialization of abort_controller.

function lazyMessageChannel() {
MessageChannel ??= require('internal/worker/io').MessageChannel;
return new MessageChannel();
}

function lazMakeTransferable(obj) {
makeTransferable ??=
require('internal/worker/js_transferable').makeTransferable;
return makeTransferable(obj);
}

const clearTimeoutRegistry = new SafeFinalizationRegistry(clearTimeout);
const timeOutSignals = new SafeSet();

const kAborted = Symbol('kAborted');
const kReason = Symbol('kReason');
const kCloneData = Symbol('kCloneData');
const kTimeout = Symbol('kTimeout');

function customInspect(self, obj, depth, options) {
if (depth < 0)
Expand Down Expand Up @@ -165,7 +189,68 @@ class AbortSignal extends EventTarget {
timeOutSignals.delete(this);
}
}

[kTransfer]() {
validateAbortSignal(this);
const aborted = this.aborted;
if (aborted) {
const reason = this.reason;
return {
data: { aborted, reason },
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
};
}

const { port1, port2 } = this[kCloneData];
this[kCloneData] = port2;

this.addEventListener('abort', () => {
port1.postMessage(this.reason);
port1.close();
}, { once: true });

return {
data: { port: port2 },
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
};
}

[kTransferList]() {
if (!this.aborted) {
const { port1, port2 } = lazyMessageChannel();
port1.unref();
port2.unref();
this[kCloneData] = {
port1,
port2,
};
return [port2];
}
return [];
}

[kDeserialize]({ aborted, reason, port }) {
if (aborted) {
this[kAborted] = aborted;
this[kReason] = reason;
return;
}

port.onmessage = ({ data }) => {
abortSignal(this, data);
port.close();
port.onmessage = undefined;
};
// The receiving port, by itself, should never keep the event loop open.
// The unref() has to be called *after* setting the onmessage handler.
port.unref();
}
}

function ClonedAbortSignal() {
return createAbortSignal();
}
ClonedAbortSignal.prototype[kDeserialize] = () => {};

ObjectDefineProperties(AbortSignal.prototype, {
aborted: { enumerable: true }
Expand All @@ -185,7 +270,7 @@ function createAbortSignal(aborted = false, reason = undefined) {
ObjectSetPrototypeOf(signal, AbortSignal.prototype);
signal[kAborted] = aborted;
signal[kReason] = reason;
return signal;
return lazMakeTransferable(signal);
}

function abortSignal(signal, reason) {
Expand Down Expand Up @@ -252,4 +337,5 @@ module.exports = {
kAborted,
AbortController,
AbortSignal,
ClonedAbortSignal,
};
52 changes: 52 additions & 0 deletions test/parallel/test-abortsignal-cloneable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common');
const { ok, strictEqual } = require('assert');

{
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall(({ data }) => {
data.addEventListener('abort', common.mustCall(() => {
strictEqual(data.reason, 'boom');
}));
}, 2);
mc.port2.postMessage(ac.signal, [ac.signal]);

// Can be cloned/transferd multiple times and they all still work
mc.port2.postMessage(ac.signal, [ac.signal]);

mc.port2.close();

// Although we're using transfer semantics, the local AbortSignal
// is still usable locally.
ac.signal.addEventListener('abort', common.mustCall(() => {
strictEqual(ac.signal.reason, 'boom');
}));

ac.abort('boom');
}

{
const signal = AbortSignal.abort('boom');
ok(signal.aborted);
strictEqual(signal.reason, 'boom');
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall(({ data }) => {
ok(data instanceof AbortSignal);
ok(data.aborted);
strictEqual(data.reason, 'boom');
mc.port1.close();
});
mc.port2.postMessage(signal, [signal]);
}

{
// The cloned AbortSignal does not keep the event loop open
// waiting for the abort to be triggered.
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall();
mc.port2.postMessage(ac.signal, [ac.signal]);
mc.port2.close();
}

0 comments on commit 75915a1

Please sign in to comment.