From 3f7b05ec50bd9b745f29265a51a9459a2aa2a7f6 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:45:11 +0300 Subject: [PATCH] stream: improve tee perf by reduce `ReflectConstruct` usages also added more webstream creation benchmarks PR-URL: https://github.com/nodejs/node/pull/49546 Reviewed-By: Yagiz Nizipli Reviewed-By: Matteo Collina --- benchmark/webstreams/creation.js | 68 ++++++++++++++++++++--- lib/internal/webstreams/readablestream.js | 63 ++++++++++++--------- 2 files changed, 96 insertions(+), 35 deletions(-) diff --git a/benchmark/webstreams/creation.js b/benchmark/webstreams/creation.js index babd9101b60439..2c3e1d273f6d4c 100644 --- a/benchmark/webstreams/creation.js +++ b/benchmark/webstreams/creation.js @@ -2,6 +2,8 @@ const common = require('../common.js'); const { ReadableStream, + ReadableStreamDefaultReader, + ReadableStreamBYOBReader, TransformStream, WritableStream, } = require('node:stream/web'); @@ -9,40 +11,90 @@ const assert = require('assert'); const bench = common.createBenchmark(main, { n: [50e3], - kind: ['ReadableStream', 'TransformStream', 'WritableStream'], + kind: [ + 'ReadableStream', + 'TransformStream', + 'WritableStream', + + 'ReadableStreamDefaultReader', + 'ReadableStreamBYOBReader', + + 'ReadableStream.tee', + ], }); -let rs, ws, ts; +let readableStream; +let transformStream; +let writableStream; +let readableStreamDefaultReader; +let readableStreamBYOBReader; +let teeResult; function main({ n, kind }) { switch (kind) { case 'ReadableStream': bench.start(); for (let i = 0; i < n; ++i) - rs = new ReadableStream(); + readableStream = new ReadableStream(); bench.end(n); // Avoid V8 deadcode (elimination) - assert.ok(rs); + assert.ok(readableStream); break; case 'WritableStream': bench.start(); for (let i = 0; i < n; ++i) - ws = new WritableStream(); + writableStream = new WritableStream(); bench.end(n); // Avoid V8 deadcode (elimination) - assert.ok(ws); + assert.ok(writableStream); break; case 'TransformStream': bench.start(); for (let i = 0; i < n; ++i) - ts = new TransformStream(); + transformStream = new TransformStream(); + bench.end(n); + + // Avoid V8 deadcode (elimination) + assert.ok(transformStream); + break; + case 'ReadableStreamDefaultReader': { + const readers = Array.from({ length: n }, () => new ReadableStream()); + + bench.start(); + for (let i = 0; i < n; ++i) + readableStreamDefaultReader = new ReadableStreamDefaultReader(readers[i]); + bench.end(n); + + // Avoid V8 deadcode (elimination) + assert.ok(readableStreamDefaultReader); + break; + } + case 'ReadableStreamBYOBReader': { + const readers = Array.from({ length: n }, () => new ReadableStream({ type: 'bytes' })); + + bench.start(); + for (let i = 0; i < n; ++i) + readableStreamBYOBReader = new ReadableStreamBYOBReader(readers[i]); + bench.end(n); + + // Avoid V8 deadcode (elimination) + assert.ok(readableStreamBYOBReader); + break; + } + case 'ReadableStream.tee': { + const streams = Array.from({ length: n }, () => new ReadableStream()); + + bench.start(); + for (let i = 0; i < n; ++i) + teeResult = streams[i].tee(); bench.end(n); // Avoid V8 deadcode (elimination) - assert.ok(ts); + assert.ok(teeResult); break; + } default: throw new Error('Invalid kind'); } diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 2c6a30ab4ed672..3d70f97a84bb47 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1200,34 +1200,43 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name), }); +function TeeReadableStream(start, pull, cancel) { + this[kType] = 'ReadableStream'; + this[kState] = { + disturbed: false, + state: 'readable', + storedError: undefined, + stream: undefined, + transfer: { + writable: undefined, + port: undefined, + promise: undefined, + }, + }; + this[kIsClosedPromise] = createDeferredPromise(); + setupReadableStreamDefaultControllerFromSource( + this, + ObjectCreate(null, { + start: { __proto__: null, value: start }, + pull: { __proto__: null, value: pull }, + cancel: { __proto__: null, value: cancel }, + }), + 1, + () => 1); + + + return makeTransferable(this); +} + +ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype); +ObjectSetPrototypeOf(TeeReadableStream, ReadableStream); + function createTeeReadableStream(start, pull, cancel) { - return ReflectConstruct( - function() { - this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port: undefined, - promise: undefined, - }, - }; - this[kIsClosedPromise] = createDeferredPromise(); - setupReadableStreamDefaultControllerFromSource( - this, - ObjectCreate(null, { - start: { __proto__: null, value: start }, - pull: { __proto__: null, value: pull }, - cancel: { __proto__: null, value: cancel }, - }), - 1, - () => 1); - return makeTransferable(this); - }, [], ReadableStream, - ); + const tee = new TeeReadableStream(start, pull, cancel); + + // For spec compliance the Tee must be a ReadableStream + tee.constructor = ReadableStream; + return tee; } const isReadableStream =