diff --git a/benchmark/dgram/array-vs-concat.js b/benchmark/dgram/array-vs-concat.js index 669cf47df4..d260a48063 100644 --- a/benchmark/dgram/array-vs-concat.js +++ b/benchmark/dgram/array-vs-concat.js @@ -29,17 +29,25 @@ function main({ dur, len, num, type, chunks }) { function onsendConcat() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend); + } + }); } } function onsendMulti() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/benchmark/dgram/multi-buffer.js b/benchmark/dgram/multi-buffer.js index a1c50551b8..7b69a82255 100644 --- a/benchmark/dgram/multi-buffer.js +++ b/benchmark/dgram/multi-buffer.js @@ -27,9 +27,13 @@ function main({ dur, len, num, type, chunks }) { function onsend() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/benchmark/dgram/offset-length.js b/benchmark/dgram/offset-length.js index 7c672acae2..696fa6a7a0 100644 --- a/benchmark/dgram/offset-length.js +++ b/benchmark/dgram/offset-length.js @@ -23,9 +23,13 @@ function main({ dur, len, num, type }) { function onsend() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/benchmark/dgram/single-buffer.js b/benchmark/dgram/single-buffer.js index d183b9cd1d..5c95b17887 100644 --- a/benchmark/dgram/single-buffer.js +++ b/benchmark/dgram/single-buffer.js @@ -23,9 +23,13 @@ function main({ dur, len, num, type }) { function onsend() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/lib/dgram.js b/lib/dgram.js index 923463cc2e..cc61d4d274 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -666,6 +666,14 @@ function doSend(ex, self, ip, list, address, port, callback) { else err = state.handle.send(req, list, list.length, !!callback); + if (err >= 1) { + // Synchronous finish. The return code is msg_length + 1 so that we can + // distinguish between synchronous success and asynchronous success. + if (callback) + process.nextTick(callback, null, err - 1); + return; + } + if (err && callback) { // Don't emit as error, dgram_legacy.js compatibility const ex = exceptionWithHostPort(err, 'send', address, port); diff --git a/src/node_options.cc b/src/node_options.cc index 34efe6336a..4320493e08 100644 --- a/src/node_options.cc +++ b/src/node_options.cc @@ -449,6 +449,8 @@ EnvironmentOptionsParser::EnvironmentOptionsParser() { "write warnings to file instead of stderr", &EnvironmentOptions::redirect_warnings, kAllowedInEnvironment); + AddOption("--test-udp-no-try-send", "", // For testing only. + &EnvironmentOptions::test_udp_no_try_send); AddOption("--throw-deprecation", "throw an exception on deprecations", &EnvironmentOptions::throw_deprecation, diff --git a/src/node_options.h b/src/node_options.h index 780e669eb6..236f1aab92 100644 --- a/src/node_options.h +++ b/src/node_options.h @@ -134,6 +134,7 @@ class EnvironmentOptions : public Options { bool heap_prof = false; #endif // HAVE_INSPECTOR std::string redirect_warnings; + bool test_udp_no_try_send = false; bool throw_deprecation = false; bool trace_deprecation = false; bool trace_sync_io = false; diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 3c66db2155..64c4c8b304 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -429,11 +429,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { size_t count = args[2].As()->Value(); const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue(); - SendWrap* req_wrap; - { - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap); - req_wrap = new SendWrap(env, req_wrap_obj, have_callback); - } size_t msg_size = 0; MaybeStackBuffer bufs(count); @@ -448,8 +443,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { msg_size += length; } - req_wrap->msg_size = msg_size; - int err = 0; struct sockaddr_storage addr_storage; sockaddr* addr = nullptr; @@ -462,18 +455,47 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { } } + uv_buf_t* bufs_ptr = *bufs; + if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) { + err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr); + if (err == UV_ENOSYS || err == UV_EAGAIN) { + err = 0; + } else if (err >= 0) { + size_t sent = err; + while (count > 0 && bufs_ptr->len <= sent) { + sent -= bufs_ptr->len; + bufs_ptr++; + count--; + } + if (count > 0) { + CHECK_LT(sent, bufs_ptr->len); + bufs_ptr->base += sent; + bufs_ptr->len -= sent; + } else { + CHECK_EQ(static_cast(err), msg_size); + // + 1 so that the JS side can distinguish 0-length async sends from + // 0-length sync sends. + args.GetReturnValue().Set(static_cast(msg_size) + 1); + return; + } + } + } + if (err == 0) { + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap); + SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback); + req_wrap->msg_size = msg_size; + err = req_wrap->Dispatch(uv_udp_send, &wrap->handle_, - *bufs, + bufs_ptr, count, addr, OnSend); + if (err) + delete req_wrap; } - if (err) - delete req_wrap; - args.GetReturnValue().Set(err); } diff --git a/test/async-hooks/test-udpsendwrap.js b/test/async-hooks/test-udpsendwrap.js index 25b7eb6103..f1403e3226 100644 --- a/test/async-hooks/test-udpsendwrap.js +++ b/test/async-hooks/test-udpsendwrap.js @@ -1,3 +1,4 @@ +// Flags: --test-udp-no-try-send 'use strict'; const common = require('../common'); diff --git a/test/parallel/test-dgram-send-callback-recursive.js b/test/parallel/test-dgram-send-callback-recursive.js index 835fa332df..1a4c7c84fc 100644 --- a/test/parallel/test-dgram-send-callback-recursive.js +++ b/test/parallel/test-dgram-send-callback-recursive.js @@ -22,7 +22,7 @@ function onsend() { client.on('listening', function() { port = this.address().port; - setImmediate(function() { + process.nextTick(() => { async = true; }); diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 39fd8f6183..2abe7f61d7 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -1,5 +1,5 @@ 'use strict'; -// Flags: --expose-gc --expose-internals --no-warnings +// Flags: --expose-gc --expose-internals --no-warnings --test-udp-no-try-send const common = require('../common'); const { internalBinding } = require('internal/test/binding');