From 67ee480936947aa5b1953b7b6e48a0c7a191501e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 22 Apr 2014 16:23:19 -0700 Subject: [PATCH 1/2] native: Remove unused and untested UnixDatagram --- src/libnative/io/pipe_unix.rs | 71 ----------------------------------- 1 file changed, 71 deletions(-) diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index b332ced1fc572..5d13a6b5fc5cd 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -155,77 +155,6 @@ impl rtio::RtioPipe for UnixStream { } } -//////////////////////////////////////////////////////////////////////////////// -// Unix Datagram -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixDatagram { - inner: UnsafeArc, -} - -impl UnixDatagram { - pub fn connect(addr: &CString) -> IoResult { - connect(addr, libc::SOCK_DGRAM).map(|inner| { - UnixDatagram { inner: UnsafeArc::new(inner) } - }) - } - - pub fn bind(addr: &CString) -> IoResult { - bind(addr, libc::SOCK_DGRAM).map(|inner| { - UnixDatagram { inner: UnsafeArc::new(inner) } - }) - } - - fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } - - pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { - let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; - let storagep = &mut storage as *mut libc::sockaddr_storage; - let mut addrlen: libc::socklen_t = - mem::size_of::() as libc::socklen_t; - let ret = retry(|| unsafe { - libc::recvfrom(self.fd(), - buf.as_ptr() as *mut libc::c_void, - buf.len() as libc::size_t, - 0, - storagep as *mut libc::sockaddr, - &mut addrlen) as libc::c_int - }); - if ret < 0 { return Err(super::last_error()) } - sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { - Ok((ret as uint, addr)) - }) - } - - pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { - let (dst, len) = try!(addr_to_sockaddr_un(dst)); - let dstp = &dst as *libc::sockaddr_storage; - let ret = retry(|| unsafe { - libc::sendto(self.fd(), - buf.as_ptr() as *libc::c_void, - buf.len() as libc::size_t, - 0, - dstp as *libc::sockaddr, - len as libc::socklen_t) as libc::c_int - }); - match ret { - -1 => Err(super::last_error()), - n if n as uint != buf.len() => { - Err(io::IoError { - kind: io::OtherIoError, - desc: "couldn't send entire packet at once", - detail: None, - }) - } - _ => Ok(()) - } - } - - pub fn clone(&mut self) -> UnixDatagram { - UnixDatagram { inner: self.inner.clone() } - } -} - //////////////////////////////////////////////////////////////////////////////// // Unix Listener //////////////////////////////////////////////////////////////////////////////// From 6328f7c199a1697aaee7e5fe2b397c457e6c311a Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 22 Apr 2014 18:38:59 -0700 Subject: [PATCH 2/2] std: Add timeouts to unix connect/accept This adds support for connecting to a unix socket with a timeout (a named pipe on windows), and accepting a connection with a timeout. The goal is to bring unix pipes/named sockets back in line with TCP support for timeouts. Similarly to the TCP sockets, all methods are marked #[experimental] due to uncertainty about the type of the timeout argument. This internally involved a good bit of refactoring to share as much code as possible between TCP servers and pipe servers, but the core implementation did not change drastically as part of this commit. cc #13523 --- src/liblibc/lib.rs | 2 +- src/libnative/io/c_win32.rs | 2 + src/libnative/io/mod.rs | 6 +- src/libnative/io/net.rs | 128 +------------ src/libnative/io/pipe_unix.rs | 59 +++--- src/libnative/io/pipe_win32.rs | 56 +++++- src/libnative/io/util.rs | 136 ++++++++++++++ src/librustuv/net.rs | 332 ++++++++++++++++++--------------- src/librustuv/pipe.rs | 59 +++--- src/librustuv/uvio.rs | 5 +- src/libstd/io/net/unix.rs | 91 ++++++++- src/libstd/rt/rtio.rs | 4 +- 12 files changed, 531 insertions(+), 349 deletions(-) create mode 100644 src/libnative/io/util.rs diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index 98613f885cd45..bebf95a4a3ba6 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED}; #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES}; #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING}; -#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED}; +#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0}; #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET}; #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf}; #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES}; diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index dbbb39b3b7b52..6c84424e97a0d 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -59,4 +59,6 @@ extern "system" { optname: libc::c_int, optval: *mut libc::c_char, optlen: *mut libc::c_int) -> libc::c_int; + + pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; } diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 19cb5c5f1d4f0..944766e8fd070 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -44,6 +44,7 @@ pub use self::process::Process; pub mod addrinfo; pub mod net; pub mod process; +mod util; #[cfg(unix)] #[path = "file_unix.rs"] @@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory { fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> { pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send) } - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> { - pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send) + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> IoResult<~RtioPipe:Send> { + pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 93ec23e32ad42..cc41da846b2b2 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -13,13 +13,12 @@ use std::cast; use std::io::net::ip; use std::io; use std::mem; -use std::os; -use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; use super::{IoResult, retry, keep_going}; use super::c; +use super::util; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -118,8 +117,8 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, } } -fn getsockopt(fd: sock_t, opt: libc::c_int, - val: libc::c_int) -> IoResult { +pub fn getsockopt(fd: sock_t, opt: libc::c_int, + val: libc::c_int) -> IoResult { unsafe { let mut slot: T = mem::init(); let mut len = mem::size_of::() as libc::socklen_t; @@ -145,21 +144,6 @@ fn last_error() -> io::IoError { super::last_error() } -fn ms_to_timeval(ms: u64) -> libc::timeval { - libc::timeval { - tv_sec: (ms / 1000) as libc::time_t, - tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, - } -} - -fn timeout(desc: &'static str) -> io::IoError { - io::IoError { - kind: io::TimedOut, - desc: desc, - detail: None, - } -} - #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -270,7 +254,7 @@ impl TcpStream { let addrp = &addr as *_ as *libc::sockaddr; match timeout { Some(timeout) => { - try!(TcpStream::connect_timeout(fd, addrp, len, timeout)); + try!(util::connect_timeout(fd, addrp, len, timeout)); Ok(ret) }, None => { @@ -282,84 +266,6 @@ impl TcpStream { } } - // See http://developerweb.net/viewtopic.php?id=3196 for where this is - // derived from. - fn connect_timeout(fd: sock_t, - addrp: *libc::sockaddr, - len: libc::socklen_t, - timeout_ms: u64) -> IoResult<()> { - #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; - #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; - #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; - #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; - - // Make sure the call to connect() doesn't block - try!(set_nonblocking(fd, true)); - - let ret = match unsafe { libc::connect(fd, addrp, len) } { - // If the connection is in progress, then we need to wait for it to - // finish (with a timeout). The current strategy for doing this is - // to use select() with a timeout. - -1 if os::errno() as int == INPROGRESS as int || - os::errno() as int == WOULDBLOCK as int => { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, fd); - match await(fd, &mut set, timeout_ms) { - 0 => Err(timeout("connection timed out")), - -1 => Err(last_error()), - _ => { - let err: libc::c_int = try!( - getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); - if err == 0 { - Ok(()) - } else { - Err(io::IoError::from_errno(err as uint, true)) - } - } - } - } - - -1 => Err(last_error()), - _ => Ok(()), - }; - - // be sure to turn blocking I/O back on - try!(set_nonblocking(fd, false)); - return ret; - - #[cfg(unix)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) - } - #[cfg(windows)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let mut set = nb as libc::c_ulong; - if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { - Err(last_error()) - } else { - Ok(()) - } - } - - #[cfg(unix)] - fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let start = ::io::timer::now(); - retry(|| unsafe { - // Recalculate the timeout each iteration (it is generally - // undefined what the value of the 'tv' is after select - // returns EINTR). - let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); - c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv) - }) - } - #[cfg(windows)] - fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let tv = ms_to_timeval(timeout); - unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } - } - } - pub fn fd(&self) -> sock_t { // This unsafety is fine because it's just a read-only arc unsafe { (*self.inner.get()).fd } @@ -533,7 +439,7 @@ impl TcpAcceptor { pub fn native_accept(&mut self) -> IoResult { if self.deadline != 0 { - try!(self.accept_deadline()); + try!(util::accept_deadline(self.fd(), self.deadline)); } unsafe { let mut storage: libc::sockaddr_storage = mem::init(); @@ -550,25 +456,6 @@ impl TcpAcceptor { } } } - - fn accept_deadline(&mut self) -> IoResult<()> { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, self.fd()); - - match retry(|| { - // If we're past the deadline, then pass a 0 timeout to select() so - // we can poll the status of the socket. - let now = ::io::timer::now(); - let ms = if self.deadline > now {0} else {self.deadline - now}; - let tv = ms_to_timeval(ms); - let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1}; - unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } - }) { - -1 => Err(last_error()), - 0 => Err(timeout("accept timed out")), - _ => return Ok(()), - } - } } impl rtio::RtioSocket for TcpAcceptor { @@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn set_timeout(&mut self, timeout: Option) { - self.deadline = match timeout { - None => 0, - Some(t) => ::io::timer::now() + t, - }; + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 5d13a6b5fc5cd..190cae05d4343 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,16 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use std::c_str::CString; use std::cast; +use std::intrinsics; use std::io; -use libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; use super::{IoResult, retry, keep_going}; +use super::util; use super::file::fd_t; fn unix_socket(ty: libc::c_int) -> IoResult { @@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint return Ok((storage, len)); } -fn sockaddr_to_unix(storage: &libc::sockaddr_storage, - len: uint) -> IoResult { - match storage.ss_family as libc::c_int { - libc::AF_UNIX => { - assert!(len as uint <= mem::size_of::()); - let storage: &libc::sockaddr_un = unsafe { - cast::transmute(storage) - }; - unsafe { - Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) - } - } - _ => Err(io::standard_error(io::InvalidInput)) - } -} - struct Inner { fd: fd_t, } @@ -76,16 +61,24 @@ impl Drop for Inner { fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } } -fn connect(addr: &CString, ty: libc::c_int) -> IoResult { +fn connect(addr: &CString, ty: libc::c_int, + timeout: Option) -> IoResult { let (addr, len) = try!(addr_to_sockaddr_un(addr)); let inner = Inner { fd: try!(unix_socket(ty)) }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| unsafe { - libc::connect(inner.fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => Err(super::last_error()), - _ => Ok(inner) + let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + + match timeout { + None => { + match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } + } + Some(timeout_ms) => { + try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms)); + Ok(inner) + } } } @@ -110,8 +103,9 @@ pub struct UnixStream { } impl UnixStream { - pub fn connect(addr: &CString) -> IoResult { - connect(addr, libc::SOCK_STREAM).map(|inner| { + pub fn connect(addr: &CString, + timeout: Option) -> IoResult { + connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { UnixStream { inner: UnsafeArc::new(inner) } }) } @@ -176,7 +170,7 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self }) + _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) } } } @@ -189,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, + deadline: u64, } impl UnixAcceptor { fn fd(&self) -> fd_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult { + if self.deadline != 0 { + try!(util::accept_deadline(self.fd(), self.deadline)); + } let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; let storagep = &mut storage as *mut libc::sockaddr_storage; let size = mem::size_of::(); @@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } impl Drop for UnixListener { diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index 84b3d887c0498..a4f09ded0ac11 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -93,6 +93,8 @@ use std::sync::arc::UnsafeArc; use std::intrinsics; use super::IoResult; +use super::c; +use super::util; struct Event(libc::HANDLE); @@ -210,8 +212,9 @@ impl UnixStream { None } - pub fn connect(addr: &CString) -> IoResult { + pub fn connect(addr: &CString, timeout: Option) -> IoResult { as_utf16_p(addr.as_str().unwrap(), |p| { + let start = ::io::timer::now(); loop { match UnixStream::try_connect(p) { Some(handle) => { @@ -246,11 +249,26 @@ impl UnixStream { return Err(super::last_error()) } - // An example I found on microsoft's website used 20 seconds, - // libuv uses 30 seconds, hence we make the obvious choice of - // waiting for 25 seconds. - if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { - return Err(super::last_error()) + match timeout { + Some(timeout) => { + let now = ::io::timer::now(); + let timed_out = (now - start) >= timeout || unsafe { + let ms = (timeout - (now - start)) as libc::DWORD; + libc::WaitNamedPipeW(p, ms) == 0 + }; + if timed_out { + return Err(util::timeout("connect timed out")) + } + } + + // An example I found on microsoft's website used 20 + // seconds, libuv uses 30 seconds, hence we make the + // obvious choice of waiting for 25 seconds. + None => { + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } } } }) @@ -372,6 +390,7 @@ impl UnixListener { Ok(UnixAcceptor { listener: self, event: try!(Event::new(true, false)), + deadline: 0, }) } } @@ -391,6 +410,7 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, event: Event, + deadline: u64, } impl UnixAcceptor { @@ -438,7 +458,28 @@ impl UnixAcceptor { overlapped.hEvent = self.event.handle(); if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + // If we've got a timeout, use WaitForSingleObject in tandem + // with CancelIo to figure out if we should indeed get the + // result. + if self.deadline != 0 { + let now = ::io::timer::now(); + let timeout = self.deadline < now || unsafe { + let ms = (self.deadline - now) as libc::DWORD; + let r = libc::WaitForSingleObject(overlapped.hEvent, + ms); + r != libc::WAIT_OBJECT_0 + }; + if timeout { + unsafe { let _ = c::CancelIo(handle); } + return Err(util::timeout("accept timed out")) + } + } + + // This will block until the overlapped I/O is completed. The + // timeout was previously handled, so this will either block in + // the normal case or succeed very quickly in the timeout case. let ret = unsafe { let mut transfer = 0; libc::GetOverlappedResult(handle, @@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); + } } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs new file mode 100644 index 0000000000000..0aaac8f8ad81e --- /dev/null +++ b/src/libnative/io/util.rs @@ -0,0 +1,136 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc; +use std::io::IoResult; +use std::io; +use std::mem; +use std::ptr; + +use super::c; +use super::net; +use super::{retry, last_error}; + +pub fn timeout(desc: &'static str) -> io::IoError { + io::IoError { + kind: io::TimedOut, + desc: desc, + detail: None, + } +} + +pub fn ms_to_timeval(ms: u64) -> libc::timeval { + libc::timeval { + tv_sec: (ms / 1000) as libc::time_t, + tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, + } +} + +// See http://developerweb.net/viewtopic.php?id=3196 for where this is +// derived from. +pub fn connect_timeout(fd: net::sock_t, + addrp: *libc::sockaddr, + len: libc::socklen_t, + timeout_ms: u64) -> IoResult<()> { + use std::os; + #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; + #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; + #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; + #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout_ms) { + 0 => Err(timeout("connection timed out")), + -1 => Err(last_error()), + _ => { + let err: libc::c_int = try!( + net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(io::IoError::from_errno(err as uint, true)) + } + } + } + } + + -1 => Err(last_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let set = nb as libc::c_int; + super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + } + + #[cfg(windows)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let mut set = nb as libc::c_ulong; + if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { + Err(last_error()) + } else { + Ok(()) + } + } + + #[cfg(unix)] + fn await(fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let start = ::io::timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); + c::select(fd + 1, ptr::null(), set as *mut _ as *_, + ptr::null(), &tv) + }) + } + #[cfg(windows)] + fn await(_fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let tv = ms_to_timeval(timeout); + unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } + } +} + +pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + + match retry(|| { + // If we're past the deadline, then pass a 0 timeout to select() so + // we can poll the status of the socket. + let now = ::io::timer::now(); + let ms = if deadline < now {0} else {deadline - now}; + let tv = ms_to_timeval(ms); + let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; + unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } + }) { + -1 => Err(last_error()), + 0 => Err(timeout("accept timed out")), + _ => return Ok(()), + } +} diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 27a0691193980..470a343b84ed6 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::cast; -use std::io::IoError; +use std::io::{IoError, IoResult}; use std::io::net::ip; use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; @@ -145,96 +145,43 @@ fn socket_name(sk: SocketNameKind, n => Err(uv_error_to_io_error(UvError(n))) } } - //////////////////////////////////////////////////////////////////////////////// -/// TCP implementation +// Helpers for handling timeouts, shared for pipes/tcp //////////////////////////////////////////////////////////////////////////////// -pub struct TcpWatcher { - handle: *uvll::uv_tcp_t, - stream: StreamWatcher, - home: HomeHandle, - refcount: Refcount, - - // libuv can't support concurrent reads and concurrent writes of the same - // stream object, so we use these access guards in order to arbitrate among - // multiple concurrent reads and writes. Note that libuv *can* read and - // write simultaneously, it just can't read and read simultaneously. - read_access: Access, - write_access: Access, -} - -pub struct TcpListener { - home: HomeHandle, - handle: *uvll::uv_pipe_t, - closing_task: Option, - outgoing: Sender>, - incoming: Receiver>, +pub struct ConnectCtx { + pub status: c_int, + pub task: Option, + pub timer: Option<~TimerWatcher>, } -pub struct TcpAcceptor { - listener: ~TcpListener, +pub struct AcceptTimeout { timer: Option, timeout_tx: Option>, timeout_rx: Option>, } -// TCP watchers (clients/streams) - -impl TcpWatcher { - pub fn new(io: &mut UvIoFactory) -> TcpWatcher { - let handle = io.make_handle(); - TcpWatcher::new_home(&io.loop_, handle) - } - - fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher { - let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; - assert_eq!(unsafe { - uvll::uv_tcp_init(loop_.handle, handle) - }, 0); - TcpWatcher { - home: home, - handle: handle, - stream: StreamWatcher::new(handle), - refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), - } - } - - pub fn connect(io: &mut UvIoFactory, - address: ip::SocketAddr, - timeout: Option) -> Result { - struct Ctx { - status: c_int, - task: Option, - timer: Option<~TimerWatcher>, - } - - let tcp = TcpWatcher::new(io); - let (addr, _len) = addr_to_sockaddr(address); +impl ConnectCtx { + pub fn connect( + mut self, obj: T, timeout: Option, io: &mut UvIoFactory, + f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int + ) -> Result { let mut req = Request::new(uvll::UV_CONNECT); - let result = unsafe { - let addr_p = &addr as *libc::sockaddr_storage; - uvll::uv_tcp_connect(req.handle, tcp.handle, - addr_p as *libc::sockaddr, - connect_cb) - }; - return match result { + let r = f(&req, &obj, connect_cb); + return match r { 0 => { req.defuse(); // uv callback now owns this request - let mut cx = Ctx { status: -1, task: None, timer: None }; match timeout { Some(t) => { let mut timer = TimerWatcher::new(io); timer.start(timer_cb, t, 0); - cx.timer = Some(timer); + self.timer = Some(timer); } None => {} } - wait_until_woken_after(&mut cx.task, &io.loop_, || { - let data = &cx as *_; - match cx.timer { + wait_until_woken_after(&mut self.task, &io.loop_, || { + let data = &self as *_; + match self.timer { Some(ref mut timer) => unsafe { timer.set_data(data) }, None => {} } @@ -247,9 +194,9 @@ impl TcpWatcher { // If we failed because of a timeout, drop the TcpWatcher as // soon as possible because it's data is now set to null and we // want to cancel the callback ASAP. - match cx.status { - 0 => Ok(tcp), - n => { drop(tcp); Err(UvError(n)) } + match self.status { + 0 => Ok(obj), + n => { drop(obj); Err(UvError(n)) } } } n => Err(UvError(n)) @@ -258,8 +205,8 @@ impl TcpWatcher { extern fn timer_cb(handle: *uvll::uv_timer_t) { // Don't close the corresponding tcp request, just wake up the task // and let RAII take care of the pending watcher. - let cx: &mut Ctx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) + let cx: &mut ConnectCtx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) }; cx.status = uvll::ECANCELED; wakeup(&mut cx.task); @@ -279,7 +226,7 @@ impl TcpWatcher { let data = unsafe { uvll::get_data_for_req(req.handle) }; if data.is_null() { return } - let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) }; + let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; cx.status = status; match cx.timer { Some(ref mut t) => t.stop(), @@ -299,6 +246,157 @@ impl TcpWatcher { } } +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } + } + + pub fn accept(&mut self, c: &Receiver>) -> IoResult { + match self.timeout_rx { + None => c.recv(), + Some(ref rx) => { + use std::comm::Select; + + // Poll the incoming channel first (don't rely on the order of + // select just yet). If someone's pending then we should return + // them immediately. + match c.try_recv() { + Ok(data) => return data, + Err(..) => {} + } + + // Use select to figure out which channel gets ready first. We + // do some custom handling of select to ensure that we never + // actually drain the timeout channel (we'll keep seeing the + // timeout message in the future). + let s = Select::new(); + let mut timeout = s.handle(rx); + let mut data = s.handle(c); + unsafe { + timeout.add(); + data.add(); + } + if s.wait() == timeout.id() { + Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } else { + c.recv() + } + } + } + } + + pub fn clear(&mut self) { + // Clear any previous timeout by dropping the timer and transmission + // channels + drop((self.timer.take(), + self.timeout_tx.take(), + self.timeout_rx.take())) + } + + pub fn set_timeout + HomingIO>( + &mut self, ms: u64, t: &mut T + ) { + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let _m = t.fire_homing_missile(); + let loop_ = Loop::wrap(unsafe { + uvll::get_loop_for_uv_handle(t.uv_handle()) + }); + let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + unsafe { + timer.set_data(self as *mut _ as *AcceptTimeout); + } + self.timer = Some(timer); + } + + // Once we've got a timer, stop any previous timeout, reset it for the + // current one, and install some new channels to send/receive data on + let timer = self.timer.get_mut_ref(); + timer.stop(); + timer.start(timer_cb, ms, 0); + let (tx, rx) = channel(); + self.timeout_tx = Some(tx); + self.timeout_rx = Some(rx); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let acceptor: &mut AcceptTimeout = unsafe { + &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) + }; + // This send can never fail because if this timer is active then the + // receiving channel is guaranteed to be alive + acceptor.timeout_tx.get_ref().send(()); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// TCP implementation +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpWatcher { + handle: *uvll::uv_tcp_t, + stream: StreamWatcher, + home: HomeHandle, + refcount: Refcount, + + // libuv can't support concurrent reads and concurrent writes of the same + // stream object, so we use these access guards in order to arbitrate among + // multiple concurrent reads and writes. Note that libuv *can* read and + // write simultaneously, it just can't read and read simultaneously. + read_access: Access, + write_access: Access, +} + +pub struct TcpListener { + home: HomeHandle, + handle: *uvll::uv_pipe_t, + closing_task: Option, + outgoing: Sender>, + incoming: Receiver>, +} + +pub struct TcpAcceptor { + listener: ~TcpListener, + timeout: AcceptTimeout, +} + +// TCP watchers (clients/streams) + +impl TcpWatcher { + pub fn new(io: &mut UvIoFactory) -> TcpWatcher { + let handle = io.make_handle(); + TcpWatcher::new_home(&io.loop_, handle) + } + + fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; + assert_eq!(unsafe { + uvll::uv_tcp_init(loop_.handle, handle) + }, 0); + TcpWatcher { + home: home, + handle: handle, + stream: StreamWatcher::new(handle), + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), + } + } + + pub fn connect(io: &mut UvIoFactory, + address: ip::SocketAddr, + timeout: Option) -> Result { + let tcp = TcpWatcher::new(io); + let cx = ConnectCtx { status: -1, task: None, timer: None }; + let (addr, _len) = addr_to_sockaddr(address); + let addr_p = &addr as *_ as *libc::sockaddr; + cx.connect(tcp, timeout, io, |req, tcp, cb| { + unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) } + }) + } +} + impl HomingIO for TcpWatcher { fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } @@ -463,9 +561,7 @@ impl rtio::RtioTcpListener for TcpListener { // create the acceptor object from ourselves let mut acceptor = ~TcpAcceptor { listener: self, - timer: None, - timeout_tx: None, - timeout_rx: None, + timeout: AcceptTimeout::new(), }; let _m = acceptor.fire_homing_missile(); @@ -516,37 +612,7 @@ impl rtio::RtioSocket for TcpAcceptor { impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> { - match self.timeout_rx { - None => self.listener.incoming.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match self.listener.incoming.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(&self.listener.incoming); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - self.listener.incoming.recv() - } - } - } + self.timeout.accept(&self.listener.incoming) } fn accept_simultaneously(&mut self) -> Result<(), IoError> { @@ -564,47 +630,9 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { } fn set_timeout(&mut self, ms: Option) { - // First, if the timeout is none, clear any previous timeout by dropping - // the timer and transmission channels - let ms = match ms { - None => { - return drop((self.timer.take(), - self.timeout_tx.take(), - self.timeout_rx.take())) - } - Some(ms) => ms, - }; - - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let _m = self.fire_homing_missile(); - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(self.listener.handle) - }); - let mut timer = TimerWatcher::new_home(&loop_, self.home().clone()); - unsafe { - timer.set_data(self as *mut _ as *TcpAcceptor); - } - self.timer = Some(timer); - } - - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *uvll::uv_timer_t) { - let acceptor: &mut TcpAcceptor = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); + match ms { + None => self.timeout.clear(), + Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), } } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 6ee684ff9bdc0..7277be1616b71 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -12,14 +12,13 @@ use std::c_str::CString; use std::io::IoError; use libc; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; -use std::rt::task::BlockedTask; use access::Access; use homing::{HomingIO, HomeHandle}; +use net; use rc::Refcount; use stream::StreamWatcher; -use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, - wait_until_woken_after, wakeup}; +use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; use uvio::UvIoFactory; use uvll; @@ -43,6 +42,7 @@ pub struct PipeListener { pub struct PipeAcceptor { listener: ~PipeListener, + timeout: net::AcceptTimeout, } // PipeWatcher implementation and traits @@ -84,36 +84,18 @@ impl PipeWatcher { } } - pub fn connect(io: &mut UvIoFactory, name: &CString) + pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option) -> Result { - struct Ctx { task: Option, result: libc::c_int, } - let mut cx = Ctx { task: None, result: 0 }; - let mut req = Request::new(uvll::UV_CONNECT); let pipe = PipeWatcher::new(io, false); - - wait_until_woken_after(&mut cx.task, &io.loop_, || { + let cx = net::ConnectCtx { status: -1, task: None, timer: None }; + cx.connect(pipe, timeout, io, |req, pipe, cb| { unsafe { - uvll::uv_pipe_connect(req.handle, - pipe.handle(), - name.with_ref(|p| p), - connect_cb) + uvll::uv_pipe_connect(req.handle, pipe.handle(), + name.with_ref(|p| p), cb) } - req.set_data(&cx); - req.defuse(); // uv callback now owns this request - }); - return match cx.result { - 0 => Ok(pipe), - n => Err(UvError(n)) - }; - - extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {; - let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); - let cx: &mut Ctx = unsafe { req.get_data() }; - cx.result = status; - wakeup(&mut cx.task); - } + 0 + }) } pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle } @@ -199,7 +181,10 @@ impl PipeListener { impl RtioUnixListener for PipeListener { fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> { // create the acceptor object from ourselves - let mut acceptor = ~PipeAcceptor { listener: self }; + let mut acceptor = ~PipeAcceptor { + listener: self, + timeout: net::AcceptTimeout::new(), + }; let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable @@ -247,7 +232,14 @@ impl Drop for PipeListener { impl RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> { - self.listener.incoming.recv() + self.timeout.accept(&self.listener.incoming) + } + + fn set_timeout(&mut self, timeout_ms: Option) { + match timeout_ms { + None => self.timeout.clear(), + Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), + } } } @@ -265,7 +257,8 @@ mod tests { #[test] fn connect_err() { - match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) { + match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(), + None) { Ok(..) => fail!(), Err(..) => {} } @@ -312,7 +305,7 @@ mod tests { assert!(client.write([2]).is_ok()); }); rx.recv(); - let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap(); + let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); assert!(c.write([1]).is_ok()); let mut buf = [0]; assert!(c.read(buf).unwrap() == 1); @@ -332,7 +325,7 @@ mod tests { drop(p.accept().unwrap()); }); rx.recv(); - let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap(); + let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); fail!() } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 3127a01d70e46..81d7ac6601e23 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -291,8 +291,9 @@ impl IoFactory for UvIoFactory { } } - fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> { - match PipeWatcher::connect(self, path) { + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> Result<~rtio::RtioPipe:Send, IoError> { + match PipeWatcher::connect(self, path, timeout) { Ok(p) => Ok(~p as ~rtio::RtioPipe:Send), Err(e) => Err(uv_error_to_io_error(e)), } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index bf56817702021..b75b797e9744f 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -61,7 +61,31 @@ impl UnixStream { /// ``` pub fn connect(path: &P) -> IoResult { LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str()).map(UnixStream::new) + io.unix_connect(&path.to_c_str(), None).map(UnixStream::new) + }) + } + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Example + /// + /// ```rust + /// # #![allow(unused_must_use)] + /// use std::io::net::unix::UnixStream; + /// + /// let server = Path::new("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write([1, 2, 3]); + /// ``` + #[experimental = "the timeout argument is likely to change types"] + pub fn connect_timeout(path: &P, + timeout_ms: u64) -> IoResult { + LocalIo::maybe_raise(|io| { + let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms)); + s.map(UnixStream::new) }) } } @@ -128,6 +152,25 @@ pub struct UnixAcceptor { obj: ~RtioUnixAcceptor:Send, } +impl UnixAcceptor { + /// Sets a timeout for this acceptor, after which accept() will no longer + /// block indefinitely. + /// + /// The argument specified is the amount of time, in milliseconds, into the + /// future after which all invocations of accept() will not block (and any + /// pending invocation will return). A value of `None` will clear any + /// existing timeout. + /// + /// When using this method, it is likely necessary to reset the timeout as + /// appropriate, the timeout specified is specific to this object, not + /// specific to the next request. + #[experimental = "the name and arguments to this function are likely \ + to change"] + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.obj.set_timeout(timeout_ms) + } +} + impl Acceptor for UnixAcceptor { fn accept(&mut self) -> IoResult { self.obj.accept().map(UnixStream::new) @@ -135,6 +178,7 @@ impl Acceptor for UnixAcceptor { } #[cfg(test)] +#[allow(experimental)] mod tests { use prelude::*; use super::*; @@ -371,4 +415,49 @@ mod tests { drop(l.listen().unwrap()); assert!(!path.exists()); } #[cfg(not(windows))]) + + iotest!(fn accept_timeout() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + + a.set_timeout(Some(10)); + + // Make sure we time out once and future invocations also time out + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + + // Also make sure that even though the timeout is expired that we will + // continue to receive any pending connections. + let l = UnixStream::connect(&addr).unwrap(); + for i in range(0, 1001) { + match a.accept() { + Ok(..) => break, + Err(ref e) if e.kind == TimedOut => {} + Err(e) => fail!("error: {}", e), + } + if i == 1000 { fail!("should have a pending connection") } + } + drop(l); + + // Unset the timeout and make sure that this always blocks. + a.set_timeout(None); + let addr2 = addr.clone(); + spawn(proc() { + drop(UnixStream::connect(&addr2)); + }); + a.accept().unwrap(); + }) + + iotest!(fn connect_timeout_error() { + let addr = next_test_unix(); + assert!(UnixStream::connect_timeout(&addr, 100).is_err()); + }) + + iotest!(fn connect_timeout_success() { + let addr = next_test_unix(); + let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + assert!(UnixStream::connect_timeout(&addr, 100).is_ok()); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 5dd148346695d..f3c7fdaf7105b 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -152,7 +152,8 @@ pub trait IoFactory { fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>; fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send>; - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>; + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> IoResult<~RtioPipe:Send>; fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]>; @@ -274,6 +275,7 @@ pub trait RtioUnixListener { pub trait RtioUnixAcceptor { fn accept(&mut self) -> IoResult<~RtioPipe:Send>; + fn set_timeout(&mut self, timeout: Option); } pub trait RtioTTY {