Skip to content

Commit

Permalink
stream: reduce overhead of transfer
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50107
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Stephen Belanger <[email protected]>
  • Loading branch information
H4ad authored and alexfernandez committed Nov 1, 2023
1 parent 93f690a commit 940ff7c
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 85 deletions.
52 changes: 52 additions & 0 deletions benchmark/webstreams/js_transfer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common.js');

const { MessageChannel } = require('worker_threads');
const { WritableStream, TransformStream, ReadableStream } = require('stream/web');

const bench = common.createBenchmark(main, {
payload: ['WritableStream', 'ReadableStream', 'TransformStream'],
n: [1e4],
});

function main({ n, payload: payloadType }) {
let createPayload;
let messages = 0;

switch (payloadType) {
case 'WritableStream':
createPayload = () => new WritableStream();
break;
case 'ReadableStream':
createPayload = () => new ReadableStream();
break;
case 'TransformStream':
createPayload = () => new TransformStream();
break;
default:
throw new Error('Unsupported payload type');
}

const { port1, port2 } = new MessageChannel();

port2.onmessage = onMessage;

function onMessage() {
if (messages++ === n) {
bench.end(n);
port1.close();
} else {
send();
}
}

function send() {
const stream = createPayload();

port1.postMessage(stream, [stream]);
}

bench.start();
send();
}
48 changes: 29 additions & 19 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const {
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
ReflectConstruct,
SafePromiseAll,
Symbol,
SymbolAsyncIterator,
Expand Down Expand Up @@ -642,26 +641,37 @@ ObjectDefineProperties(ReadableStream.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name),
});

function TransferredReadableStream() {
return ReflectConstruct(
function() {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
function InternalTransferredReadableStream() {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
[], ReadableStream);
};

this[kIsClosedPromise] = createDeferredPromise();
}

ObjectSetPrototypeOf(InternalTransferredReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(InternalTransferredReadableStream, ReadableStream);

function TransferredReadableStream() {
const stream = new InternalTransferredReadableStream();

stream.constructor = ReadableStream;

return stream;
}

TransferredReadableStream.prototype[kDeserialize] = () => {};

class ReadableStreamBYOBRequest {
Expand Down
44 changes: 26 additions & 18 deletions lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ const {
FunctionPrototypeBind,
FunctionPrototypeCall,
ObjectDefineProperties,
ObjectSetPrototypeOf,
PromisePrototypeThen,
PromiseResolve,
ReflectConstruct,
SymbolToStringTag,
Symbol,
} = primordials;
Expand Down Expand Up @@ -247,25 +247,33 @@ ObjectDefineProperties(TransformStream.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name),
});

function TransferredTransformStream() {
return ReflectConstruct(
function() {
markTransferMode(this, false, true);
this[kType] = 'TransformStream';
this[kState] = {
readable: undefined,
writable: undefined,
backpressure: undefined,
backpressureChange: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
controller: undefined,
};
function InternalTransferredTransformStream() {
markTransferMode(this, false, true);
this[kType] = 'TransformStream';
this[kState] = {
readable: undefined,
writable: undefined,
backpressure: undefined,
backpressureChange: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
[], TransformStream);
controller: undefined,
};
}

ObjectSetPrototypeOf(InternalTransferredTransformStream.prototype, TransformStream.prototype);
ObjectSetPrototypeOf(InternalTransferredTransformStream, TransformStream);

function TransferredTransformStream() {
const stream = new InternalTransferredTransformStream();

stream.constructor = TransformStream;

return stream;
}

TransferredTransformStream.prototype[kDeserialize] = () => {};

class TransformStreamDefaultController {
Expand Down
104 changes: 56 additions & 48 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ const {
FunctionPrototypeBind,
FunctionPrototypeCall,
ObjectDefineProperties,
ObjectSetPrototypeOf,
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
ReflectConstruct,
Symbol,
SymbolToStringTag,
} = primordials;
Expand Down Expand Up @@ -326,55 +326,63 @@ ObjectDefineProperties(WritableStream.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name),
});

function TransferredWritableStream() {
return ReflectConstruct(
function() {
markTransferMode(this, false, true);
this[kType] = 'WritableStream';
this[kState] = {
close: createDeferredPromise(),
closeRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightWriteRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightCloseRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
pendingAbortRequest: {
abort: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
reason: undefined,
wasAlreadyErroring: false,
},
backpressure: false,
controller: undefined,
state: 'writable',
storedError: undefined,
writeRequests: [],
writer: undefined,
transfer: {
promise: undefined,
port1: undefined,
port2: undefined,
readable: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
function InternalTransferredWritableStream() {
markTransferMode(this, false, true);
this[kType] = 'WritableStream';
this[kState] = {
close: createDeferredPromise(),
closeRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightWriteRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
[], WritableStream);
inFlightCloseRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
pendingAbortRequest: {
abort: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
reason: undefined,
wasAlreadyErroring: false,
},
backpressure: false,
controller: undefined,
state: 'writable',
storedError: undefined,
writeRequests: [],
writer: undefined,
transfer: {
readable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};

this[kIsClosedPromise] = createDeferredPromise();
}

ObjectSetPrototypeOf(InternalTransferredWritableStream.prototype, WritableStream.prototype);
ObjectSetPrototypeOf(InternalTransferredWritableStream, WritableStream);

function TransferredWritableStream() {
const stream = new InternalTransferredWritableStream();

stream.constructor = WritableStream;

return stream;
}

TransferredWritableStream.prototype[kDeserialize] = () => {};

class WritableStreamDefaultWriter {
Expand Down

0 comments on commit 940ff7c

Please sign in to comment.