Skip to content

Commit

Permalink
stream: writable state bitmap
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Sep 28, 2023
1 parent c19b2a7 commit d7b1625
Showing 1 changed file with 133 additions and 59 deletions.
192 changes: 133 additions & 59 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,35 @@ ObjectSetPrototypeOf(Writable, Stream);
function nop() {}

const kOnFinished = Symbol('kOnFinished');
const kErrored = Symbol('kErrored');
const kCorkedValue = Symbol('kCorked');

const kCorked = 0b111111; // 6 bits
const kObjectMode = 1 << 7;
const kEnded = 1 << 8;
const kConstructed = 1 << 9;
const kSync = 1 << 10;
const kErrorEmitted = 1 << 11;
const kEmitClose = 1 << 12;
const kAutoDestroy = 1 << 13;
const kDestroyed = 1 << 14;
const kClosed = 1 << 15;
const kCloseEmitted = 1 << 16;
const kFinalCalled = 1 << 17;
const kNeedDrain = 1 << 18;
const kEnding = 1 << 19;
const kFinished = 1 << 20;
const kDecodeStrings = 1 << 21;
const kWriting = 1 << 22;
const kBufferProcessing = 1 << 23;
const kPrefinished = 1 << 24;
const kAllBuffers = 1 << 25;
const kAllNoop = 1 << 26;
const kHasOnFinished = 1 << 27;
const kHasErrored = 1 << 28;
const kHasWritable = 1 << 29;
const kWritable = 1 << 30;

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kConstructed = 1 << 2;
const kSync = 1 << 3;
const kErrorEmitted = 1 << 4;
const kEmitClose = 1 << 5;
const kAutoDestroy = 1 << 6;
const kDestroyed = 1 << 7;
const kClosed = 1 << 8;
const kCloseEmitted = 1 << 9;
const kFinalCalled = 1 << 10;
const kNeedDrain = 1 << 11;
const kEnding = 1 << 12;
const kFinished = 1 << 13;
const kDecodeStrings = 1 << 14;
const kWriting = 1 << 15;
const kBufferProcessing = 1 << 16;
const kPrefinished = 1 << 17;
const kAllBuffers = 1 << 18;
const kAllNoop = 1 << 19;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -176,6 +184,58 @@ ObjectDefineProperties(WritableState.prototype, {

allBuffers: makeBitMapDescriptor(kAllBuffers),
allNoop: makeBitMapDescriptor(kAllNoop),

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
// This is/should be a cold path.
errored: {
enumerable: false,
get() { return (this.state & kHasErrored) !== 0 ? this[kErrored] : null; },
set(value) {
if (value) {
this[kErrored] = value;
this.state |= kHasErrored;
} else {
this.state &= ~kHasErrored;
}
},
},


writable: {
enumerable: false,
get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : null; },
set(value) {
if (value == null) {
this.state &= (kHasWritable | kWritable);
} else if (value) {
this.state |= (kHasWritable | kWritable);
} else {
this.state |= kHasWritable;
this.state &= ~kWritable;
}
},
},

// When true all writes will be buffered until .uncork() call.
// This is/should be a cold path.
corked: {
enumerable: false,
get() {
const corked = this.state & kCorked;
return corked !== kCorked ? val : this[kCorkedValue];
},
set(value) {
if (value < kCorked) {
this.state &= ~kCorked;
this.state |= value;
} else {
this.state |= kCorked
this[kCorkedValue] = value;
}
},
},
});

