Skip to content

Commit

Permalink
Stability improvements (#4204)
Browse files Browse the repository at this point in the history
* Fixed an issue where the socket address family was being corrupted due
to padding fields being incorrect used.

* Added a comment

* Faster compiles for debug by using release version of cranelift

* Added additional fixes for some missing parts of sockets

* Fixed a blocking issue inside an async and added sha256 caching of webc files

* Faster checksums for debug

* Removed error messages when a WASI instance shuts down and doing cleanups in parallel

* Reduced the logging level on WasiEnv cleanup code

* Fixed linter, wasi-web build, Windows build, the tests failling because of the lack of tockio multi threaded runtime and test-js

---------

Co-authored-by: ptitSeb <[email protected]>
  • Loading branch information
john-sharratt and ptitSeb authored Sep 6, 2023
1 parent f4eaecd commit 94f27f8
Show file tree
Hide file tree
Showing 60 changed files with 428 additions and 97 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ opt-level = 3
opt-level = 3
[profile.dev.package.weezl]
opt-level = 3
[profile.dev.package.sha256]
opt-level = 3
[profile.dev.package.digest]
opt-level = 3

[[bench]]
name = "static_and_dynamic_functions"
Expand Down
30 changes: 16 additions & 14 deletions lib/virtual-io/src/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,25 @@ impl Selector {
return;
}

let interest = if event.is_readable() {
InterestType::Readable
} else if event.is_writable() {
InterestType::Writable
} else if event.is_read_closed() || event.is_write_closed() {
InterestType::Closed
} else if event.is_error() {
InterestType::Error
} else {
continue;
};
tracing::trace!(token = ?token, interest = ?interest, "host epoll");

// Otherwise this is a waker we need to wake
let s = event.token().0 as *mut HandlerWrapper;
let mut handler = ManuallyDrop::new(unsafe { Box::from_raw(s) });
handler.0.interest(interest);
if event.is_readable() {
tracing::trace!(token = ?token, interest = ?InterestType::Readable, "host epoll");
handler.0.interest(InterestType::Readable);
}
if event.is_writable() {
tracing::trace!(token = ?token, interest = ?InterestType::Writable, "host epoll");
handler.0.interest(InterestType::Writable);
}
if event.is_read_closed() || event.is_write_closed() {
tracing::trace!(token = ?token, interest = ?InterestType::Closed, "host epoll");
handler.0.interest(InterestType::Closed);
}
if event.is_error() {
tracing::trace!(token = ?token, interest = ?InterestType::Error, "host epoll");
handler.0.interest(InterestType::Error);
}
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions lib/virtual-net/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,36 @@ impl VirtualTcpSocket for RemoteSocket {
}
}

fn set_keepalive(&mut self, keep_alive: bool) -> Result<()> {
self.io_socket_fire_and_forget(RequestType::SetKeepAlive(keep_alive))
}

fn keepalive(&self) -> Result<bool> {
match InlineWaker::block_on(self.io_socket(RequestType::GetKeepAlive)) {
ResponseType::Err(err) => Err(err),
ResponseType::Flag(val) => Ok(val),
res => {
tracing::debug!("invalid response to get nodelay request - {res:?}");
Err(NetworkError::IOError)
}
}
}

fn set_dontroute(&mut self, dont_route: bool) -> Result<()> {
self.io_socket_fire_and_forget(RequestType::SetDontRoute(dont_route))
}

fn dontroute(&self) -> Result<bool> {
match InlineWaker::block_on(self.io_socket(RequestType::GetDontRoute)) {
ResponseType::Err(err) => Err(err),
ResponseType::Flag(val) => Ok(val),
res => {
tracing::debug!("invalid response to get nodelay request - {res:?}");
Err(NetworkError::IOError)
}
}
}

fn addr_peer(&self) -> Result<SocketAddr> {
match InlineWaker::block_on(self.io_socket(RequestType::GetAddrPeer)) {
ResponseType::Err(err) => Err(err),
Expand Down
79 changes: 74 additions & 5 deletions lib/virtual-net/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use derivative::Derivative;
use std::io::{Read, Write};
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
#[cfg(not(target_os = "windows"))]
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -62,6 +64,8 @@ impl VirtualNetworking for LocalNetworking {
stream: mio::net::TcpListener::from_std(sock),
selector: self.selector.clone(),
handler_guard: None,
no_delay: None,
keep_alive: None,
})
})
.map_err(io_err_into_net_error)?;
Expand Down Expand Up @@ -94,11 +98,8 @@ impl VirtualNetworking for LocalNetworking {
if let Ok(p) = stream.peer_addr() {
peer = p;
}
Ok(Box::new(LocalTcpStream::new(
self.selector.clone(),
stream,
peer,
)))
let socket = Box::new(LocalTcpStream::new(self.selector.clone(), stream, peer));
Ok(socket)
}

