Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: introduce StreamWrap and JSStream #926

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions lib/_stream_wrap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const uv = process.binding('uv');

function StreamWrap(stream) {
var handle = new JSStream();

this.stream = stream;

var self = this;
handle.close = function(cb) {
cb();
};
handle.isAlive = function() {
return self.isAlive();
};
handle.isClosing = function() {
return self.isClosing();
};
handle.onreadstart = function() {
return self.readStart();
};
handle.onreadstop = function() {
return self.readStop();
};
handle.onshutdown = function(req) {
return self.shutdown(req);
};
handle.onwrite = function(req, bufs) {
return self.write(req, bufs);
};

this.stream.pause();
this.stream.on('data', function(chunk) {
self._handle.readBuffer(chunk);
});
this.stream.once('end', function() {
self._handle.emitEOF();
});
this.stream.on('error', function(err) {
self.emit('error', err);
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the underlying stream emits an error? Per this diagram I don't think the stream will emit "end" or "finish". Is there a way to recover from an error in this state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should be handled by the code that created this stream... it could be propagated to the StreamWrap object, though, and handled in _tls_wrap.js. Does this sound plausible?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that sound plausible – my expectation would be that a StreamWrap'd stream would expose the inner stream's errors on the outer stream.

I guess this should be handled by the code that created this stream...

This could be a situation where a user gets a stream from package X, instantiates it and passes it to TLSSocket Y; I think the user assumes responsibility for adding error handlers to TLSSocket Y, while TLSSocket Y would be responsible for shutting down / closing / cleaning up once an error on stream X happens.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is not the way it works for piping the streams right now, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An error downstream will unpipe and clean up listeners on the upstream stream, but will not shut down or destroy the source stream. This has caused a bit of confusion for stream users.

In addition, the act of nesting streams is distinct from piping streams, though piping can be involved in nesting. We don't nest streams often in core (TLS and arguably http are the only places we do something like this.) In userland usually the wrapping stream assumes responsibility for shutting itself down on inner stream error and exposing those errors to userland. Illustrated:

// user assumes responsibility for streams a, b, and c
a = stream()
b = stream()
c = stream()
a.pipe(b).pipe(c).pipe(b).pipe(a)

// user assumes responsibility for streams a and b, but not stream c
a = stream()
c = stream()
b = stream(c)
a.pipe(b).pipe(a).

Socket.call(this, {
handle: handle
});
}
util.inherits(StreamWrap, Socket);
module.exports = StreamWrap;

// require('_stream_wrap').StreamWrap
StreamWrap.StreamWrap = StreamWrap;

StreamWrap.prototype.isAlive = function isAlive() {
return this.readable && this.writable;
};

StreamWrap.prototype.isClosing = function isClosing() {
return !this.isAlive();
};

StreamWrap.prototype.readStart = function readStart() {
this.stream.resume();
return 0;
};

StreamWrap.prototype.readStop = function readStop() {
this.stream.pause();
return 0;
};

StreamWrap.prototype.shutdown = function shutdown(req) {
var self = this;

this.stream.end(function() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can get an error here, too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we can't :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am mistaken!

// Ensure that write was dispatched
setImmediate(function() {
self._handle.finishShutdown(req, 0);
});
});
return 0;
};

StreamWrap.prototype.write = function write(req, bufs) {
var pending = bufs.length;
var self = this;

bufs.forEach(function(buf) {
self.stream.write(buf, done);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to sniff for self.stream._writev, and if present, surround the forEach with cork() / uncork()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be enough to just use .cork()/.uncork() and hope for the best.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


function done(err) {
if (err || --pending !== 0)
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if there's an error we'll never execute the rest of the function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake, will fix it in a bit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


// Ensure that this is called once in case of error
pending = 0;

// Ensure that write was dispatched
setImmediate(function() {
var errCode = 0;
if (err) {
if (err.code && uv['UV_' + err.code])
errCode = uv['UV_' + err.code];
else
errCode = uv.UV_EPIPE;
}

self._handle.doAfterWrite(req);
self._handle.finishWrite(req, errCode);
});
}

return 0;
};
15 changes: 13 additions & 2 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const tls = require('tls');
const util = require('util');
const listenerCount = require('events').listenerCount;
const common = require('_tls_common');
const StreamWrap = require('_stream_wrap').StreamWrap;
const Duplex = require('stream').Duplex;
const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
Expand Down Expand Up @@ -224,6 +226,10 @@ function TLSSocket(socket, options) {
this.authorized = false;
this.authorizationError = null;

// Wrap plain JS Stream into StreamWrap
if (!(socket instanceof net.Socket) && socket instanceof Duplex)
socket = new StreamWrap(socket);

// Just a documented property to make secure sockets
// distinguishable from regular ones.
this.encrypted = true;
Expand Down Expand Up @@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) {
// Proxy HandleWrap, PipeWrap and TCPWrap methods
proxiedMethods.forEach(function(name) {
res[name] = function methodProxy() {
return handle[name].apply(handle, arguments);
if (handle[name])
return handle[name].apply(handle, arguments);
};
});

Expand Down Expand Up @@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) {
this.setTimeout(options.handshakeTimeout, this._handleTimeout);

// Socket already has some buffered data - emulate receiving it
if (socket && socket._readableState.length) {
if (socket && socket._readableState && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
ssl.receive(buf);
Expand All @@ -388,6 +395,10 @@ TLSSocket.prototype._init = function(socket) {
self._connecting = false;
self.emit('connect');
});

socket.on('error', function(err) {
self._tlsError(err);
});
}

// Assume `tls.connect()`
Expand Down
3 changes: 3 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
'lib/timers.js',
Expand Down Expand Up @@ -95,6 +96,7 @@
'src/fs_event_wrap.cc',
'src/cares_wrap.cc',
'src/handle_wrap.cc',
'src/js_stream.cc',
'src/node.cc',
'src/node_buffer.cc',
'src/node_constants.cc',
Expand Down Expand Up @@ -132,6 +134,7 @@
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
'src/js_stream.h',
'src/node.h',
'src/node_buffer.h',
'src/node_constants.h',
Expand Down
1 change: 1 addition & 0 deletions src/async-wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace node {
V(FSREQWRAP) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
V(JSSTREAM) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(QUERYWRAP) \
Expand Down
7 changes: 7 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ namespace node {
V(ipv4_string, "IPv4") \
V(ipv6_lc_string, "ipv6") \
V(ipv6_string, "IPv6") \
V(isalive_string, "isAlive") \
V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
V(kill_signal_string, "killSignal") \
Expand Down Expand Up @@ -141,9 +143,13 @@ namespace node {
V(onnewsessiondone_string, "onnewsessiondone") \
V(onocspresponse_string, "onocspresponse") \
V(onread_string, "onread") \
V(onreadstart_string, "onreadstart") \
V(onreadstop_string, "onreadstop") \
V(onselect_string, "onselect") \
V(onshutdown_string, "onshutdown") \
V(onsignal_string, "onsignal") \
V(onstop_string, "onstop") \
V(onwrite_string, "onwrite") \
V(output_string, "output") \
V(order_string, "order") \
V(owner_string, "owner") \
Expand Down Expand Up @@ -225,6 +231,7 @@ namespace node {
V(context, v8::Context) \
V(domain_array, v8::Array) \
V(fs_stats_constructor_function, v8::Function) \
V(jsstream_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \
Expand Down
Loading