Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge http2 stream & net socket #19527

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 14 additions & 47 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ const binding = process.binding('http2');
const { FileHandle } = process.binding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const assert = require('assert');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
const net = require('net');
const tls = require('tls');
const util = require('util');
const fs = require('fs');
const {
errnoException,
codes: {
ERR_HTTP2_ALTSVC_INVALID_ORIGIN,
ERR_HTTP2_ALTSVC_LENGTH,
Expand Down Expand Up @@ -107,8 +105,13 @@ const {
validateTimerDuration,
refreshFnSymbol
} = require('internal/timers');
const {
createWriteWrap,
writeGeneric,
writevGeneric
} = require('internal/stream_base_commons');

const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
const { ShutdownWrap } = process.binding('stream_wrap');
const { constants, nameForErrorCode } = binding;

const NETServer = net.Server;
Expand Down Expand Up @@ -1426,28 +1429,6 @@ class ClientHttp2Session extends Http2Session {
}
}

function createWriteReq(req, handle, data, encoding) {
switch (encoding) {
case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);
case 'ascii':
return handle.writeAsciiString(req, data);
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
case 'buffer':
return handle.writeBuffer(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}

function trackWriteState(stream, bytes) {
const session = stream[kSession];
stream[kState].writeQueueSize += bytes;
Expand Down Expand Up @@ -1671,16 +1652,12 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();

const handle = this[kHandle];
const req = new WriteWrap();
const req = createWriteWrap(this[kHandle], afterDoStreamWrite);
req.stream = this[kID];
req.handle = handle;
req.callback = cb;
req.oncomplete = afterDoStreamWrite;
req.async = false;
const err = createWriteReq(req, handle, data, encoding);
if (err)
return this.destroy(errnoException(err, 'write', req.error), cb);

writeGeneric(this, req, data, encoding, cb);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have this done for writev too :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@addaleax done 👍


trackWriteState(this, req.bytes);
}

Expand Down Expand Up @@ -1708,22 +1685,12 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();

const handle = this[kHandle];
const req = new WriteWrap();
var req = createWriteWrap(this[kHandle], afterDoStreamWrite);
req.stream = this[kID];
req.handle = handle;
req.callback = cb;
req.oncomplete = afterDoStreamWrite;
req.async = false;
const chunks = new Array(data.length << 1);
for (var i = 0; i < data.length; i++) {
const entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
const err = handle.writev(req, chunks);
if (err)
return this.destroy(errnoException(err, 'write', req.error), cb);

writevGeneric(this, req, data, cb);

trackWriteState(this, req.bytes);
}

Expand Down
79 changes: 79 additions & 0 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';

const { Buffer } = require('buffer');
const errors = require('internal/errors');
const { WriteWrap } = process.binding('stream_wrap');

const errnoException = errors.errnoException;

function handleWriteReq(req, data, encoding) {
const { handle } = req;

switch (encoding) {
case 'buffer':
return handle.writeBuffer(req, data);
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);
case 'ascii':
return handle.writeAsciiString(req, data);
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}

function createWriteWrap(handle, oncomplete) {
var req = new WriteWrap();

req.handle = handle;
req.oncomplete = oncomplete;
req.async = false;

return req;
}

function writevGeneric(self, req, data, cb) {
var allBuffers = data.allBuffers;
var chunks;
var i;
if (allBuffers) {
chunks = data;
for (i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
var err = req.handle.writev(req, chunks, allBuffers);

// Retain chunks
if (err === 0) req._chunks = chunks;

if (err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it should be ok to use an else branch on the previous if statement for this.

return self.destroy(errnoException(err, 'write', req.error), cb);
}

function writeGeneric(self, req, data, encoding, cb) {
var err = handleWriteReq(req, data, encoding);

if (err)
return self.destroy(errnoException(err, 'write', req.error), cb);
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric
};
74 changes: 14 additions & 60 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { TCPConnectWrap } = process.binding('tcp_wrap');
const { PipeConnectWrap } = process.binding('pipe_wrap');
const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
const { ShutdownWrap } = process.binding('stream_wrap');
const {
newAsyncId,
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');
const {
createWriteWrap,
writevGeneric,
writeGeneric
} = require('internal/stream_base_commons');
const errors = require('internal/errors');
const {
ERR_INVALID_ADDRESS_FAMILY,
Expand Down Expand Up @@ -740,38 +745,15 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
return false;
}

var req = new WriteWrap();
req.handle = this._handle;
req.oncomplete = afterWrite;
req.async = false;
var err;

if (writev) {
var allBuffers = data.allBuffers;
var chunks;
var i;
if (allBuffers) {
chunks = data;
for (i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
err = this._handle.writev(req, chunks, allBuffers);

// Retain chunks
if (err === 0) req._chunks = chunks;
} else {
err = createWriteReq(req, this._handle, data, encoding);
}
var ret;
var req = createWriteWrap(this._handle, afterWrite);
if (writev)
ret = writevGeneric(this, req, data, cb);
else
ret = writeGeneric(this, req, data, encoding, cb);

if (err)
return this.destroy(errnoException(err, 'write', req.error), cb);
// Bail out if handle.write* returned an error
if (ret) return ret;

this._bytesDispatched += req.bytes;

Expand All @@ -794,34 +776,6 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};

function createWriteReq(req, handle, data, encoding) {
switch (encoding) {
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);

case 'buffer':
return handle.writeBuffer(req, data);

case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);

case 'ascii':
return handle.writeAsciiString(req, data);

case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);

default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}


protoGetter('bytesWritten', function bytesWritten() {
var bytes = this._bytesDispatched;
const state = this._writableState;
Expand Down
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
'lib/internal/vm/Module.js',
'lib/internal/stream_base_commons.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/BufferList.js',
Expand Down