Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stability improvements #4204

Merged
merged 17 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ opt-level = 3
opt-level = 3
[profile.dev.package.weezl]
opt-level = 3
[profile.dev.package.sha256]
opt-level = 3
[profile.dev.package.digest]
opt-level = 3

[[bench]]
name = "static_and_dynamic_functions"
Expand Down
30 changes: 16 additions & 14 deletions lib/virtual-io/src/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,25 @@ impl Selector {
return;
}

let interest = if event.is_readable() {
InterestType::Readable
} else if event.is_writable() {
InterestType::Writable
} else if event.is_read_closed() || event.is_write_closed() {
InterestType::Closed
} else if event.is_error() {
InterestType::Error
} else {
continue;
};
tracing::trace!(token = ?token, interest = ?interest, "host epoll");

// Otherwise this is a waker we need to wake
let s = event.token().0 as *mut HandlerWrapper;
let mut handler = ManuallyDrop::new(unsafe { Box::from_raw(s) });
handler.0.interest(interest);
if event.is_readable() {
tracing::trace!(token = ?token, interest = ?InterestType::Readable, "host epoll");
handler.0.interest(InterestType::Readable);
}
if event.is_writable() {
tracing::trace!(token = ?token, interest = ?InterestType::Writable, "host epoll");
handler.0.interest(InterestType::Writable);
}
if event.is_read_closed() || event.is_write_closed() {
tracing::trace!(token = ?token, interest = ?InterestType::Closed, "host epoll");
handler.0.interest(InterestType::Closed);
}
if event.is_error() {
tracing::trace!(token = ?token, interest = ?InterestType::Error, "host epoll");
handler.0.interest(InterestType::Error);
}
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions lib/virtual-net/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,36 @@ impl VirtualTcpSocket for RemoteSocket {
}
}

fn set_keepalive(&mut self, keep_alive: bool) -> Result<()> {
self.io_socket_fire_and_forget(RequestType::SetKeepAlive(keep_alive))
}

fn keepalive(&self) -> Result<bool> {
match InlineWaker::block_on(self.io_socket(RequestType::GetKeepAlive)) {
ResponseType::Err(err) => Err(err),
ResponseType::Flag(val) => Ok(val),
res => {
tracing::debug!("invalid response to get nodelay request - {res:?}");
Err(NetworkError::IOError)
}
}
}

fn set_dontroute(&mut self, dont_route: bool) -> Result<()> {
self.io_socket_fire_and_forget(RequestType::SetDontRoute(dont_route))
}

fn dontroute(&self) -> Result<bool> {
match InlineWaker::block_on(self.io_socket(RequestType::GetDontRoute)) {
ResponseType::Err(err) => Err(err),
ResponseType::Flag(val) => Ok(val),
res => {
tracing::debug!("invalid response to get nodelay request - {res:?}");
Err(NetworkError::IOError)
}
}
}

fn addr_peer(&self) -> Result<SocketAddr> {
match InlineWaker::block_on(self.io_socket(RequestType::GetAddrPeer)) {
ResponseType::Err(err) => Err(err),
Expand Down
79 changes: 74 additions & 5 deletions lib/virtual-net/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use derivative::Derivative;
use std::io::{Read, Write};
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
#[cfg(not(target_os = "windows"))]
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -62,6 +64,8 @@ impl VirtualNetworking for LocalNetworking {
stream: mio::net::TcpListener::from_std(sock),
selector: self.selector.clone(),
handler_guard: None,
no_delay: None,
keep_alive: None,
})
})
.map_err(io_err_into_net_error)?;
Expand Down Expand Up @@ -94,11 +98,8 @@ impl VirtualNetworking for LocalNetworking {
if let Ok(p) = stream.peer_addr() {
peer = p;
}
Ok(Box::new(LocalTcpStream::new(
self.selector.clone(),
stream,
peer,
)))
let socket = Box::new(LocalTcpStream::new(self.selector.clone(), stream, peer));
Ok(socket)
}