async fn resolve(
Expand Down Expand Up @@ -127,6 +128,8 @@ pub struct LocalTcpListener {
stream: mio::net::TcpListener,
selector: Arc<Selector>,
handler_guard: Option<InterestGuard>,
no_delay: Option<bool>,
keep_alive: Option<bool>,
}

impl VirtualTcpListener for LocalTcpListener {
Expand All @@ -138,6 +141,12 @@ impl VirtualTcpListener for LocalTcpListener {
.ok();
let mut socket = LocalTcpStream::new(self.selector.clone(), stream, addr);
socket.set_first_handler_writeable();
if let Some(no_delay) = self.no_delay {
socket.set_nodelay(no_delay).ok();
}
if let Some(keep_alive) = self.keep_alive {
socket.set_keepalive(keep_alive).ok();
}
Ok((Box::new(socket), addr))
}
Err(NetworkError::WouldBlock) => Err(NetworkError::WouldBlock),
Expand Down Expand Up @@ -242,6 +251,66 @@ impl VirtualTcpSocket for LocalTcpStream {
self.stream.nodelay().map_err(io_err_into_net_error)
}

fn set_keepalive(&mut self, keepalive: bool) -> Result<()> {
socket2::SockRef::from(&self.stream)
.set_keepalive(true)
.map_err(io_err_into_net_error)?;
Ok(())
}

fn keepalive(&self) -> Result<bool> {
let ret = socket2::SockRef::from(&self.stream)
.keepalive()
.map_err(io_err_into_net_error)?;
Ok(ret)
}

#[cfg(not(target_os = "windows"))]
fn set_dontroute(&mut self, val: bool) -> Result<()> {
let val = val as libc::c_int;
let payload = &val as *const libc::c_int as *const libc::c_void;
let err = unsafe {
libc::setsockopt(
self.stream.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_DONTROUTE,
payload,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
)
};
if err == -1 {
return Err(io_err_into_net_error(std::io::Error::last_os_error()));
}
Ok(())
}
#[cfg(target_os = "windows")]
fn set_dontroute(&mut self, val: bool) -> Result<()> {
Err(NetworkError::Unsupported)
}

#[cfg(not(target_os = "windows"))]
fn dontroute(&self) -> Result<bool> {
let mut payload: MaybeUninit<libc::c_int> = MaybeUninit::uninit();
let mut len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let err = unsafe {
libc::getsockopt(
self.stream.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_DONTROUTE,
payload.as_mut_ptr().cast(),
&mut len,
)
};
if err == -1 {
return Err(io_err_into_net_error(std::io::Error::last_os_error()));
}
Ok(unsafe { payload.assume_init() != 0 })
}
#[cfg(target_os = "windows")]
fn dontroute(&self) -> Result<bool> {
Err(NetworkError::Unsupported)
}

fn addr_peer(&self) -> Result<SocketAddr> {
Ok(self.addr)
}
Expand Down
18 changes: 18 additions & 0 deletions lib/virtual-net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,24 @@ pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync +
/// latency but increases encapsulation overhead.
fn nodelay(&self) -> Result<bool>;

/// When KEEP_ALIVE is set the connection will periodically send
/// an empty data packet to the server to make sure the connection
/// stays alive.
fn set_keepalive(&mut self, keepalive: bool) -> Result<()>;

/// Indicates if the KEEP_ALIVE flag is set which means that the
/// socket will periodically send an empty data packet to keep
/// the connection alive.
fn keepalive(&self) -> Result<bool>;

/// When DONT_ROUTE is set the packet will be sent directly
/// to the interface without passing through the routing logic.
fn set_dontroute(&mut self, keepalive: bool) -> Result<()>;

/// Indicates if the packet will pass straight through to
/// the interface bypassing the routing logic.
fn dontroute(&self) -> Result<bool>;

/// Returns the address (IP and Port) of the peer socket that this
/// is conencted to
fn addr_peer(&self) -> Result<SocketAddr>;
Expand Down
14 changes: 14 additions & 0 deletions lib/virtual-net/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,20 @@ pub enum RequestType {
/// is immediately sent to the peer without waiting. This reduces
/// latency but increases encapsulation overhead.
GetNoDelay,
/// When KEEP_ALIVE is set the connection will periodically send
/// an empty data packet to the server to make sure the connection
/// stays alive.
SetKeepAlive(bool),
/// Indicates if the KEEP_ALIVE flag is set which means that the
/// socket will periodically send an empty data packet to keep
/// the connection alive.
GetKeepAlive,
/// When DONT_ROUTE is set the packet will be sent directly
/// to the interface without passing through the routing logic.
SetDontRoute(bool),
/// Indicates if the packet will pass straight through to
/// the interface bypassing the routing logic.
GetDontRoute,
/// Shuts down either the READER or WRITER sides of the socket
/// connection.
Shutdown(Shutdown),
Expand Down
40 changes: 40 additions & 0 deletions lib/virtual-net/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,46 @@ impl RemoteNetworkingServerDriver {
socket_id,
req_id,
),
RequestType::SetKeepAlive(val) => self.process_inner_noop(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.set_keepalive(val),
_ => Err(NetworkError::Unsupported),
},
socket_id,
req_id,
),
RequestType::GetKeepAlive => self.process_inner(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.keepalive(),
_ => Err(NetworkError::Unsupported),
},
|ret| match ret {
Ok(flag) => ResponseType::Flag(flag),
Err(err) => ResponseType::Err(err),
},
socket_id,
req_id,
),
RequestType::SetDontRoute(val) => self.process_inner_noop(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.set_dontroute(val),
_ => Err(NetworkError::Unsupported),
},
socket_id,
req_id,
),
RequestType::GetDontRoute => self.process_inner(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.dontroute(),
_ => Err(NetworkError::Unsupported),
},
|ret| match ret {
Ok(flag) => ResponseType::Flag(flag),
Err(err) => ResponseType::Err(err),
},
socket_id,
req_id,
),
RequestType::Shutdown(shutdown) => self.process_inner_noop(
move |socket| match socket {
RemoteAdapterSocket::TcpSocket(s) => s.shutdown(match shutdown {
Expand Down
3 changes: 3 additions & 0 deletions lib/wasi-types/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ pub mod net {
#[repr(C)]
pub struct __wasi_addr_t {
pub tag: Addressfamily,
// C will add a padding byte here which must be set to zero otherwise the tag will corrupt
pub _padding: u8,
pub u: __wasi_addr_u,
}
Expand All @@ -247,6 +248,7 @@ pub mod net {
#[repr(C)]
pub struct __wasi_addr_port_t {
pub tag: Addressfamily,
// C will add a padding byte here which must be set to zero otherwise the tag will corrupt
pub _padding: u8,
pub u: __wasi_addr_port_u,
}
Expand All @@ -261,6 +263,7 @@ pub mod net {
#[repr(C)]
pub struct __wasi_cidr_t {
pub tag: Addressfamily,
// C will add a padding byte here which must be set to zero otherwise the tag will corrupt
pub _padding: u8,
pub u: __wasi_cidr_u,
}
Expand Down
25 changes: 14 additions & 11 deletions lib/wasix/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,9 @@ impl WasiFs {
pub async fn close_all(&self) {
// TODO: this should close all uniquely owned files instead of just flushing.

// Make sure the STDOUT and STDERR are explicitely flushed
self.flush(__WASI_STDOUT_FILENO).await.ok();
self.flush(__WASI_STDERR_FILENO).await.ok();
if let Ok(mut map) = self.fd_map.write() {
map.clear();
}

let to_close = {
if let Ok(map) = self.fd_map.read() {
Expand All @@ -474,14 +474,17 @@ impl WasiFs {
}
};

for fd in to_close {
self.flush(fd).await.ok();
self.close_fd(fd).ok();
}

if let Ok(mut map) = self.fd_map.write() {
map.clear();
}
let _ = tokio::join!(
// Make sure the STDOUT and STDERR are explicitely flushed
self.flush(__WASI_STDOUT_FILENO),
self.flush(__WASI_STDERR_FILENO),
async {
for fd in to_close {
self.flush(fd).await.ok();
self.close_fd(fd).ok();
}
}
);
}

/// Will conditionally union the binary file system with this one
Expand Down
Loading

0 comments on commit 94f27f8

Please sign in to comment.