From fa556a142512ab932b7359760e5e4585e4e035b6 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 16:24:53 -0800 Subject: [PATCH] Add callback to socket.write(), fix test-sendfds --- lib/net.js | 46 +++++++++++++++++++++++++++++--------- src/node_io_watcher.cc | 13 +++++++++++ test/fixtures/recvfd.js | 16 ++++++------- test/simple/test-sendfd.js | 17 +++++++------- 4 files changed, 63 insertions(+), 29 deletions(-) diff --git a/lib/net.js b/lib/net.js index 03b09c3c5af032..7019d9d1078b60 100644 --- a/lib/net.js +++ b/lib/net.js @@ -56,17 +56,18 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { IOWatcher.prototype.ondrain = function () { - assert(this.socket); - var socket = this.socket; + if (this.socket) { + var socket = this.socket; - if (socket.writable || socket.readable) { - require('timers').active(socket); - } + if (socket.writable || socket.readable) { + require('timers').active(socket); + } - socket.emit('drain'); - if (socket.ondrain) socket.ondrain(); + socket.emit('drain'); + if (socket.ondrain) socket.ondrain(); - if (socket._eof) socket._shutdown(); + if (socket._eof) socket._shutdown(); + } }; @@ -252,12 +253,13 @@ Object.defineProperty(Stream.prototype, 'readyState', { }); -Stream.prototype._appendBucket = function (data, encoding, fd) { +Stream.prototype._appendBucket = function (data, encoding, fd, callback) { if (data.length != 0) { // TODO reject empty data. var newBucket = { data: data }; if (encoding) newBucket.encoding = encoding; if (fd) newBucket.fd = fd; + if (callback) newBucket.callback = callback; // TODO properly calculate queueSize @@ -280,7 +282,7 @@ Stream.prototype._appendBucket = function (data, encoding, fd) { }; -Stream.prototype.write = function (data, encoding, fd) { +Stream.prototype.write = function (data /* encoding, fd, callback */) { if (this._eof) { throw new Error('Stream.end() called already; cannot write.'); } @@ -289,7 +291,29 @@ Stream.prototype.write = function (data, encoding, fd) { throw new Error('Stream is not writable'); } - var queueSize = this._appendBucket(data, encoding, fd); + // parse the arguments. ugly. + + var encoding, fd, callback; + + if (arguments[1] === undefined || typeof arguments[1] == 'string') { + encoding = arguments[1]; + if (typeof arguments[2] == 'number') { + fd = arguments[2]; + callback = arguments[3]; + } else { + callback = arguments[2]; + } + } else if (typeof arguments[1] == 'number') { + fd = arguments[1]; + callback = arguments[2]; + } else if (typeof arguments[1] == 'function') { + callback = arguments[1]; + } else { + throw new Error("Bad type for second argument"); + } + + + var queueSize = this._appendBucket(data, encoding, fd, callback); if (this._connecting) return false; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 3f850762796589..62c56464632e0f 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -38,6 +38,7 @@ static Persistent is_unix_socket_sym; static Persistent first_bucket_sym; static Persistent last_bucket_sym; static Persistent queue_size_sym; +static Persistent callback_sym; void IOWatcher::Initialize(Handle target) { @@ -73,6 +74,7 @@ void IOWatcher::Initialize(Handle target) { is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); data_sym = NODE_PSYMBOL("data"); encoding_sym = NODE_PSYMBOL("encoding"); + callback_sym = NODE_PSYMBOL("callback"); ev_prepare_init(&dumper, IOWatcher::Dump); @@ -497,6 +499,17 @@ void IOWatcher::Dump() { written -= bucket_len - offset; + Local bucket_callback_v = bucket->Get(callback_sym); + if (bucket_callback_v->IsFunction()) { + Local bucket_callback = + Local::Cast(bucket_callback_v); + TryCatch try_catch; + bucket_callback->Call(io->handle_, 0, NULL); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + } + // Offset is now zero watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); } diff --git a/test/fixtures/recvfd.js b/test/fixtures/recvfd.js index 09b2864b7ed2e2..8f064693895d04 100644 --- a/test/fixtures/recvfd.js +++ b/test/fixtures/recvfd.js @@ -22,35 +22,33 @@ function processData(s) { // version of our modified object back. Clean up when we're done. var pipeStream = new net.Stream(fd); - var drainFunc = function() { + pipeStream.resume(); + + pipeStream.write(JSON.stringify(d) + '\n', function () { pipeStream.destroy(); if (++numSentMessages == 2) { s.destroy(); } - }; - - pipeStream.addListener('drain', drainFunc); - pipeStream.resume(); - - if (pipeStream.write(JSON.stringify(d) + '\n')) { - drainFunc(); - } + }); }; // Create a UNIX socket to the path defined by argv[2] and read a file // descriptor and misc data from it. var s = new net.Stream(); + s.addListener('fd', function(fd) { receivedFDs.unshift(fd); processData(s); }); + s.addListener('data', function(data) { data.toString('utf8').trim().split('\n').forEach(function(d) { receivedData.unshift(JSON.parse(d)); }); processData(s); }); + s.connect(process.argv[2]); // vim:ts=2 sw=2 et diff --git a/test/simple/test-sendfd.js b/test/simple/test-sendfd.js index 7ed7b02c1a9c5b..8052a136673793 100644 --- a/test/simple/test-sendfd.js +++ b/test/simple/test-sendfd.js @@ -53,7 +53,7 @@ var logChild = function(d) { d.split('\n').forEach(function(l) { if (l.length > 0) { - common.debug('CHILD: ' + l); + console.error('CHILD: ' + l); } }); }; @@ -96,19 +96,18 @@ var srv = net.createServer(function(s) { buf.write(JSON.stringify(DATA) + '\n', 'utf8'); s.write(str, 'utf8', pipeFDs[1]); - if (s.write(buf, undefined, pipeFDs[1])) { + + s.write(buf, pipeFDs[1], function () { + console.error("close pipeFDs[1]"); netBinding.close(pipeFDs[1]); - } else { - s.addListener('drain', function() { - netBinding.close(pipeFDs[1]); - }); - } + }); }); srv.listen(SOCK_PATH); // Spawn a child running test/fixtures/recvfd.js -var cp = child_process.spawn(process.argv[0], - [path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]); +var cp = child_process.spawn(process.execPath, + [path.join(common.fixturesDir, 'recvfd.js'), + SOCK_PATH]); cp.stdout.addListener('data', logChild); cp.stderr.addListener('data', logChild);