Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: Add timeouts to unix connect/accept #13723

Merged
merged 2 commits into from
Apr 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/liblibc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
Expand Down
2 changes: 2 additions & 0 deletions src/libnative/io/c_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ extern "system" {
optname: libc::c_int,
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;

pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
}
6 changes: 4 additions & 2 deletions src/libnative/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub use self::process::Process;
pub mod addrinfo;
pub mod net;
pub mod process;
mod util;

#[cfg(unix)]
#[path = "file_unix.rs"]
Expand Down Expand Up @@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory {
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
}
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
Expand Down
128 changes: 6 additions & 122 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ use std::cast;
use std::io::net::ip;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;

use super::{IoResult, retry, keep_going};
use super::c;
use super::util;

////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
Expand Down Expand Up @@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
}
}

fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::init();
let mut len = mem::size_of::<T>() as libc::socklen_t;
Expand All @@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
super::last_error()
}

fn ms_to_timeval(ms: u64) -> libc::timeval {
libc::timeval {
tv_sec: (ms / 1000) as libc::time_t,
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
}
}

fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
desc: desc,
detail: None,
}
}

#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }

Expand Down Expand Up @@ -270,7 +254,7 @@ impl TcpStream {
let addrp = &addr as *_ as *libc::sockaddr;
match timeout {
Some(timeout) => {
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
try!(util::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
Expand All @@ -282,84 +266,6 @@ impl TcpStream {
}
}

// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout_ms: u64) -> IoResult<()> {
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;

// Make sure the call to connect() doesn't block
try!(set_nonblocking(fd, true));

let ret = match unsafe { libc::connect(fd, addrp, len) } {
// If the connection is in progress, then we need to wait for it to
// finish (with a timeout). The current strategy for doing this is
// to use select() with a timeout.
-1 if os::errno() as int == INPROGRESS as int ||
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout_ms) {
0 => Err(timeout("connection timed out")),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
if err == 0 {
Ok(())
} else {
Err(io::IoError::from_errno(err as uint, true))
}
}
}
}

-1 => Err(last_error()),
_ => Ok(()),
};

// be sure to turn blocking I/O back on
try!(set_nonblocking(fd, false));
return ret;

#[cfg(unix)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}

#[cfg(unix)]
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let start = ::io::timer::now();
retry(|| unsafe {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = ms_to_timeval(timeout);
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
}
}

pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
Expand Down Expand Up @@ -533,7 +439,7 @@ impl TcpAcceptor {

pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(self.accept_deadline());
try!(util::accept_deadline(self.fd(), self.deadline));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
Expand All @@ -550,25 +456,6 @@ impl TcpAcceptor {
}
}
}

fn accept_deadline(&mut self) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, self.fd());

match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if self.deadline > now {0} else {self.deadline - now};
let tv = ms_to_timeval(ms);
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
}
}
}

impl rtio::RtioSocket for TcpAcceptor {
Expand All @@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = match timeout {
None => 0,
Some(t) => ::io::timer::now() + t,
};
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}

Expand Down
Loading