Skip to content

Commit

Permalink
stream: use BLQS when not in objectMode (fixes nodejs#46347)
Browse files Browse the repository at this point in the history
BLQS = ByteLengthQueuingStrategy
  • Loading branch information
CGQAQ committed Jul 21, 2023
1 parent c301404 commit 27e63b2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 2 deletions.
7 changes: 5 additions & 2 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const { kFresh } = require('internal/webstreams/util');

const {
Writable,
Readable,
Expand Down Expand Up @@ -457,12 +459,13 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
streamReadable.on('data', onData);

return new ReadableStream({
start(c) { controller = c; },
start(c) { controller = c; controller[kFresh] = true; console.log("start", typeof c); },

pull() { streamReadable.resume(); },
pull() { streamReadable.resume(); },

cancel(reason) {
destroy(streamReadable, reason);
console.log("cancel")
},
}, strategy);
}
Expand Down
5 changes: 5 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ const {
iteratorNext,
kType,
kState,
kFresh,
} = require('internal/webstreams/util');

const {
Expand Down Expand Up @@ -2286,6 +2287,10 @@ function readableStreamDefaultControllerGetDesiredSize(controller) {
}

function readableStreamDefaultControllerShouldCallPull(controller) {
if (controller[kFresh]) {
controller[kFresh] = false;
return false;
}
const {
stream,
} = controller[kState];
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const {
validateFunction,
} = require('internal/validators');

const kFresh = Symbol('kFresh');
const kState = Symbol('kState');
const kType = Symbol('kType');

Expand Down Expand Up @@ -298,4 +299,5 @@ module.exports = {
iteratorNext,
kType,
kState,
kFresh,
};
44 changes: 44 additions & 0 deletions t/46347.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// since it's ESM, save it as .mjs

import fs from 'node:fs'
import process from 'node:process'
import { Readable } from 'node:stream'

// we initialize a stream, but not start consuming it
const randomNodeStream = fs.createReadStream('/dev/urandom')
// after 10 seconds, it'll get converted to web stream
let randomWebStream

// we check memory usage every second
// since it's a stream, it shouldn't be higher than the chunk size
const reportMemoryUsage = () => {
const { arrayBuffers } = process.memoryUsage()
console.log(
`Array buffers memory usage is ${Math.round(
arrayBuffers / 1024 / 1024
)} MiB`
)
if (arrayBuffers > 256 * 1024 * 1024) {
// streaming should not lead to such a memory increase
// therefore, if it happens => bail
console.log('Over 256 MiB taken, exiting')
process.exit(0)
}
}
setInterval(reportMemoryUsage, 1000)

// after 10 seconds we use Readable.toWeb
// memory usage should stay pretty much the same since it's still a stream
setTimeout(() => {
console.log('converting node stream to web stream')
randomWebStream = Readable.toWeb(randomNodeStream)
}, 5000)

// after 15 seconds we start consuming the stream
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
setTimeout(async () => {
console.log('reading the chunks')
for await (const chunk of randomWebStream) {
// do nothing, just let the stream flow
}
}, 15000)

0 comments on commit 27e63b2

Please sign in to comment.