function WritableState(options, stream, isDuplex) {
Expand Down Expand Up @@ -226,9 +286,6 @@ function WritableState(options, stream, isDuplex) {
// socket or file.
this.length = 0;

// When true all writes will be buffered until .uncork() call.
this.corked = 0;

// The callback that's passed to _write(chunk, cb).
this.onwrite = onwrite.bind(undefined, stream);

Expand All @@ -247,13 +304,6 @@ function WritableState(options, stream, isDuplex) {
// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = null;

this[kOnFinished] = [];
}

function resetBuffer(state) {
Expand Down Expand Up @@ -394,17 +444,32 @@ Writable.prototype.write = function(chunk, encoding, cb) {
};

Writable.prototype.cork = function() {
this._writableState.corked++;
const state = this._writableState;

const corked = (state & kCorked) + 1;
if (corked < kCorked) {
this._writableState.state += 1;
} else {
state.corked++;
}
};

Writable.prototype.uncork = function() {
const state = this._writableState;

if (state.corked) {
if ((state.state & kCorked) === 0) {
return
}

const corked = state & kCorked;
if (corked < kCorked) {
this._writableState.state -= 1;
} else {
state.corked--;
}

if ((state.state & kWriting) === 0)
clearBuffer(this, state);
if ((state.state & kWriting) === 0) {
clearBuffer(this, state);
}
};

Expand Down Expand Up @@ -432,7 +497,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
if (!ret)
state.state |= kNeedDrain;

if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
if ((state.state & kWriting) !== 0 || (state.state & (kHasErrored | kCorked)) || (state.state & kConstructed) === 0) {
state.buffered.push({ chunk, encoding, callback });
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
Expand All @@ -450,7 +515,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && (state.state & kDestroyed) === 0;
return ret && (state.state & kHasErrored) === 0 && (state.state & kDestroyed) === 0;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand Down Expand Up @@ -498,7 +563,7 @@ function onwrite(stream, er) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
er.stack; // eslint-disable-line no-unused-expressions

if (!state.errored) {
if ((state.state & kHasErrored) === 0) {
state.errored = er;
}

Expand Down Expand Up @@ -573,18 +638,19 @@ function errorBuffer(state) {
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}

const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
if ((state.state & kHasOnFinished) !== 0) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
}
}

resetBuffer(state);
}

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked ||
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
(state.state & kConstructed) === 0) {
return;
}
Expand Down Expand Up @@ -669,14 +735,16 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}

// .end() fully uncorks.
if (state.corked) {
state.corked = 1;
this.uncork();
if ((state.state & kCorked) !== 0) {
state.state &= ~kCorked;
if ((state.state & kWriting) === 0) {
clearBuffer(this, state);
}
}

if (err) {
// Do nothing...
} else if (!state.errored && (state.state & kEnding) === 0) {
} else if ((state.state & kErrored) === 0 && (state.state & kEnding) === 0) {
// This is forgiving in terms of unnecessary calls to end() and can hide
// logic errors. However, usually such errors are harmless and causing a
// hard error can be disproportionately destructive. It is not always
Expand All @@ -698,6 +766,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
} else if ((state.state & kFinished) !== 0) {
process.nextTick(cb, null);
} else {
state.state |= kHasOnFinished;
state[kOnFinished] ??= [];
state[kOnFinished].push(cb);
}
}
Expand All @@ -715,10 +785,10 @@ function needFinish(state) {
kFinished |
kWriting |
kErrorEmitted |
kCloseEmitted
kCloseEmitted |
kHasErrored
)) === (kEnding | kConstructed) &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0);
}

Expand All @@ -734,9 +804,11 @@ function callFinal(stream, state) {

state.pendingcb--;
if (err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err);
if ((state.state & kHasOnFinished) !== 0) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err);
}
}
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
} else if (needFinish(state)) {
Expand Down Expand Up @@ -799,9 +871,11 @@ function finish(stream, state) {
state.pendingcb--;
state.state |= kFinished;

const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](null);
if ((state.state & kHasOnFinished) !== 0) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](null);
}
}

stream.emit('finish');
Expand Down Expand Up @@ -853,8 +927,8 @@ ObjectDefineProperties(Writable.prototype, {
// where the writable side was disabled upon construction.
// Compat. The user might manually disable writable side through
// deprecated setter.
return !!w && w.writable !== false && !w.errored &&
(w.state & (kEnding | kEnded | kDestroyed)) === 0;
return !!w && w.writable !== false &&
(w.state & (kEnding | kEnded | kDestroyed | kHasErrored)) === 0;
},
set(val) {
// Backwards compatible.
Expand Down Expand Up @@ -928,7 +1002,7 @@ ObjectDefineProperties(Writable.prototype, {
__proto__: null,
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
return this._writableState && (this._writableState.state & kHasErrored) !== 0 ? this._writableState.errored : null;
},
},

Expand All @@ -938,7 +1012,7 @@ ObjectDefineProperties(Writable.prototype, {
get: function() {
return !!(
this._writableState.writable !== false &&
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
(this._writableState.state & (kDestroyed | kHasErrored)) !== 0 &&
(this._writableState.state & kFinished) === 0
);
},
Expand All @@ -952,7 +1026,7 @@ Writable.prototype.destroy = function(err, cb) {
// Invoke pending callbacks.
if ((state.state & kDestroyed) === 0 &&
(state.bufferedIndex < state.buffered.length ||
state[kOnFinished].length)) {
(((state.state & kHasOnFinished) !== 0) && state[kOnFinished].length))) {
process.nextTick(errorBuffer, state);
}

Expand Down

0 comments on commit d7b1625

Please sign in to comment.