async fn resolve(
Expand Down Expand Up @@ -127,6 +128,8 @@ pub struct LocalTcpListener {
stream: mio::net::TcpListener,
selector: Arc<Selector>,
handler_guard: Option<InterestGuard>,
no_delay: Option<bool>,
keep_alive: Option<bool>,
}

impl VirtualTcpListener for LocalTcpListener {
Expand All @@ -138,6 +141,12 @@ impl VirtualTcpListener for LocalTcpListener {
.ok();
let mut socket = LocalTcpStream::new(self.selector.clone(), stream, addr);
socket.set_first_handler_writeable();
if let Some(no_delay) = self.no_delay {
socket.set_nodelay(no_delay).ok();
}
if let Some(keep_alive) = self.keep_alive {
socket.set_keepalive(keep_alive).ok();
}
Ok((Box::new(socket), addr))
}
Err(NetworkError::WouldBlock) => Err(NetworkError::WouldBlock),
Expand Down Expand Up @@ -242,6 +251,66 @@ impl VirtualTcpSocket for LocalTcpStream {
self.stream.nodelay().map_err(io_err_into_net_error)
}

fn set_keepalive(&mut self, keepalive: bool) -> Result<()> {
socket2::SockRef::from(&self.stream)
.set_keepalive(true)
.map_err(io_err_into_net_error)?;
Ok(())
}

fn keepalive(&self) -> Result<bool> {
let ret = socket2::SockRef::from(&self.stream)
.keepalive()
.map_err(io_err_into_net_error)?;
Ok(ret)
}

#[cfg(not(target_os = "windows"))]
fn set_dontroute(&mut self, val: bool) -> Result<()> {
let val = val as libc::c_int;
let payload = &val as *const libc::c_int as *const libc::c_void;
let err = unsafe {
libc::setsockopt(
self.stream.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_DONTROUTE,
payload,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
)
};
if err == -1 {
return Err(io_err_into_net_error(std::io::Error::last_os_error()));
}
Ok(())
}
#[cfg(target_os = "windows")]
fn set_dontroute(&mut self, val: bool) -> Result<()> {
Err(NetworkError::Unsupported)
}

#[cfg(not(target_os = "windows"))]
fn dontroute(&self) -> Result<bool> {
let mut payload: MaybeUninit<libc::c_int> = MaybeUninit::uninit();
let mut len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let err = unsafe {
libc::getsockopt(
self.stream.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_DONTROUTE,
payload.as_mut_ptr().cast(),
&mut len,
)
};
if err == -1 {
return Err(io_err_into_net_error(std::io::Error::last_os_error()));
}
Ok(unsafe { payload.assume_init() != 0 })
}
#[cfg(target_os = "windows")]
fn dontroute(&self) -> Result<bool> {
Err(NetworkError::Unsupported)
}

fn addr_peer(&self) -> Result<SocketAddr> {
Ok(self.addr)
}
Expand Down
18 changes: 18 additions & 0 deletions lib/virtual-net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,24 @@ pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync +
/// latency but increases encapsulation overhead.
fn nodelay(&self) -> Result<bool>;

/// When KEEP_ALIVE is set the connection will periodically send
/// an empty data packet to the server to make sure the connection
/// stays alive.
fn set_keepalive(&mut self, keepalive: bool) -> Result<()>;

/// Indicates if the KEEP_ALIVE flag is set which means that the
/// socket will periodically send an empty data packet to keep
/// the connection alive.
fn keepalive(&self) -> Result<bool>;

/// When DONT_ROUTE is set the packet will be sent directly
/// to the interface without passing through the routing logic.
fn set_dontroute(&mut self, keepalive: bool) -> Result<()>;

/// Indicates if the packet will pass straight through to
/// the interface bypassing the routing logic.
fn dontroute(&self) -> Result<bool>;

/// Returns the address (IP and Port) of the peer socket that this
/// is conencted to
fn addr_peer(&self) -> Result<SocketAddr>;
Expand Down
14 changes: 14 additions & 0 deletions lib/virtual-net/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,20 @@ pub enum RequestType {
/// is immediately sent to the peer without waiting. This reduces
/// latency but increases encapsulation overhead.
GetNoDelay,
/// When KEEP_ALIVE is set the connection will periodically send
/// an empty data packet to the server to make sure the connection
/// stays alive.
SetKeepAlive(bool),
/// Indicates if the KEEP_ALIVE flag is set which means that the
/// socket will periodically send an empty data packet to keep
/// the connection alive.
GetKeepAlive,
/// When DONT_ROUTE is set the packet will be sent directly
/// to the interface without passing through the routing logic.
SetDontRoute(bool),
/// Indicates if the packet will pass straight through to
/// the interface bypassing the routing logic.
GetDontRoute,
/// Shuts down either the READER or WRITER sides of the socket
/// connection.
Shutdown(Shutdown),
Expand Down
40 changes: 40 additions & 0 deletions lib/virtual-net/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,46 @@ impl RemoteNetworkingServerDriver {
socket_id,
req_id,
),
RequestType::SetKeepAlive(val) => self.process_inner_noop(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.set_keepalive(val),
_ => Err(NetworkError::Unsupported),
},
socket_id,
req_id,
),
RequestType::GetKeepAlive => self.process_inner(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.keepalive(),
_ => Err(NetworkError::Unsupported),
},
|ret| match ret {
Ok(flag) => ResponseType::Flag(flag),
Err(err) => ResponseType::Err(err),
},
socket_id,
req_id,
),
RequestType::SetDontRoute(val) => self.process_inner_noop(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.set_dontroute(val),
_ => Err(NetworkError::Unsupported),
},
socket_id,
req_id,
),
RequestType::GetDontRoute => self.process_inner(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.dontroute(),
_ => Err(NetworkError::Unsupported),
},
|ret| match ret {
Ok(flag) => ResponseType::Flag(flag),
Err(err) => ResponseType::Err(err),
},
socket_id,
req_id,
),
RequestType::Shutdown(shutdown) => self.process_inner_noop(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.shutdown(match shutdown {
Expand Down
3 changes: 3 additions & 0 deletions lib/wasi-types/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ pub mod net {
#[repr(C)]
pub struct __wasi_addr_t {
pub tag: Addressfamily,
// C will add a padding byte here which must be set to zero otherwise the tag will corrupt
pub _padding: u8,
pub u: __wasi_addr_u,
}
Expand All @@ -247,6 +248,7 @@ pub mod net {
#[repr(C)]
pub struct __wasi_addr_port_t {
pub tag: Addressfamily,
// C will add a padding byte here which must be set to zero otherwise the tag will corrupt
pub _padding: u8,
pub u: __wasi_addr_port_u,
}
Expand All @@ -261,6 +263,7 @@ pub mod net {
#[repr(C)]
pub struct __wasi_cidr_t {
pub tag: Addressfamily,
// C will add a padding byte here which must be set to zero otherwise the tag will corrupt
pub _padding: u8,
pub u: __wasi_cidr_u,
}
Expand Down
25 changes: 14 additions & 11 deletions lib/wasix/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,9 @@ impl WasiFs {
pub async fn close_all(&self) {
// TODO: this should close all uniquely owned files instead of just flushing.

// Make sure the STDOUT and STDERR are explicitely flushed
self.flush(__WASI_STDOUT_FILENO).await.ok();
self.flush(__WASI_STDERR_FILENO).await.ok();
if let Ok(mut map) = self.fd_map.write() {
map.clear();
}

let to_close = {
if let Ok(map) = self.fd_map.read() {
Expand All @@ -468,14 +468,17 @@ impl WasiFs {
}
};

for fd in to_close {
self.flush(fd).await.ok();
self.close_fd(fd).ok();
}

if let Ok(mut map) = self.fd_map.write() {
map.clear();
}
let _ = tokio::join!(
// Make sure the STDOUT and STDERR are explicitely flushed
self.flush(__WASI_STDOUT_FILENO),
self.flush(__WASI_STDERR_FILENO),
async {
for fd in to_close {
self.flush(fd).await.ok();
self.close_fd(fd).ok();
}
}
);
}

/// Will conditionally union the binary file system with this one
Expand Down
Loading