From c2835e5e4705d9e5740a8a88350923d8c227f203 Mon Sep 17 00:00:00 2001 From: Ashok Date: Sun, 18 Mar 2018 13:05:47 +0530 Subject: [PATCH] lib: merge stream code for http2 streams & net.Socket Squashed from: - lib: separate writev responsibilities from writeGeneric - lib: fix calling of cb twice - lib: extract streamId out of stream_base to caller - lib: add symbols instead of methods to hide impl details - lib: remove unneeded lines - lib: use Object.assign instead of apply - lib: rename mixin StreamBase to StreamSharedMethods - lib: use stream shared funcs as top level instead of properties of prototypes - lib: mv lib/internal/stream_shared_methods.js lib/internal/stream_base_commons.js - lib: add comment for readability - lib: refactor _writev in Http2Stream - lib: rephrase comment - lib: revert usage of const,let for perf reasons PR-URL: https://github.com/nodejs/node/pull/19527 Refs: https://github.com/nodejs/node/issues/19060 Reviewed-By: Matteo Collina Reviewed-By: Trivikram Kamat Reviewed-By: Luigi Pinca Reviewed-By: James M Snell Reviewed-By: Anna Henningsen --- lib/internal/http2/core.js | 61 +++++----------------- lib/internal/stream_base_commons.js | 79 +++++++++++++++++++++++++++++ lib/net.js | 74 +++++---------------------- node.gyp | 1 + 4 files changed, 108 insertions(+), 107 deletions(-) create mode 100644 lib/internal/stream_base_commons.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 89cc8db0b00b89..8005d7941a6df3 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -12,14 +12,12 @@ const binding = process.binding('http2'); const { FileHandle } = process.binding('fs'); const { StreamPipe } = internalBinding('stream_pipe'); const assert = require('assert'); -const { Buffer } = require('buffer'); const EventEmitter = require('events'); const net = require('net'); const tls = require('tls'); const util = require('util'); const fs = require('fs'); const { - errnoException, codes: { ERR_HTTP2_ALTSVC_INVALID_ORIGIN, ERR_HTTP2_ALTSVC_LENGTH, @@ -107,8 +105,13 @@ const { validateTimerDuration, refreshFnSymbol } = require('internal/timers'); +const { + createWriteWrap, + writeGeneric, + writevGeneric +} = require('internal/stream_base_commons'); -const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); +const { ShutdownWrap } = process.binding('stream_wrap'); const { constants, nameForErrorCode } = binding; const NETServer = net.Server; @@ -1429,28 +1432,6 @@ class ClientHttp2Session extends Http2Session { } } -function createWriteReq(req, handle, data, encoding) { - switch (encoding) { - case 'utf8': - case 'utf-8': - return handle.writeUtf8String(req, data); - case 'ascii': - return handle.writeAsciiString(req, data); - case 'ucs2': - case 'ucs-2': - case 'utf16le': - case 'utf-16le': - return handle.writeUcs2String(req, data); - case 'latin1': - case 'binary': - return handle.writeLatin1String(req, data); - case 'buffer': - return handle.writeBuffer(req, data); - default: - return handle.writeBuffer(req, Buffer.from(data, encoding)); - } -} - function trackWriteState(stream, bytes) { const session = stream[kSession]; stream[kState].writeQueueSize += bytes; @@ -1674,16 +1655,12 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const handle = this[kHandle]; - const req = new WriteWrap(); + const req = createWriteWrap(this[kHandle], afterDoStreamWrite); req.stream = this[kID]; - req.handle = handle; req.callback = cb; - req.oncomplete = afterDoStreamWrite; - req.async = false; - const err = createWriteReq(req, handle, data, encoding); - if (err) - return this.destroy(errnoException(err, 'write', req.error), cb); + + writeGeneric(this, req, data, encoding, cb); + trackWriteState(this, req.bytes); } @@ -1711,22 +1688,12 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const handle = this[kHandle]; - const req = new WriteWrap(); + var req = createWriteWrap(this[kHandle], afterDoStreamWrite); req.stream = this[kID]; - req.handle = handle; req.callback = cb; - req.oncomplete = afterDoStreamWrite; - req.async = false; - const chunks = new Array(data.length << 1); - for (var i = 0; i < data.length; i++) { - const entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } - const err = handle.writev(req, chunks); - if (err) - return this.destroy(errnoException(err, 'write', req.error), cb); + + writevGeneric(this, req, data, cb); + trackWriteState(this, req.bytes); } diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js new file mode 100644 index 00000000000000..d902a501524791 --- /dev/null +++ b/lib/internal/stream_base_commons.js @@ -0,0 +1,79 @@ +'use strict'; + +const { Buffer } = require('buffer'); +const errors = require('internal/errors'); +const { WriteWrap } = process.binding('stream_wrap'); + +const errnoException = errors.errnoException; + +function handleWriteReq(req, data, encoding) { + const { handle } = req; + + switch (encoding) { + case 'buffer': + return handle.writeBuffer(req, data); + case 'latin1': + case 'binary': + return handle.writeLatin1String(req, data); + case 'utf8': + case 'utf-8': + return handle.writeUtf8String(req, data); + case 'ascii': + return handle.writeAsciiString(req, data); + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + return handle.writeUcs2String(req, data); + default: + return handle.writeBuffer(req, Buffer.from(data, encoding)); + } +} + +function createWriteWrap(handle, oncomplete) { + var req = new WriteWrap(); + + req.handle = handle; + req.oncomplete = oncomplete; + req.async = false; + + return req; +} + +function writevGeneric(self, req, data, cb) { + var allBuffers = data.allBuffers; + var chunks; + var i; + if (allBuffers) { + chunks = data; + for (i = 0; i < data.length; i++) + data[i] = data[i].chunk; + } else { + chunks = new Array(data.length << 1); + for (i = 0; i < data.length; i++) { + var entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; + } + } + var err = req.handle.writev(req, chunks, allBuffers); + + // Retain chunks + if (err === 0) req._chunks = chunks; + + if (err) + return self.destroy(errnoException(err, 'write', req.error), cb); +} + +function writeGeneric(self, req, data, encoding, cb) { + var err = handleWriteReq(req, data, encoding); + + if (err) + return self.destroy(errnoException(err, 'write', req.error), cb); +} + +module.exports = { + createWriteWrap, + writevGeneric, + writeGeneric +}; diff --git a/lib/net.js b/lib/net.js index e3cd8559b98e90..c9116fb1a80a81 100644 --- a/lib/net.js +++ b/lib/net.js @@ -46,12 +46,17 @@ const { TCP, constants: TCPConstants } = process.binding('tcp_wrap'); const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap'); const { TCPConnectWrap } = process.binding('tcp_wrap'); const { PipeConnectWrap } = process.binding('pipe_wrap'); -const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); +const { ShutdownWrap } = process.binding('stream_wrap'); const { newAsyncId, defaultTriggerAsyncIdScope, symbols: { async_id_symbol } } = require('internal/async_hooks'); +const { + createWriteWrap, + writevGeneric, + writeGeneric +} = require('internal/stream_base_commons'); const errors = require('internal/errors'); const { ERR_INVALID_ADDRESS_FAMILY, @@ -740,38 +745,15 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { return false; } - var req = new WriteWrap(); - req.handle = this._handle; - req.oncomplete = afterWrite; - req.async = false; - var err; - - if (writev) { - var allBuffers = data.allBuffers; - var chunks; - var i; - if (allBuffers) { - chunks = data; - for (i = 0; i < data.length; i++) - data[i] = data[i].chunk; - } else { - chunks = new Array(data.length << 1); - for (i = 0; i < data.length; i++) { - var entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } - } - err = this._handle.writev(req, chunks, allBuffers); - - // Retain chunks - if (err === 0) req._chunks = chunks; - } else { - err = createWriteReq(req, this._handle, data, encoding); - } + var ret; + var req = createWriteWrap(this._handle, afterWrite); + if (writev) + ret = writevGeneric(this, req, data, cb); + else + ret = writeGeneric(this, req, data, encoding, cb); - if (err) - return this.destroy(errnoException(err, 'write', req.error), cb); + // Bail out if handle.write* returned an error + if (ret) return ret; this._bytesDispatched += req.bytes; @@ -794,34 +776,6 @@ Socket.prototype._write = function(data, encoding, cb) { this._writeGeneric(false, data, encoding, cb); }; -function createWriteReq(req, handle, data, encoding) { - switch (encoding) { - case 'latin1': - case 'binary': - return handle.writeLatin1String(req, data); - - case 'buffer': - return handle.writeBuffer(req, data); - - case 'utf8': - case 'utf-8': - return handle.writeUtf8String(req, data); - - case 'ascii': - return handle.writeAsciiString(req, data); - - case 'ucs2': - case 'ucs-2': - case 'utf16le': - case 'utf-16le': - return handle.writeUcs2String(req, data); - - default: - return handle.writeBuffer(req, Buffer.from(data, encoding)); - } -} - - protoGetter('bytesWritten', function bytesWritten() { var bytes = this._bytesDispatched; const state = this._writableState; diff --git a/node.gyp b/node.gyp index 91d57631d6d55e..23c6e339856ea9 100644 --- a/node.gyp +++ b/node.gyp @@ -144,6 +144,7 @@ 'lib/internal/v8_prof_polyfill.js', 'lib/internal/v8_prof_processor.js', 'lib/internal/vm/Module.js', + 'lib/internal/stream_base_commons.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/BufferList.js',