streams: use Array for Readable buffer
ronag committed Oct 23, 2023
1 parent 25576b5 commit 1bffece
Showing 3 changed files with 114 additions and 24 deletions.
2 changes: 1 addition & 1 deletion benchmark/streams/readable-bigread.js
@@ -15,7 +15,7 @@ function main({ n }) {
 
   bench.start();
   for (let k = 0; k < n; ++k) {
-    for (let i = 0; i < 1e4; ++i)
+    for (let i = 0; i < 1e3; ++i)
       s.push(b);
     while (s.read(128));
   }

2 changes: 1 addition & 1 deletion benchmark/streams/readable-readall.js
@@ -15,7 +15,7 @@ function main({ n }) {
 
   bench.start();
   for (let k = 0; k < n; ++k) {
-    for (let i = 0; i < 1e4; ++i)
+    for (let i = 0; i < 1e3; ++i)
       s.push(b);
     while (s.read());
   }

134 changes: 112 additions & 22 deletions lib/internal/streams/readable.js
@@ -73,6 +73,7 @@ const {
 const { validateObject } = require('internal/validators');
 
 const kState = Symbol('kState');
+const FastBuffer = Buffer[Symbol.species];
 
 const { StringDecoder } = require('string_decoder');
 const from = require('internal/streams/from');
@@ -278,7 +279,8 @@ function ReadableState(options, stream, isDuplex) {
   // A linked list is used to store data chunks instead of an array because the
   // linked list can remove elements from the beginning faster than
   // array.shift().
-  this.buffer = new BufferList();
+  this.buffer = [];
+  this.bufferIndex = 0;
   this.length = 0;
   this.pipes = [];

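The change above is the heart of the commit: the BufferList linked list gives way to a plain array paired with a head index, so dequeuing a chunk advances bufferIndex in O(1) instead of paying the O(n) element move of Array.prototype.shift(). A minimal standalone sketch of that queue discipline (names and the reset policy here are illustrative, not the commit's code):

'use strict';

// A queue as array + head index: shift() bumps the index instead of
// moving every remaining element one slot to the left.
class ChunkQueue {
  constructor() {
    this.buffer = [];
    this.bufferIndex = 0; // Slots below this index are already consumed.
  }
  push(chunk) {
    this.buffer.push(chunk);
  }
  shift() {
    if (this.bufferIndex === this.buffer.length) return undefined;
    const chunk = this.buffer[this.bufferIndex++];
    if (this.bufferIndex === this.buffer.length) {
      this.buffer.length = 0; // Everything consumed: reclaim the dead slots.
      this.bufferIndex = 0;
    }
    return chunk;
  }
}

const q = new ChunkQueue();
q.push('a');
q.push('b');
console.log(q.shift()); // 'a'; no elements were moved
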
@@ -546,10 +548,15 @@ function addChunk(stream, state, chunk, addToFront) {
   } else {
     // Update the buffer info.
     state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
-    if (addToFront)
-      state.buffer.unshift(chunk);
-    else
+    if (addToFront) {
+      if (state.bufferIndex > 0) {
+        state.buffer[--state.bufferIndex] = chunk;
+      } else {
+        state.buffer.unshift(chunk); // Slow path
+      }
+    } else {
       state.buffer.push(chunk);
+    }
 
     if ((state[kState] & kNeedReadable) !== 0)
       emitReadable(stream);
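
With the head index in place, addToFront gains a fast path: when earlier reads have advanced bufferIndex, the chunk is written back into an already-consumed slot in O(1), and the real Array.prototype.unshift(), which moves every element right, remains only as the slow path. The same idea as a standalone function over { buffer, bufferIndex } state (illustrative, not the commit's code):

'use strict';

// Put a chunk back at the front of an array+index queue.
function unshiftChunk(state, chunk) {
  if (state.bufferIndex > 0) {
    state.buffer[--state.bufferIndex] = chunk; // Reuse the consumed slot: O(1).
  } else {
    state.buffer.unshift(chunk); // Slow path: shifts every element right.
  }
}

const state = { buffer: ['b', 'c'], bufferIndex: 1 }; // 'b' was already read
unshiftChunk(state, 'B');
console.log(state.buffer[state.bufferIndex]); // 'B', written into the reused slot
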
@@ -564,21 +571,24 @@ Readable.prototype.isPaused = function() {
 
 // Backwards compatibility.
 Readable.prototype.setEncoding = function(enc) {
+  const state = this._readableState;
+
   const decoder = new StringDecoder(enc);
-  this._readableState.decoder = decoder;
+  state.decoder = decoder;
   // If setEncoding(null), decoder.encoding equals utf8.
-  this._readableState.encoding = this._readableState.decoder.encoding;
+  state.encoding = state.decoder.encoding;
 
-  const buffer = this._readableState.buffer;
   // Iterate over current buffer to convert already stored Buffers:
   let content = '';
-  for (const data of buffer) {
+  for (const data of state.buffer.slice(state.bufferIndex)) {
     content += decoder.write(data);
   }
-  buffer.clear();
+  state.buffer.length = 0;
+  state.bufferIndex = 0;
 
   if (content !== '')
-    buffer.push(content);
-  this._readableState.length = content.length;
+    state.buffer.push(content);
+
+  state.length = content.length;
   return this;
 };

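As background for the conversion loop above: StringDecoder carries partial multi-byte sequences across chunk boundaries, which is why every buffered chunk is funneled through decoder.write() instead of a per-chunk toString(). A small self-contained illustration (not part of the diff):

'use strict';
const { StringDecoder } = require('string_decoder');

const decoder = new StringDecoder('utf8');
// '€' is three bytes (0xE2 0x82 0xAC); split it across two chunks.
let content = '';
for (const data of [Buffer.from([0xe2, 0x82]), Buffer.from([0xac])]) {
  content += decoder.write(data); // Holds the partial sequence internally.
}
console.log(content); // '€'; per-chunk toString() would have mangled it
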
@@ -611,7 +621,7 @@ function howMuchToRead(n, state) {
   if (NumberIsNaN(n)) {
     // Only flow one buffer at a time.
     if ((state[kState] & kFlowing) !== 0 && state.length)
-      return state.buffer.first().length;
+      return state.buffer[state.bufferIndex].length;
     return state.length;
   }
   if (n <= state.length)
@@ -1550,20 +1560,100 @@ function fromList(n, state) {
     return null;
 
   let ret;
-  if (state.objectMode)
-    ret = state.buffer.shift();
-  else if (!n || n >= state.length) {
+  if ((state[kState] & kObjectMode) !== 0) {
+    ret = state.buffer[state.bufferIndex++];
+  } else if (!n || n >= state.length) {
     // Read it all, truncate the list.
-    if (state.decoder)
-      ret = state.buffer.join('');
-    else if (state.buffer.length === 1)
-      ret = state.buffer.first();
-    else
-      ret = state.buffer.concat(state.length);
-    state.buffer.clear();
+    if ((state[kState] & kDecoder) !== 0) {
+      ret = '';
+      for (let n = state.bufferIndex; n < state.buffer.length; n++) {
+        ret += state.buffer[n];
+      }
+    } else if (state.buffer.length - state.bufferIndex === 0) {
+      ret = Buffer.alloc(0);
+    } else if (state.buffer.length - state.bufferIndex === 1) {
+      ret = state.buffer[state.bufferIndex];
+    } else {
+      ret = Buffer.allocUnsafe(state.length >>> 0);
+
+      const idx = state.bufferIndex;
+      const buf = state.buffer;
+      const len = buf.length;
+
+      let i = 0;
+      for (let n = idx; n < len; n++) {
+        ret.set(buf[n], i);
+        i += buf[n].length;
+      }
+    }
+    state.buffer.length = 0;
+    state.bufferIndex = 0;
   } else {
     // read part of list.
-    ret = state.buffer.consume(n, state.decoder);
+
+    const buf = state.buffer;
+    const len = buf.length;
+
+    let idx = state.bufferIndex;
+
+    if (n < buf[idx].length) {
+      // `slice` is the same for buffers and strings.
+      ret = buf[idx].slice(0, n);
+      buf[idx] = buf[idx].slice(n);
+    } else if (n === buf[idx].length) {
+      // First chunk is a perfect match.
+      ret = buf[idx++];
+    } else if ((state[kState] & kDecoder) !== 0) {
+      ret = '';
+      while (idx < state.buffer.length) {
+        const str = buf[idx];
+        if (n > str.length) {
+          ret += str;
+          n -= str.length;
+          idx++;
+        } else {
+          if (n === str.length) {
+            ret += str;
+            idx++;
+          } else {
+            ret += str.slice(0, n);
+            buf[idx] = str.slice(n);
+          }
+          break;
+        }
+      }
+    } else {
+      ret = Buffer.allocUnsafe(n);
+
+      const retLen = n;
+      while (idx < len) {
+        const data = buf[idx];
+        if (n > data.length) {
+          ret.set(data, retLen - n);
+          n -= data.length;
+          idx++;
+        } else {
+          if (n === data.length) {
+            ret.set(data, retLen - n);
+            idx++;
+          } else {
+            ret.set(new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
+            buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n);
+          }
+          break;
+        }
+      }
+    }
+
+    if (idx === buf.length) {
+      state.buffer.length = 0;
+      state.bufferIndex = 0;
+    } else if (idx > 1024) {
+      state.buffer.splice(0, idx);
+      state.bufferIndex = 0;
+    } else {
+      state.bufferIndex = idx;
+    }
   }
 
   return ret;
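
Two details of the new fromList() deserve a note. First, the partial-read branch splits a chunk without copying payload bytes: FastBuffer, obtained above as Buffer[Symbol.species], is Node core's internal Uint8Array subclass that skips argument validation, and both halves of the split are views over the chunk's original ArrayBuffer. A userland approximation using the documented zero-copy Buffer.from(arrayBuffer, byteOffset, length) form (illustrative, not core's code):

'use strict';

// Split a chunk at n bytes with no payload copy: both halves alias the
// same ArrayBuffer, mirroring fromList()'s FastBuffer split.
function splitChunk(data, n) {
  const head = Buffer.from(data.buffer, data.byteOffset, n);
  const tail = Buffer.from(data.buffer, data.byteOffset + n, data.length - n);
  return [head, tail];
}

const [head, tail] = splitChunk(Buffer.from('hello world'), 5);
console.log(head.toString()); // 'hello'
console.log(tail.toString()); // ' world'
console.log(head.buffer === tail.buffer); // true; no bytes were copied

Second, the bookkeeping at the end caps how much garbage the index trick can strand: once more than 1024 consumed slots accumulate, a single splice() compacts the array and resets the index, amortizing the element moves that shift() would otherwise pay on every read.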
