diff --git a/src/libnative/io/c_windows.rs b/src/libnative/io/c_windows.rs index 80c9e91b48f7a..909b37895b7b5 100644 --- a/src/libnative/io/c_windows.rs +++ b/src/libnative/io/c_windows.rs @@ -26,6 +26,14 @@ pub static ENABLE_INSERT_MODE: libc::DWORD = 0x20; pub static ENABLE_LINE_INPUT: libc::DWORD = 0x2; pub static ENABLE_PROCESSED_INPUT: libc::DWORD = 0x1; pub static ENABLE_QUICK_EDIT_MODE: libc::DWORD = 0x40; +pub static WSA_INVALID_EVENT: WSAEVENT = 0 as WSAEVENT; + +pub static FD_ACCEPT: libc::c_long = 0x08; +pub static FD_MAX_EVENTS: uint = 10; +pub static WSA_INFINITE: libc::DWORD = libc::INFINITE; +pub static WSA_WAIT_TIMEOUT: libc::DWORD = libc::consts::os::extra::WAIT_TIMEOUT; +pub static WSA_WAIT_EVENT_0: libc::DWORD = libc::consts::os::extra::WAIT_OBJECT_0; +pub static WSA_WAIT_FAILED: libc::DWORD = libc::consts::os::extra::WAIT_FAILED; #[repr(C)] #[cfg(target_arch = "x86")] @@ -52,6 +60,16 @@ pub struct WSADATA { pub type LPWSADATA = *mut WSADATA; +#[repr(C)] +pub struct WSANETWORKEVENTS { + pub lNetworkEvents: libc::c_long, + pub iErrorCode: [libc::c_int, ..FD_MAX_EVENTS], +} + +pub type LPWSANETWORKEVENTS = *mut WSANETWORKEVENTS; + +pub type WSAEVENT = libc::HANDLE; + #[repr(C)] pub struct fd_set { fd_count: libc::c_uint, @@ -68,6 +86,21 @@ extern "system" { pub fn WSAStartup(wVersionRequested: libc::WORD, lpWSAData: LPWSADATA) -> libc::c_int; pub fn WSAGetLastError() -> libc::c_int; + pub fn WSACloseEvent(hEvent: WSAEVENT) -> libc::BOOL; + pub fn WSACreateEvent() -> WSAEVENT; + pub fn WSAEventSelect(s: libc::SOCKET, + hEventObject: WSAEVENT, + lNetworkEvents: libc::c_long) -> libc::c_int; + pub fn WSASetEvent(hEvent: WSAEVENT) -> libc::BOOL; + pub fn WSAWaitForMultipleEvents(cEvents: libc::DWORD, + lphEvents: *const WSAEVENT, + fWaitAll: libc::BOOL, + dwTimeout: libc::DWORD, + fAltertable: libc::BOOL) -> libc::DWORD; + pub fn WSAEnumNetworkEvents(s: libc::SOCKET, + hEventObject: WSAEVENT, + lpNetworkEvents: LPWSANETWORKEVENTS) + -> libc::c_int; pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long, argp: *mut libc::c_ulong) -> libc::c_int; @@ -82,6 +115,12 @@ extern "system" { optval: *mut libc::c_char, optlen: *mut libc::c_int) -> libc::c_int; + pub fn SetEvent(hEvent: libc::HANDLE) -> libc::BOOL; + pub fn WaitForMultipleObjects(nCount: libc::DWORD, + lpHandles: *const libc::HANDLE, + bWaitAll: libc::BOOL, + dwMilliseconds: libc::DWORD) -> libc::DWORD; + pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; pub fn CancelIoEx(hFile: libc::HANDLE, lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL; diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 2255578ba8038..368b5914444ac 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -11,21 +11,25 @@ use alloc::arc::Arc; use libc; use std::mem; +use std::ptr; use std::rt::mutex; use std::rt::rtio; use std::rt::rtio::{IoResult, IoError}; +use std::sync::atomic; use super::{retry, keep_going}; use super::c; use super::util; +#[cfg(unix)] use super::process; +#[cfg(unix)] use super::file::FileDesc; + +pub use self::os::{init, sock_t, last_error}; + //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings //////////////////////////////////////////////////////////////////////////////// -#[cfg(windows)] pub type sock_t = libc::SOCKET; -#[cfg(unix)] pub type sock_t = super::file::fd_t; - pub fn htons(u: u16) -> u16 { u.to_be() } @@ -97,7 +101,7 @@ fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult { rtio::Ipv6Addr(..) => libc::AF_INET6, }; match libc::socket(fam, ty, 0) { - -1 => Err(super::last_error()), + -1 => Err(os::last_error()), fd => Ok(fd), } } @@ -111,7 +115,7 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload, mem::size_of::() as libc::socklen_t); if ret != 0 { - Err(last_error()) + Err(os::last_error()) } else { Ok(()) } @@ -127,7 +131,7 @@ pub fn getsockopt(fd: sock_t, opt: libc::c_int, &mut slot as *mut _ as *mut _, &mut len); if ret != 0 { - Err(last_error()) + Err(os::last_error()) } else { assert!(len as uint == mem::size_of::()); Ok(slot) @@ -135,25 +139,6 @@ pub fn getsockopt(fd: sock_t, opt: libc::c_int, } } -#[cfg(windows)] -pub fn last_error() -> IoError { - use std::os; - let code = unsafe { c::WSAGetLastError() as uint }; - IoError { - code: code, - extra: 0, - detail: Some(os::error_string(code)), - } -} - -#[cfg(not(windows))] -fn last_error() -> IoError { - super::last_error() -} - -#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } -#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } - fn sockname(fd: sock_t, f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr, *mut libc::socklen_t) -> libc::c_int) @@ -167,7 +152,7 @@ fn sockname(fd: sock_t, storage as *mut libc::sockaddr, &mut len as *mut libc::socklen_t); if ret != 0 { - return Err(last_error()) + return Err(os::last_error()) } } return sockaddr_to_addr(&storage, len as uint); @@ -221,28 +206,6 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } } -#[cfg(unix)] -pub fn init() {} - -#[cfg(windows)] -pub fn init() { - - unsafe { - use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; - static mut INITIALIZED: bool = false; - static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - - let _guard = LOCK.lock(); - if !INITIALIZED { - let mut data: c::WSADATA = mem::zeroed(); - let ret = c::WSAStartup(0x202, // version 2.2 - &mut data); - assert_eq!(ret, 0); - INITIALIZED = true; - } - } -} - //////////////////////////////////////////////////////////////////////////////// // TCP streams //////////////////////////////////////////////////////////////////////////////// @@ -289,7 +252,7 @@ impl TcpStream { }, None => { match retry(|| unsafe { libc::connect(fd, addrp, len) }) { - -1 => Err(last_error()), + -1 => Err(os::last_error()), _ => Ok(ret), } } @@ -435,7 +398,7 @@ impl rtio::RtioSocket for TcpStream { } impl Drop for Inner { - fn drop(&mut self) { unsafe { close(self.fd); } } + fn drop(&mut self) { unsafe { os::close(self.fd); } } } #[unsafe_destructor] @@ -471,7 +434,7 @@ impl TcpListener { } match unsafe { libc::bind(fd, addrp, len) } { - -1 => Err(last_error()), + -1 => Err(os::last_error()), _ => Ok(ret), } } @@ -480,8 +443,44 @@ impl TcpListener { pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { - -1 => Err(last_error()), - _ => Ok(TcpAcceptor { listener: self, deadline: 0 }) + -1 => Err(os::last_error()), + + #[cfg(unix)] + _ => { + let (reader, writer) = try!(process::pipe()); + try!(util::set_nonblocking(reader.fd(), true)); + try!(util::set_nonblocking(writer.fd(), true)); + try!(util::set_nonblocking(self.fd(), true)); + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } + + #[cfg(windows)] + _ => { + let accept = try!(os::Event::new()); + let ret = unsafe { + c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT) + }; + if ret != 0 { + return Err(os::last_error()) + } + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + abort: try!(os::Event::new()), + accept: accept, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } } } } @@ -502,31 +501,135 @@ impl rtio::RtioSocket for TcpListener { } pub struct TcpAcceptor { - listener: TcpListener, + inner: Arc, deadline: u64, } +#[cfg(unix)] +struct AcceptorInner { + listener: TcpListener, + reader: FileDesc, + writer: FileDesc, + closed: atomic::AtomicBool, +} + +#[cfg(windows)] +struct AcceptorInner { + listener: TcpListener, + abort: os::Event, + accept: os::Event, + closed: atomic::AtomicBool, +} + impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd() } + pub fn fd(&self) -> sock_t { self.inner.listener.fd() } + #[cfg(unix)] pub fn native_accept(&mut self) -> IoResult { - if self.deadline != 0 { - try!(util::await(self.fd(), Some(self.deadline), util::Readable)); + // In implementing accept, the two main concerns are dealing with + // close_accept() and timeouts. The unix implementation is based on a + // nonblocking accept plus a call to select(). Windows ends up having + // an entirely separate implementation than unix, which is explained + // below. + // + // To implement timeouts, all blocking is done via select() instead of + // accept() by putting the socket in non-blocking mode. Because + // select() takes a timeout argument, we just pass through the timeout + // to select(). + // + // To implement close_accept(), we have a self-pipe to ourselves which + // is passed to select() along with the socket being accepted on. The + // self-pipe is never written to unless close_accept() is called. + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomic::SeqCst) { + match retry(|| unsafe { + libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null()) + }) { + -1 if util::wouldblock() => {} + -1 => return Err(os::last_error()), + fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))), + } + try!(util::await([self.fd(), self.inner.reader.fd()], + deadline, util::Readable)); } - unsafe { - let mut storage: libc::sockaddr_storage = mem::zeroed(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::(); - let mut size = size as libc::socklen_t; - match retry(|| { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) as sock_t { - -1 => Err(last_error()), - fd => Ok(TcpStream::new(Inner::new(fd))), + + Err(util::eof()) + } + + #[cfg(windows)] + pub fn native_accept(&mut self) -> IoResult { + // Unlink unix, windows cannot invoke `select` on arbitrary file + // descriptors like pipes, only sockets. Consequently, windows cannot + // use the same implementation as unix for accept() when close_accept() + // is considered. + // + // In order to implement close_accept() and timeouts, windows uses + // event handles. An acceptor-specific abort event is created which + // will only get set in close_accept(), and it will never be un-set. + // Additionally, another acceptor-specific event is associated with the + // FD_ACCEPT network event. + // + // These two events are then passed to WaitForMultipleEvents to see + // which one triggers first, and the timeout passed to this function is + // the local timeout for the acceptor. + // + // If the wait times out, then the accept timed out. If the wait + // succeeds with the abort event, then we were closed, and if the wait + // succeeds otherwise, then we do a nonblocking poll via `accept` to + // see if we can accept a connection. The connection is candidate to be + // stolen, so we do all of this in a loop as well. + let events = [self.inner.abort.handle(), self.inner.accept.handle()]; + + while !self.inner.closed.load(atomic::SeqCst) { + let ms = if self.deadline == 0 { + c::WSA_INFINITE as u64 + } else { + let now = ::io::timer::now(); + if self.deadline < now {0} else {self.deadline - now} + }; + let ret = unsafe { + c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE, + ms as libc::DWORD, libc::FALSE) + }; + match ret { + c::WSA_WAIT_TIMEOUT => { + return Err(util::timeout("accept timed out")) + } + c::WSA_WAIT_FAILED => return Err(os::last_error()), + c::WSA_WAIT_EVENT_0 => break, + n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1), + } + + let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() }; + let ret = unsafe { + c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents) + }; + if ret != 0 { return Err(os::last_error()) } + + if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue } + match unsafe { + libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null()) + } { + -1 if util::wouldblock() => {} + -1 => return Err(os::last_error()), + + // Accepted sockets inherit the same properties as the caller, + // so we need to deregister our event and switch the socket back + // to blocking mode + fd => { + let stream = TcpStream::new(Inner::new(fd)); + let ret = unsafe { + c::WSAEventSelect(fd, events[1], 0) + }; + if ret != 0 { return Err(os::last_error()) } + try!(util::set_nonblocking(fd, false)); + return Ok(stream) + } } } + + Err(util::eof()) } } @@ -546,6 +649,35 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn set_timeout(&mut self, timeout: Option) { self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } + + fn clone(&self) -> Box { + box TcpAcceptor { + inner: self.inner.clone(), + deadline: 0, + } as Box + } + + #[cfg(unix)] + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let mut fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.inner_write([0]) { + Ok(..) => Ok(()), + Err(..) if util::wouldblock() => Ok(()), + Err(e) => Err(e), + } + } + + #[cfg(windows)] + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) }; + if ret == libc::TRUE { + Ok(()) + } else { + Err(os::last_error()) + } + } } //////////////////////////////////////////////////////////////////////////////// @@ -572,7 +704,7 @@ impl UdpSocket { let addrp = &storage as *const _ as *const libc::sockaddr; match unsafe { libc::bind(fd, addrp, len) } { - -1 => Err(last_error()), + -1 => Err(os::last_error()), _ => Ok(ret), } } @@ -817,7 +949,7 @@ pub fn read(fd: sock_t, // With a timeout, first we wait for the socket to become // readable using select(), specifying the relevant timeout for // our previously set deadline. - try!(util::await(fd, deadline, util::Readable)); + try!(util::await([fd], deadline, util::Readable)); // At this point, we're still within the timeout, and we've // determined that the socket is readable (as returned by @@ -828,7 +960,7 @@ pub fn read(fd: sock_t, let _guard = lock(); match retry(|| read(deadline.is_some())) { -1 if util::wouldblock() => { assert!(deadline.is_some()); } - -1 => return Err(last_error()), + -1 => return Err(os::last_error()), n => { ret = n; break } } } @@ -836,7 +968,7 @@ pub fn read(fd: sock_t, match ret { 0 => Err(util::eof()), - n if n < 0 => Err(last_error()), + n if n < 0 => Err(os::last_error()), n => Ok(n as uint) } } @@ -871,7 +1003,7 @@ pub fn write(fd: sock_t, while written < buf.len() && (write_everything || written == 0) { // As with read(), first wait for the socket to be ready for // the I/O operation. - match util::await(fd, deadline, util::Writable) { + match util::await([fd], deadline, util::Writable) { Err(ref e) if e.code == libc::EOF as uint && written > 0 => { assert!(deadline.is_some()); return Err(util::short_write(written, "short write")) @@ -887,15 +1019,88 @@ pub fn write(fd: sock_t, let len = buf.len() - written; match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) { -1 if util::wouldblock() => {} - -1 => return Err(last_error()), + -1 => return Err(os::last_error()), n => { written += n as uint; } } } ret = 0; } if ret < 0 { - Err(last_error()) + Err(os::last_error()) } else { Ok(written) } } + +#[cfg(windows)] +mod os { + use libc; + use std::mem; + use std::rt::rtio::{IoError, IoResult}; + + use io::c; + + pub type sock_t = libc::SOCKET; + pub struct Event(c::WSAEVENT); + + impl Event { + pub fn new() -> IoResult { + let event = unsafe { c::WSACreateEvent() }; + if event == c::WSA_INVALID_EVENT { + Err(last_error()) + } else { + Ok(Event(event)) + } + } + + pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle } + } + + impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = c::WSACloseEvent(self.handle()); } + } + } + + pub fn init() { + unsafe { + use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + static mut INITIALIZED: bool = false; + static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; + + let _guard = LOCK.lock(); + if !INITIALIZED { + let mut data: c::WSADATA = mem::zeroed(); + let ret = c::WSAStartup(0x202, // version 2.2 + &mut data); + assert_eq!(ret, 0); + INITIALIZED = true; + } + } + } + + pub fn last_error() -> IoError { + use std::os; + let code = unsafe { c::WSAGetLastError() as uint }; + IoError { + code: code, + extra: 0, + detail: Some(os::error_string(code)), + } + } + + pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } +} + +#[cfg(unix)] +mod os { + use libc; + use std::rt::rtio::IoError; + use io; + + pub type sock_t = io::file::fd_t; + + pub fn init() {} + pub fn last_error() -> IoError { io::last_error() } + pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } +} diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 895b8b5929c96..a3564dfe2cc9a 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -15,12 +15,14 @@ use std::mem; use std::rt::mutex; use std::rt::rtio; use std::rt::rtio::{IoResult, IoError}; +use std::sync::atomic; use super::retry; use super::net; use super::util; use super::c; -use super::file::fd_t; +use super::process; +use super::file::{fd_t, FileDesc}; fn unix_socket(ty: libc::c_int) -> IoResult { match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { @@ -225,7 +227,23 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) + + #[cfg(unix)] + _ => { + let (reader, writer) = try!(process::pipe()); + try!(util::set_nonblocking(reader.fd(), true)); + try!(util::set_nonblocking(writer.fd(), true)); + try!(util::set_nonblocking(self.fd(), true)); + Ok(UnixAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } } } } @@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener { } pub struct UnixAcceptor { - listener: UnixListener, + inner: Arc, deadline: u64, } +#[cfg(unix)] +struct AcceptorInner { + listener: UnixListener, + reader: FileDesc, + writer: FileDesc, + closed: atomic::AtomicBool, +} + impl UnixAcceptor { - fn fd(&self) -> fd_t { self.listener.fd() } + fn fd(&self) -> fd_t { self.inner.listener.fd() } pub fn native_accept(&mut self) -> IoResult { - if self.deadline != 0 { - try!(util::await(self.fd(), Some(self.deadline), util::Readable)); - } - let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::(); - let mut size = size as libc::socklen_t; - match retry(|| unsafe { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) { - -1 => Err(super::last_error()), - fd => Ok(UnixStream::new(Arc::new(Inner::new(fd)))) + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomic::SeqCst) { + unsafe { + let mut storage: libc::sockaddr_storage = mem::zeroed(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) { + -1 if util::wouldblock() => {} + -1 => return Err(super::last_error()), + fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))), + } + } + try!(util::await([self.fd(), self.inner.reader.fd()], + deadline, util::Readable)); } + + Err(util::eof()) } } @@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn set_timeout(&mut self, timeout: Option) { self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } + + fn clone(&self) -> Box { + box UnixAcceptor { + inner: self.inner.clone(), + deadline: 0, + } as Box + } + + #[cfg(unix)] + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let mut fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.inner_write([0]) { + Ok(..) => Ok(()), + Err(..) if util::wouldblock() => Ok(()), + Err(e) => Err(e), + } + } } impl Drop for UnixListener { diff --git a/src/libnative/io/pipe_windows.rs b/src/libnative/io/pipe_windows.rs index 717915e5d23bd..95afa11f4a9a0 100644 --- a/src/libnative/io/pipe_windows.rs +++ b/src/libnative/io/pipe_windows.rs @@ -169,23 +169,30 @@ unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE { } pub fn await(handle: libc::HANDLE, deadline: u64, - overlapped: &mut libc::OVERLAPPED) -> bool { - if deadline == 0 { return true } + events: &[libc::HANDLE]) -> IoResult { + use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0}; // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo // to figure out if we should indeed get the result. - let now = ::io::timer::now(); - let timeout = deadline < now || unsafe { - let ms = (deadline - now) as libc::DWORD; - let r = libc::WaitForSingleObject(overlapped.hEvent, - ms); - r != libc::WAIT_OBJECT_0 - }; - if timeout { - unsafe { let _ = c::CancelIo(handle); } - false + let ms = if deadline == 0 { + libc::INFINITE as u64 } else { - true + let now = ::io::timer::now(); + if deadline < now {0} else {deadline - now} + }; + let ret = unsafe { + c::WaitForMultipleObjects(events.len() as libc::DWORD, + events.as_ptr(), + libc::FALSE, + ms as libc::DWORD) + }; + match ret { + WAIT_FAILED => Err(super::last_error()), + WAIT_TIMEOUT => unsafe { + let _ = c::CancelIo(handle); + Err(util::timeout("operation timed out")) + }, + n => Ok((n - WAIT_OBJECT_0) as uint) } } @@ -390,8 +397,8 @@ impl rtio::RtioPipe for UnixStream { drop(guard); loop { // Process a timeout if one is pending - let succeeded = await(self.handle(), self.read_deadline, - &mut overlapped); + let wait_succeeded = await(self.handle(), self.read_deadline, + [overlapped.hEvent]); let ret = unsafe { libc::GetOverlappedResult(self.handle(), @@ -408,7 +415,7 @@ impl rtio::RtioPipe for UnixStream { // If the reading half is now closed, then we're done. If we woke up // because the writing half was closed, keep trying. - if !succeeded { + if wait_succeeded.is_err() { return Err(util::timeout("read timed out")) } if self.read_closed() { @@ -458,8 +465,8 @@ impl rtio::RtioPipe for UnixStream { }) } // Process a timeout if one is pending - let succeeded = await(self.handle(), self.write_deadline, - &mut overlapped); + let wait_succeeded = await(self.handle(), self.write_deadline, + [overlapped.hEvent]); let ret = unsafe { libc::GetOverlappedResult(self.handle(), &mut overlapped, @@ -473,7 +480,7 @@ impl rtio::RtioPipe for UnixStream { if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - if !succeeded { + if !wait_succeeded.is_ok() { let amt = offset + bytes_written as uint; return if amt > 0 { Err(IoError { @@ -577,6 +584,10 @@ impl UnixListener { listener: self, event: try!(Event::new(true, false)), deadline: 0, + inner: Arc::new(AcceptorState { + abort: try!(Event::new(true, false)), + closed: atomic::AtomicBool::new(false), + }), }) } } @@ -597,11 +608,17 @@ impl rtio::RtioUnixListener for UnixListener { } pub struct UnixAcceptor { + inner: Arc, listener: UnixListener, event: Event, deadline: u64, } +struct AcceptorState { + abort: Event, + closed: atomic::AtomicBool, +} + impl UnixAcceptor { pub fn native_accept(&mut self) -> IoResult { // This function has some funky implementation details when working with @@ -638,6 +655,10 @@ impl UnixAcceptor { // using the original server pipe. let handle = self.listener.handle; + // If we've had an artifical call to close_accept, be sure to never + // proceed in accepting new clients in the future + if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) } + let name = try!(to_utf16(&self.listener.name)); // Once we've got a "server handle", we need to wait for a client to @@ -652,7 +673,9 @@ impl UnixAcceptor { if err == libc::ERROR_IO_PENDING as libc::DWORD { // Process a timeout if one is pending - let _ = await(handle, self.deadline, &mut overlapped); + let wait_succeeded = await(handle, self.deadline, + [self.inner.abort.handle(), + overlapped.hEvent]); // This will block until the overlapped I/O is completed. The // timeout was previously handled, so this will either block in @@ -665,7 +688,11 @@ impl UnixAcceptor { libc::TRUE) }; if ret == 0 { - err = unsafe { libc::GetLastError() }; + if wait_succeeded.is_ok() { + err = unsafe { libc::GetLastError() }; + } else { + return Err(util::timeout("accept timed out")) + } } else { // we succeeded, bypass the check below err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; @@ -709,5 +736,34 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn set_timeout(&mut self, timeout: Option) { self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); } + + fn clone(&self) -> Box { + let name = to_utf16(&self.listener.name).ok().unwrap(); + box UnixAcceptor { + inner: self.inner.clone(), + event: Event::new(true, false).ok().unwrap(), + deadline: 0, + listener: UnixListener { + name: self.listener.name.clone(), + handle: unsafe { + let p = pipe(name.as_ptr(), false) ; + assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE); + p + }, + }, + } as Box + } + + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { + c::SetEvent(self.inner.abort.handle()) + }; + if ret == 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } } diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs index d1b2885415747..b8ec0cd549611 100644 --- a/src/libnative/io/process.rs +++ b/src/libnative/io/process.rs @@ -191,7 +191,7 @@ impl Drop for Process { } } -fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> { +pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> { #[cfg(unix)] use libc::EMFILE as ERROR; #[cfg(windows)] use libc::WSAEMFILE as ERROR; struct Closer { fd: libc::c_int } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs index 356805d91de7a..078989b058180 100644 --- a/src/libnative/io/util.rs +++ b/src/libnative/io/util.rs @@ -9,6 +9,7 @@ // except according to those terms. use libc; +use std::cmp; use std::mem; use std::os; use std::ptr; @@ -166,10 +167,18 @@ pub fn connect_timeout(fd: net::sock_t, } } -pub fn await(fd: net::sock_t, deadline: Option, +pub fn await(fds: &[net::sock_t], deadline: Option, status: SocketStatus) -> IoResult<()> { let mut set: c::fd_set = unsafe { mem::zeroed() }; - c::fd_set(&mut set, fd); + let mut max = 0; + for &fd in fds.iter() { + c::fd_set(&mut set, fd); + max = cmp::max(max, fd + 1); + } + if cfg!(windows) { + max = fds.len() as net::sock_t; + } + let (read, write) = match status { Readable => (&mut set as *mut _, ptr::mut_null()), Writable => (ptr::mut_null(), &mut set as *mut _), @@ -188,8 +197,9 @@ pub fn await(fd: net::sock_t, deadline: Option, &mut tv as *mut _ } }; - let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; - let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) }; + let r = unsafe { + c::select(max as libc::c_int, read, write, ptr::mut_null(), tvp) + }; r }) { -1 => Err(last_error()), diff --git a/src/librustrt/rtio.rs b/src/librustrt/rtio.rs index 6525adf07f76f..261d544a24149 100644 --- a/src/librustrt/rtio.rs +++ b/src/librustrt/rtio.rs @@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket { fn accept_simultaneously(&mut self) -> IoResult<()>; fn dont_accept_simultaneously(&mut self) -> IoResult<()>; fn set_timeout(&mut self, timeout: Option); + fn clone(&self) -> Box; + fn close_accept(&mut self) -> IoResult<()>; } pub trait RtioTcpStream : RtioSocket { @@ -335,6 +337,8 @@ pub trait RtioUnixListener { pub trait RtioUnixAcceptor { fn accept(&mut self) -> IoResult>; fn set_timeout(&mut self, timeout: Option); + fn clone(&self) -> Box; + fn close_accept(&mut self) -> IoResult<()>; } pub trait RtioTTY { diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index 9bd8af6419e0b..290293cf086c1 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -22,38 +22,40 @@ use std::cell::UnsafeCell; use homing::HomingMissile; -pub struct Access { - inner: Arc>, +pub struct Access { + inner: Arc>>, } -pub struct Guard<'a> { - access: &'a mut Access, +pub struct Guard<'a, T> { + access: &'a mut Access, missile: Option, } -struct Inner { +struct Inner { queue: Vec<(BlockedTask, uint)>, held: bool, closed: bool, + data: T, } -impl Access { - pub fn new() -> Access { +impl Access { + pub fn new(data: T) -> Access { Access { inner: Arc::new(UnsafeCell::new(Inner { queue: vec![], held: false, closed: false, + data: data, })) } } pub fn grant<'a>(&'a mut self, token: uint, - missile: HomingMissile) -> Guard<'a> { + missile: HomingMissile) -> Guard<'a, T> { // This unsafety is actually OK because the homing missile argument // guarantees that we're on the same event loop as all the other objects // attempting to get access granted. - let inner: &mut Inner = unsafe { &mut *self.inner.get() }; + let inner = unsafe { &mut *self.inner.get() }; if inner.held { let t: Box = Local::take(); @@ -69,6 +71,15 @@ impl Access { Guard { access: self, missile: Some(missile) } } + pub fn unsafe_get(&self) -> *mut T { + unsafe { &mut (*self.inner.get()).data as *mut _ } + } + + // Safe version which requires proof that you are on the home scheduler. + pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T { + unsafe { &mut *self.unsafe_get() } + } + pub fn close(&self, _missile: &HomingMissile) { // This unsafety is OK because with a homing missile we're guaranteed to // be the only task looking at the `closed` flag (and are therefore @@ -82,21 +93,27 @@ impl Access { // is only safe to invoke while on the home event loop, and there is no // guarantee that this i being invoked on the home event loop. pub unsafe fn dequeue(&mut self, token: uint) -> Option { - let inner: &mut Inner = &mut *self.inner.get(); + let inner = &mut *self.inner.get(); match inner.queue.iter().position(|&(_, t)| t == token) { Some(i) => Some(inner.queue.remove(i).unwrap().val0()), None => None, } } + + /// Test whether this access is closed, using a homing missile to prove + /// that it's safe + pub fn is_closed(&self, _missile: &HomingMissile) -> bool { + unsafe { (*self.inner.get()).closed } + } } -impl Clone for Access { - fn clone(&self) -> Access { +impl Clone for Access { + fn clone(&self) -> Access { Access { inner: self.inner.clone() } } } -impl<'a> Guard<'a> { +impl<'a, T: Send> Guard<'a, T> { pub fn is_closed(&self) -> bool { // See above for why this unsafety is ok, it just applies to the read // instead of the write. @@ -104,13 +121,27 @@ impl<'a> Guard<'a> { } } +impl<'a, T: Send> Deref for Guard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { + // A guard represents exclusive access to a piece of data, so it's safe + // to hand out shared and mutable references + unsafe { &(*self.access.inner.get()).data } + } +} + +impl<'a, T: Send> DerefMut for Guard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { + unsafe { &mut (*self.access.inner.get()).data } + } +} + #[unsafe_destructor] -impl<'a> Drop for Guard<'a> { +impl<'a, T> Drop for Guard<'a, T> { fn drop(&mut self) { // This guard's homing missile is still armed, so we're guaranteed to be // on the same I/O event loop, so this unsafety should be ok. assert!(self.missile.is_some()); - let inner: &mut Inner = unsafe { + let inner: &mut Inner = unsafe { mem::transmute(self.access.inner.get()) }; @@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> { } } -impl Drop for Inner { +#[unsafe_destructor] +impl Drop for Inner { fn drop(&mut self) { assert!(!self.held); assert_eq!(self.queue.len(), 0); diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 0da8d0d2108ea..84ef9deaf922f 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -22,7 +22,7 @@ use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, wait_until_woken_after, wakeup}; -use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx}; +use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout}; use uvio::UvIoFactory; use uvll; @@ -158,20 +158,20 @@ pub struct TcpWatcher { // stream object, so we use these access guards in order to arbitrate among // multiple concurrent reads and writes. Note that libuv *can* read and // write simultaneously, it just can't read and read simultaneously. - read_access: AccessTimeout, - write_access: AccessTimeout, + read_access: AccessTimeout<()>, + write_access: AccessTimeout<()>, } pub struct TcpListener { home: HomeHandle, - handle: *mut uvll::uv_pipe_t, - outgoing: Sender, IoError>>, - incoming: Receiver, IoError>>, + handle: *mut uvll::uv_tcp_t, } pub struct TcpAcceptor { - listener: Box, - timeout: AcceptTimeout, + home: HomeHandle, + handle: *mut uvll::uv_tcp_t, + access: AcceptTimeout>, + refcount: Refcount, } // TCP watchers (clients/streams) @@ -192,8 +192,8 @@ impl TcpWatcher { handle: handle, stream: StreamWatcher::new(handle, true), refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), } } @@ -291,7 +291,7 @@ impl rtio::RtioTcpStream for TcpWatcher { let task = { let m = self.fire_homing_missile(); self.read_access.access.close(&m); - self.stream.cancel_read(uvll::EOF as libc::ssize_t) + self.stream.cancel_read(uvll::EOF as libc::ssize_t) }; let _ = task.map(|t| t.reawaken()); Ok(()) @@ -354,12 +354,9 @@ impl TcpListener { assert_eq!(unsafe { uvll::uv_tcp_init(io.uv_loop(), handle) }, 0); - let (tx, rx) = channel(); let l = box TcpListener { home: io.make_handle(), handle: handle, - outgoing: tx, - incoming: rx, }; let mut storage = unsafe { mem::zeroed() }; let _len = addr_to_sockaddr(address, &mut storage); @@ -390,17 +387,21 @@ impl rtio::RtioSocket for TcpListener { } impl rtio::RtioTcpListener for TcpListener { - fn listen(self: Box) + fn listen(mut self: Box) -> Result, IoError> { + let _m = self.fire_homing_missile(); + // create the acceptor object from ourselves - let mut acceptor = box TcpAcceptor { - listener: self, - timeout: AcceptTimeout::new(), - }; + let acceptor = (box TcpAcceptor { + handle: self.handle, + home: self.home.clone(), + access: AcceptTimeout::new(), + refcount: Refcount::new(), + }).install(); + self.handle = 0 as *mut _; - let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable - match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } { + match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } { 0 => Ok(acceptor as Box), n => Err(uv_error_to_io_error(UvError(n))), } @@ -409,7 +410,7 @@ impl rtio::RtioTcpListener for TcpListener { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { assert!(status != uvll::ECANCELED); - let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; + let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -421,11 +422,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - tcp.outgoing.send(msg); + + // If we're running then we have exclusive access, so the unsafe_get() is ok + unsafe { tcp.access.push(msg); } } impl Drop for TcpListener { fn drop(&mut self) { + if self.handle.is_null() { return } + let _m = self.fire_homing_missile(); self.close(); } @@ -434,40 +439,68 @@ impl Drop for TcpListener { // TCP acceptors (bound servers) impl HomingIO for TcpAcceptor { - fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl rtio::RtioSocket for TcpAcceptor { fn socket_name(&mut self) -> Result { let _m = self.fire_homing_missile(); - socket_name(Tcp, self.listener.handle) + socket_name(Tcp, self.handle) } } +impl UvHandle for TcpAcceptor { + fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle } +} + impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result, IoError> { - self.timeout.accept(&self.listener.incoming) + let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.accept(m, &loop_) } fn accept_simultaneously(&mut self) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { - uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1) + uvll::uv_tcp_simultaneous_accepts(self.handle, 1) }) } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { - uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0) + uvll::uv_tcp_simultaneous_accepts(self.handle, 0) }) } fn set_timeout(&mut self, ms: Option) { let _m = self.fire_homing_missile(); - match ms { - None => self.timeout.clear(), - Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), + let loop_ = self.uv_loop(); + self.access.set_timeout(ms, &loop_, &self.home); + } + + fn clone(&self) -> Box { + box TcpAcceptor { + refcount: self.refcount.clone(), + home: self.home.clone(), + handle: self.handle, + access: self.access.clone(), + } as Box + } + + fn close_accept(&mut self) -> Result<(), IoError> { + let m = self.fire_homing_missile(); + self.access.close(m); + Ok(()) + } +} + +impl Drop for TcpAcceptor { + fn drop(&mut self) { + let _m = self.fire_homing_missile(); + if self.refcount.decrement() { + self.close(); } } } @@ -482,8 +515,8 @@ pub struct UdpWatcher { // See above for what these fields are refcount: Refcount, - read_access: AccessTimeout, - write_access: AccessTimeout, + read_access: AccessTimeout<()>, + write_access: AccessTimeout<()>, blocked_sender: Option, } @@ -507,8 +540,8 @@ impl UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), blocked_sender: None, }; assert_eq!(unsafe { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index f0a57546ed43e..9ece6525e1e82 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -31,20 +31,20 @@ pub struct PipeWatcher { refcount: Refcount, // see comments in TcpWatcher for why these exist - write_access: AccessTimeout, - read_access: AccessTimeout, + write_access: AccessTimeout<()>, + read_access: AccessTimeout<()>, } pub struct PipeListener { home: HomeHandle, pipe: *mut uvll::uv_pipe_t, - outgoing: Sender>>, - incoming: Receiver>>, } pub struct PipeAcceptor { - listener: Box, - timeout: AcceptTimeout, + home: HomeHandle, + handle: *mut uvll::uv_pipe_t, + access: AcceptTimeout>, + refcount: Refcount, } // PipeWatcher implementation and traits @@ -71,8 +71,8 @@ impl PipeWatcher { home: home, defused: false, refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), } } @@ -233,12 +233,9 @@ impl PipeListener { // If successful, unwrap the PipeWatcher because we control how // we close the pipe differently. We can't rely on // StreamWatcher's default close method. - let (tx, rx) = channel(); let p = box PipeListener { home: io.make_handle(), pipe: pipe.unwrap(), - incoming: rx, - outgoing: tx, }; Ok(p.install()) } @@ -248,17 +245,21 @@ impl PipeListener { } impl rtio::RtioUnixListener for PipeListener { - fn listen(self: Box) + fn listen(mut self: Box) -> IoResult> { + let _m = self.fire_homing_missile(); + // create the acceptor object from ourselves - let mut acceptor = box PipeAcceptor { - listener: self, - timeout: AcceptTimeout::new(), - }; + let acceptor = (box PipeAcceptor { + handle: self.pipe, + home: self.home.clone(), + access: AcceptTimeout::new(), + refcount: Refcount::new(), + }).install(); + self.pipe = 0 as *mut _; - let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable - match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } { + match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } { 0 => Ok(acceptor as Box), n => Err(uv_error_to_io_error(UvError(n))), } @@ -276,7 +277,7 @@ impl UvHandle for PipeListener { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { assert!(status != uvll::ECANCELED); - let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; + let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -288,11 +289,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - pipe.outgoing.send(msg); + + // If we're running then we have exclusive access, so the unsafe_get() is ok + unsafe { pipe.access.push(msg); } } impl Drop for PipeListener { fn drop(&mut self) { + if self.pipe.is_null() { return } + let _m = self.fire_homing_missile(); self.close(); } @@ -302,19 +307,48 @@ impl Drop for PipeListener { impl rtio::RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> IoResult> { - self.timeout.accept(&self.listener.incoming) + let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.accept(m, &loop_) } - fn set_timeout(&mut self, timeout_ms: Option) { - match timeout_ms { - None => self.timeout.clear(), - Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), - } + fn set_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.set_timeout(ms, &loop_, &self.home); + } + + fn clone(&self) -> Box { + box PipeAcceptor { + refcount: self.refcount.clone(), + home: self.home.clone(), + handle: self.handle, + access: self.access.clone(), + } as Box + } + + fn close_accept(&mut self) -> IoResult<()> { + let m = self.fire_homing_missile(); + self.access.close(m); + Ok(()) } } impl HomingIO for PipeAcceptor { - fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } +} + +impl UvHandle for PipeAcceptor { + fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle } +} + +impl Drop for PipeAcceptor { + fn drop(&mut self) { + let _m = self.fire_homing_missile(); + if self.refcount.decrement() { + self.close(); + } + } } #[cfg(test)] diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs index 1caaf5e0fc75d..32d7395241675 100644 --- a/src/librustuv/timeout.rs +++ b/src/librustuv/timeout.rs @@ -14,7 +14,7 @@ use std::rt::task::BlockedTask; use std::rt::rtio::IoResult; use access; -use homing::{HomeHandle, HomingMissile, HomingIO}; +use homing::{HomeHandle, HomingMissile}; use timer::TimerWatcher; use uvll; use uvio::UvIoFactory; @@ -22,15 +22,15 @@ use {Loop, UvError, uv_error_to_io_error, Request, wakeup}; use {UvHandle, wait_until_woken_after}; /// Management of a timeout when gaining access to a portion of a duplex stream. -pub struct AccessTimeout { +pub struct AccessTimeout { state: TimeoutState, timer: Option>, - pub access: access::Access, + pub access: access::Access, } -pub struct Guard<'a> { +pub struct Guard<'a, T> { state: &'a mut TimeoutState, - pub access: access::Guard<'a>, + pub access: access::Guard<'a, T>, pub can_timeout: bool, } @@ -49,17 +49,18 @@ enum ClientState { } struct TimerContext { - timeout: *mut AccessTimeout, - callback: fn(uint) -> Option, - payload: uint, + timeout: *mut AccessTimeout<()>, + callback: fn(*mut AccessTimeout<()>, &TimerContext), + user_unblock: fn(uint) -> Option, + user_payload: uint, } -impl AccessTimeout { - pub fn new() -> AccessTimeout { +impl AccessTimeout { + pub fn new(data: T) -> AccessTimeout { AccessTimeout { state: NoTimeout, timer: None, - access: access::Access::new(), + access: access::Access::new(data), } } @@ -68,7 +69,7 @@ impl AccessTimeout { /// On success, Ok(Guard) is returned and access has been granted to the /// stream. If a timeout occurs, then Err is returned with an appropriate /// error. - pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { + pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { // First, flag that we're attempting to acquire access. This will allow // us to cancel the pending grant if we timeout out while waiting for a // grant. @@ -94,6 +95,13 @@ impl AccessTimeout { }) } + pub fn timed_out(&self) -> bool { + match self.state { + TimedOut => true, + _ => false, + } + } + /// Sets the pending timeout to the value specified. /// /// The home/loop variables are used to construct a timer if one has not @@ -120,9 +128,10 @@ impl AccessTimeout { if self.timer.is_none() { let mut timer = box TimerWatcher::new_home(loop_, home.clone()); let mut cx = box TimerContext { - timeout: self as *mut _, - callback: cb, - payload: data, + timeout: self as *mut _ as *mut AccessTimeout<()>, + callback: real_cb::, + user_unblock: cb, + user_payload: data, }; unsafe { timer.set_data(&mut *cx); @@ -135,8 +144,8 @@ impl AccessTimeout { unsafe { let cx = uvll::get_data_for_uv_handle(timer.handle); let cx = cx as *mut TimerContext; - (*cx).callback = cb; - (*cx).payload = data; + (*cx).user_unblock = cb; + (*cx).user_payload = data; } timer.stop(); timer.start(timer_cb, ms, 0); @@ -146,7 +155,12 @@ impl AccessTimeout { let cx: &TimerContext = unsafe { &*(uvll::get_data_for_uv_handle(timer) as *const TimerContext) }; - let me = unsafe { &mut *cx.timeout }; + (cx.callback)(cx.timeout, cx); + } + + fn real_cb(timeout: *mut AccessTimeout<()>, cx: &TimerContext) { + let timeout = timeout as *mut AccessTimeout; + let me = unsafe { &mut *timeout }; match mem::replace(&mut me.state, TimedOut) { TimedOut | NoTimeout => unreachable!(), @@ -158,7 +172,7 @@ impl AccessTimeout { } } TimeoutPending(RequestPending) => { - match (cx.callback)(cx.payload) { + match (cx.user_unblock)(cx.user_payload) { Some(task) => task.reawaken(), None => unreachable!(), } @@ -168,8 +182,8 @@ impl AccessTimeout { } } -impl Clone for AccessTimeout { - fn clone(&self) -> AccessTimeout { +impl Clone for AccessTimeout { + fn clone(&self) -> AccessTimeout { AccessTimeout { access: self.access.clone(), state: NoTimeout, @@ -179,7 +193,7 @@ impl Clone for AccessTimeout { } #[unsafe_destructor] -impl<'a> Drop for Guard<'a> { +impl<'a, T> Drop for Guard<'a, T> { fn drop(&mut self) { match *self.state { TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) => @@ -193,7 +207,8 @@ impl<'a> Drop for Guard<'a> { } } -impl Drop for AccessTimeout { +#[unsafe_destructor] +impl Drop for AccessTimeout { fn drop(&mut self) { match self.timer { Some(ref timer) => unsafe { @@ -215,12 +230,6 @@ pub struct ConnectCtx { pub timer: Option>, } -pub struct AcceptTimeout { - timer: Option, - timeout_tx: Option>, - timeout_rx: Option>, -} - impl ConnectCtx { pub fn connect( mut self, obj: T, timeout: Option, io: &mut UvIoFactory, @@ -306,88 +315,97 @@ impl ConnectCtx { } } -impl AcceptTimeout { - pub fn new() -> AcceptTimeout { - AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } - } +pub struct AcceptTimeout { + access: AccessTimeout>, +} - pub fn accept(&mut self, c: &Receiver>) -> IoResult { - match self.timeout_rx { - None => c.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match c.try_recv() { - Ok(data) => return data, - Err(..) => {} - } +struct AcceptorState { + blocked_acceptor: Option, + pending: Vec>, +} - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(c); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - c.recv() - } - } +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { + access: AccessTimeout::new(AcceptorState { + blocked_acceptor: None, + pending: Vec::new(), + }) } } - pub fn clear(&mut self) { - match self.timeout_rx { - Some(ref t) => { let _ = t.try_recv(); } - None => {} + pub fn accept(&mut self, + missile: HomingMissile, + loop_: &Loop) -> IoResult { + // If we've timed out but we're not closed yet, poll the state of the + // queue to see if we can peel off a connection. + if self.access.timed_out() && !self.access.access.is_closed(&missile) { + let tmp = self.access.access.get_mut(&missile); + return match tmp.pending.remove(0) { + Some(msg) => msg, + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } } - match self.timer { - Some(ref mut t) => t.stop(), + + // Now that we're not polling, attempt to gain access and then peel off + // a connection. If we have no pending connections, then we need to go + // to sleep and wait for one. + // + // Note that if we're woken up for a pending connection then we're + // guaranteed that the check above will not steal our connection due to + // the single-threaded nature of the event loop. + let mut guard = try!(self.access.grant(missile)); + if guard.access.is_closed() { + return Err(uv_error_to_io_error(UvError(uvll::EOF))) + } + + match guard.access.pending.remove(0) { + Some(msg) => return msg, None => {} } + + wait_until_woken_after(&mut guard.access.blocked_acceptor, loop_, || {}); + + match guard.access.pending.remove(0) { + _ if guard.access.is_closed() => { + Err(uv_error_to_io_error(UvError(uvll::EOF))) + } + Some(msg) => msg, + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } } - pub fn set_timeout + HomingIO>( - &mut self, ms: u64, t: &mut T - ) { - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(t.uv_handle()) - }); - let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + pub unsafe fn push(&mut self, t: IoResult) { + let state = self.access.access.unsafe_get(); + (*state).pending.push(t); + let _ = (*state).blocked_acceptor.take().map(|t| t.reawaken()); + } + + pub fn set_timeout(&mut self, + ms: Option, + loop_: &Loop, + home: &HomeHandle) { + self.access.set_timeout(ms, home, loop_, cancel_accept::, + self as *mut _ as uint); + + fn cancel_accept(me: uint) -> Option { unsafe { - timer.set_data(self as *mut _); + let me: &mut AcceptTimeout = mem::transmute(me); + (*me.access.access.unsafe_get()).blocked_acceptor.take() } - self.timer = Some(timer); } + } - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); + pub fn close(&mut self, m: HomingMissile) { + self.access.access.close(&m); + let task = self.access.access.get_mut(&m).blocked_acceptor.take(); + drop(m); + let _ = task.map(|t| t.reawaken()); + } +} - extern fn timer_cb(timer: *mut uvll::uv_timer_t) { - let acceptor: &mut AcceptTimeout = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); - } +impl Clone for AcceptTimeout { + fn clone(&self) -> AcceptTimeout { + AcceptTimeout { access: self.access.clone() } } } diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 7055b9d7a4738..a6fdceaa3739f 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -442,6 +442,53 @@ impl TcpAcceptor { #[experimental = "the type of the argument and name of this function are \ subject to change"] pub fn set_timeout(&mut self, ms: Option) { self.obj.set_timeout(ms); } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function is similar to `TcpStream`'s `close_{read,write}` methods + /// in that it will affect *all* cloned handles of this acceptor's original + /// handle. + /// + /// Once this function succeeds, all future calls to `accept` will return + /// immediately with an error, preventing all future calls to accept. The + /// underlying socket will not be relinquished back to the OS until all + /// acceptors have been deallocated. + /// + /// This is useful for waking up a thread in an accept loop to indicate that + /// it should exit. + /// + /// # Example + /// + /// ``` + /// # #![allow(experimental)] + /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile}; + /// + /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap(); + /// let a2 = a.clone(); + /// + /// spawn(proc() { + /// let mut a2 = a2; + /// for socket in a2.incoming() { + /// match socket { + /// Ok(s) => { /* handle s */ } + /// Err(ref e) if e.kind == EndOfFile => break, // closed + /// Err(e) => fail!("unexpected error: {}", e), + /// } + /// } + /// }); + /// + /// # fn wait_for_sigint() {} + /// // Now that our accept loop is running, wait for the program to be + /// // requested to exit. + /// wait_for_sigint(); + /// + /// // Signal our accept loop to exit + /// assert!(a.close_accept().is_ok()); + /// ``` + #[experimental] + pub fn close_accept(&mut self) -> IoResult<()> { + self.obj.close_accept().map_err(IoError::from_rtio_error) + } } impl Acceptor for TcpAcceptor { @@ -453,6 +500,25 @@ impl Acceptor for TcpAcceptor { } } +impl Clone for TcpAcceptor { + /// Creates a new handle to this TCP acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying TCP acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { obj: self.obj.clone() } + } +} + #[cfg(test)] #[allow(experimental)] mod test { @@ -1411,4 +1477,69 @@ mod test { rxdone.recv(); rxdone.recv(); }) + + iotest!(fn clone_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + + assert!(a.accept().is_ok()); + assert!(a2.accept().is_ok()); + }) + + iotest!(fn clone_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + spawn(proc() { let mut a = a; tx.send(a.accept()) }); + spawn(proc() { let mut a = a2; tx2.send(a.accept()) }); + + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + + assert!(rx.recv().is_ok()); + assert!(rx.recv().is_ok()); + }) + + iotest!(fn close_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + }) + + iotest!(fn close_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + spawn(proc() { + let mut a = a; + tx.send(a.accept()); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().err().unwrap().kind, EndOfFile); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index eb25107541839..3bd31c6a839ed 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -212,6 +212,15 @@ impl UnixAcceptor { pub fn set_timeout(&mut self, timeout_ms: Option) { self.obj.set_timeout(timeout_ms) } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function has the same semantics as `TcpAcceptor::close_accept`, and + /// more information can be found in that documentation. + #[experimental] + pub fn close_accept(&mut self) -> IoResult<()> { + self.obj.close_accept().map_err(IoError::from_rtio_error) + } } impl Acceptor for UnixAcceptor { @@ -222,6 +231,25 @@ impl Acceptor for UnixAcceptor { } } +impl Clone for UnixAcceptor { + /// Creates a new handle to this unix acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying unix acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> UnixAcceptor { + UnixAcceptor { obj: self.obj.clone() } + } +} + #[cfg(test)] #[allow(experimental)] mod tests { @@ -702,4 +730,73 @@ mod tests { rx2.recv(); }) + + #[cfg(not(windows))] + iotest!(fn clone_accept_smoke() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let addr2 = addr.clone(); + spawn(proc() { + let _ = UnixStream::connect(&addr2); + }); + spawn(proc() { + let _ = UnixStream::connect(&addr); + }); + + assert!(a.accept().is_ok()); + drop(a); + assert!(a2.accept().is_ok()); + }) + + iotest!(fn clone_accept_concurrent() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + spawn(proc() { let mut a = a; tx.send(a.accept()) }); + spawn(proc() { let mut a = a2; tx2.send(a.accept()) }); + + let addr2 = addr.clone(); + spawn(proc() { + let _ = UnixStream::connect(&addr2); + }); + spawn(proc() { + let _ = UnixStream::connect(&addr); + }); + + assert!(rx.recv().is_ok()); + assert!(rx.recv().is_ok()); + }) + + iotest!(fn close_accept_smoke() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + }) + + iotest!(fn close_accept_concurrent() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + spawn(proc() { + let mut a = a; + tx.send(a.accept()); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().err().unwrap().kind, EndOfFile); + }) } diff --git a/src/test/run-pass/tcp-accept-stress.rs b/src/test/run-pass/tcp-accept-stress.rs new file mode 100644 index 0000000000000..b8470ef7b8fac --- /dev/null +++ b/src/test/run-pass/tcp-accept-stress.rs @@ -0,0 +1,94 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![feature(phase)] + +#[phase(plugin)] +extern crate green; +extern crate native; + +use std::io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream}; +use std::sync::{atomic, Arc}; +use std::task::TaskBuilder; +use native::NativeTaskBuilder; + +static N: uint = 8; +static M: uint = 100; + +green_start!(main) + +fn main() { + test(); + + let (tx, rx) = channel(); + TaskBuilder::new().native().spawn(proc() { + tx.send(test()); + }); + rx.recv(); +} + +fn test() { + let mut l = TcpListener::bind("127.0.0.1", 0).unwrap(); + let addr = l.socket_name().unwrap(); + let mut a = l.listen().unwrap(); + let cnt = Arc::new(atomic::AtomicUint::new(0)); + + let (tx, rx) = channel(); + for _ in range(0, N) { + let a = a.clone(); + let cnt = cnt.clone(); + let tx = tx.clone(); + spawn(proc() { + let mut a = a; + let mut mycnt = 0u; + loop { + match a.accept() { + Ok(..) => { + mycnt += 1; + if cnt.fetch_add(1, atomic::SeqCst) == N * M - 1 { + break + } + } + Err(ref e) if e.kind == EndOfFile => break, + Err(e) => fail!("{}", e), + } + } + assert!(mycnt > 0); + tx.send(()); + }); + } + + for _ in range(0, N) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0, M) { + let _s = TcpStream::connect(addr.ip.to_string().as_slice(), + addr.port).unwrap(); + } + tx.send(()); + }); + } + + // wait for senders + assert_eq!(rx.iter().take(N).count(), N); + + // wait for one acceptor to die + let _ = rx.recv(); + + // Notify other receivers should die + a.close_accept().unwrap(); + + // wait for receivers + assert_eq!(rx.iter().take(N - 1).count(), N - 1); + + // Everything should have been accepted. + assert_eq!(cnt.load(atomic::SeqCst), N * M); +} +