Skip to content

Commit

Permalink
stream: improve Readable legacy compat
Browse files Browse the repository at this point in the history
The popular stream-shift library accesses internal Readable
state which has been modified.

Refs: googleapis/nodejs-storage#2391
Refs: mafintosh/stream-shift#10
PR-URL: nodejs#51470
  • Loading branch information
ronag committed Jan 15, 2024
1 parent ee61c2c commit e833e46
Showing 1 changed file with 61 additions and 23 deletions.
84 changes: 61 additions & 23 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ const kErroredValue = Symbol('kErroredValue');
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kDecoderValue = Symbol('kDecoderValue');
const kEncodingValue = Symbol('kEncodingValue');
const kBuffer = Symbol('kBuffer');
const kBufferIndex = Symbol('kBufferIndex');

const kEnded = 1 << 9;
const kEndEmitted = 1 << 10;
Expand Down Expand Up @@ -276,8 +278,8 @@ function ReadableState(options, stream, isDuplex) {
getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
getDefaultHighWaterMark(false);

this.buffer = [];
this.bufferIndex = 0;
this[kBuffer] = [];
this[kBufferIndex] = 0;
this.length = 0;
this.pipes = [];

Expand Down Expand Up @@ -561,13 +563,13 @@ function addChunk(stream, state, chunk, addToFront) {
// Update the buffer info.
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
if (addToFront) {
if (state.bufferIndex > 0) {
state.buffer[--state.bufferIndex] = chunk;
if (state[kBufferIndex] > 0) {
state[kBuffer][--state[kBufferIndex]] = chunk;
} else {
state.buffer.unshift(chunk); // Slow path
state[kBuffer].unshift(chunk); // Slow path
}
} else {
state.buffer.push(chunk);
state[kBuffer].push(chunk);
}

if ((state[kState] & kNeedReadable) !== 0)
Expand All @@ -592,14 +594,14 @@ Readable.prototype.setEncoding = function(enc) {

// Iterate over current buffer to convert already stored Buffers:
let content = '';
for (const data of state.buffer.slice(state.bufferIndex)) {
for (const data of state[kBuffer].slice(state[kBufferIndex])) {
content += decoder.write(data);
}
state.buffer.length = 0;
state.bufferIndex = 0;
state[kBuffer].length = 0;
state[kBufferIndex] = 0;

if (content !== '')
state.buffer.push(content);
state[kBuffer].push(content);
state.length = content.length;
return this;
};
Expand Down Expand Up @@ -633,7 +635,7 @@ function howMuchToRead(n, state) {
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer[state.bufferIndex].length;
return state[kBuffer][state[kBufferIndex]].length;
return state.length;
}
if (n <= state.length)
Expand Down Expand Up @@ -790,7 +792,7 @@ function onEofChunk(stream, state) {
if (decoder) {
const chunk = decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state[kBuffer].push(chunk);
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
}
}
Expand Down Expand Up @@ -1459,7 +1461,7 @@ ObjectDefineProperties(Readable.prototype, {
__proto__: null,
enumerable: false,
get: function() {
return this._readableState && this._readableState.buffer;
return this._readableState && this._readableState[kBuffer];
},
},

Expand Down Expand Up @@ -1541,9 +1543,15 @@ ObjectDefineProperties(Readable.prototype, {
return this._readableState ? this._readableState.endEmitted : false;
},
},

});

function normalizeBuffer(state) {
if (state[kBufferIndex] > 0) {
state.splice(state[kBufferIndex]);
state[kBufferIndex] = 0;
}
}

ObjectDefineProperties(ReadableState.prototype, {
// Legacy getter for `pipesCount`.
pipesCount: {
Expand All @@ -1568,6 +1576,36 @@ ObjectDefineProperties(ReadableState.prototype, {
}
},
},

// Legacy compat
buffer: {
__proto__: null,
get() {
return new Proxy(this._readableState[kBuffer], {
get (target, name) {
if (name === 'length') {
return target[kBuffer].length - target[kBufferIndex];
}
if (name === '0') {
return target[kBuffer][target[kBufferIndex]];
}
normalizeBuffer(target);
return target[name];
},
set (target, name, value, receiver) {
normalizeBuffer(target);
target[name] = value;
return true;
}
})
},
},
bufferIndex: {
__proto__: null,
get() {
return 0;
}
}
});

// Exposed for testing purposes only.
Expand All @@ -1582,10 +1620,10 @@ function fromList(n, state) {
if (state.length === 0)
return null;

let idx = state.bufferIndex;
let idx = state[kBufferIndex];
let ret;

const buf = state.buffer;
const buf = state[kBuffer];
const len = buf.length;

if ((state[kState] & kObjectMode) !== 0) {
Expand Down Expand Up @@ -1656,22 +1694,22 @@ function fromList(n, state) {
TypedArrayPrototypeSet(ret, data, retLen - n);
buf[idx++] = null;
} else {
TypedArrayPrototypeSet(ret, new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n);
TypedArrayPrototypeSet(ret, new FastBuffer(data[kBuffer], data.byteOffset, n), retLen - n);
buf[idx] = new FastBuffer(data[kBuffer], data.byteOffset + n, data.length - n);
}
break;
}
}
}

if (idx === len) {
state.buffer.length = 0;
state.bufferIndex = 0;
state[kBuffer].length = 0;
state[kBufferIndex] = 0;
} else if (idx > 1024) {
state.buffer.splice(0, idx);
state.bufferIndex = 0;
state[kBuffer].splice(0, idx);
state[kBufferIndex] = 0;
} else {
state.bufferIndex = idx;
state[kBufferIndex] = idx;
}

return ret;
Expand Down

0 comments on commit e833e46

Please sign in to comment.