Skip to content

Commit

Permalink
net,http2: merge after-write code
Browse files Browse the repository at this point in the history
PR-URL: #24380
Refs: #19060
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Daniel Bevenius <[email protected]>
  • Loading branch information
addaleax authored and danbev committed Nov 21, 2018
1 parent 4eb9908 commit 8dd8b8f
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 54 deletions.
25 changes: 9 additions & 16 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const {
writeGeneric,
writevGeneric,
onStreamRead,
kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer
} = require('internal/stream_base_commons');
Expand Down Expand Up @@ -1515,21 +1516,6 @@ function trackWriteState(stream, bytes) {
session[kHandle].chunksSentSinceLastWrite = 0;
}

function afterDoStreamWrite(status, handle) {
const stream = handle[kOwner];
const session = stream[kSession];

stream[kUpdateTimer]();

const { bytes } = this;
stream[kState].writeQueueSize -= bytes;

if (session !== undefined)
session[kState].writeQueueSize -= bytes;
if (typeof this.callback === 'function')
this.callback(null);
}

function streamOnResume() {
if (!this.destroyed)
this[kHandle].readStart();
Expand Down Expand Up @@ -1782,6 +1768,13 @@ class Http2Stream extends Duplex {
'bug in Node.js');
}

[kAfterAsyncWrite]({ bytes }) {
this[kState].writeQueueSize -= bytes;

if (this.session !== undefined)
this.session[kState].writeQueueSize -= bytes;
}

[kWriteGeneric](writev, data, encoding, cb) {
// When the Http2Stream is first created, it is corked until the
// handle and the stream ID is assigned. However, if the user calls
Expand All @@ -1808,7 +1801,7 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();

const req = createWriteWrap(this[kHandle], afterDoStreamWrite);
const req = createWriteWrap(this[kHandle]);
req.stream = this[kID];

if (writev)
Expand Down
28 changes: 26 additions & 2 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols;

const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');

function handleWriteReq(req, data, encoding) {
const { handle } = req;
Expand Down Expand Up @@ -52,11 +53,33 @@ function handleWriteReq(req, data, encoding) {
}
}

function createWriteWrap(handle, oncomplete) {
function onWriteComplete(status) {
const stream = this.handle[owner_symbol];

if (stream.destroyed) {
if (typeof this.callback === 'function')
this.callback(null);
return;
}

if (status < 0) {
const ex = errnoException(status, 'write', this.error);
stream.destroy(ex, this.callback);
return;
}

stream[kUpdateTimer]();
stream[kAfterAsyncWrite](this);

if (typeof this.callback === 'function')
this.callback(null);
}

function createWriteWrap(handle) {
var req = new WriteWrap();

req.handle = handle;
req.oncomplete = oncomplete;
req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
Expand Down Expand Up @@ -160,6 +183,7 @@ module.exports = {
writevGeneric,
writeGeneric,
onStreamRead,
kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer,
};
40 changes: 6 additions & 34 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const {
writevGeneric,
writeGeneric,
onStreamRead,
kAfterAsyncWrite,
kUpdateTimer
} = require('internal/stream_base_commons');
const {
Expand Down Expand Up @@ -685,6 +686,10 @@ protoGetter('localPort', function localPort() {
});


Socket.prototype[kAfterAsyncWrite] = function() {
this[kLastWriteQueueSize] = 0;
};

Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
Expand All @@ -707,7 +712,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {

this._unrefTimer();

var req = createWriteWrap(this._handle, afterWrite);
var req = createWriteWrap(this._handle);
if (writev)
writevGeneric(this, req, data, cb);
else
Expand Down Expand Up @@ -771,39 +776,6 @@ protoGetter('bytesWritten', function bytesWritten() {
});


function afterWrite(status, handle, err) {
var self = handle[owner_symbol];
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);

if (this.async)
self[kLastWriteQueueSize] = 0;

// callback may come after call to destroy.
if (self.destroyed) {
debug('afterWrite destroyed');
if (this.callback)
this.callback(null);
return;
}

if (status < 0) {
var ex = errnoException(status, 'write', this.error);
debug('write failure', ex);
self.destroy(ex, this.callback);
return;
}

self._unrefTimer();

if (self !== process.stderr && self !== process.stdout)
debug('afterWrite call cb');

if (this.callback)
this.callback.call(undefined);
}


function checkBindError(err, port, handle) {
// EADDRINUSE may not be reported until we call listen() or connect().
// To complicate matters, a failed bind() followed by listen() or connect()
Expand Down
8 changes: 6 additions & 2 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1571,8 +1571,12 @@ void Http2Session::ClearOutgoing(int status) {
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
if (wrap != nullptr) {
// TODO(addaleax): Pass `status` instead of 0, so that we actually error
// out with the error from the write to the underlying protocol,
// if one occurred.
wrap->Done(0);
}
}
}

Expand Down

0 comments on commit 8dd8b8f

Please sign in to comment.