From a7f4460573d8c58562ac998993ec5627c03a93c1 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Tue, 27 Jun 2023 14:23:09 +1000 Subject: [PATCH] Added fixes for the existing poll_oneoff loop --- lib/cli/src/logging.rs | 2 +- lib/virtual-io/src/interest.rs | 93 ++++++++++++++++++++--- lib/wasix/src/fs/inode_guard.rs | 126 +++++++++++++++++++------------- lib/wasix/src/net/socket.rs | 46 ++++++------ 4 files changed, 184 insertions(+), 83 deletions(-) diff --git a/lib/cli/src/logging.rs b/lib/cli/src/logging.rs index 80d8121cb2f..a97ffeb1502 100644 --- a/lib/cli/src/logging.rs +++ b/lib/cli/src/logging.rs @@ -30,7 +30,7 @@ impl Output { pub fn initialize_logging(&self) { let fmt_layer = fmt::layer() .with_target(true) - .with_span_events(fmt::format::FmtSpan::CLOSE) + .with_span_events(fmt::format::FmtSpan::CLOSE | fmt::format::FmtSpan::ENTER) .with_ansi(self.should_emit_colors()) .with_thread_ids(true) .with_writer(std::io::stderr) diff --git a/lib/virtual-io/src/interest.rs b/lib/virtual-io/src/interest.rs index cb03fd23816..d878d0cac2b 100644 --- a/lib/virtual-io/src/interest.rs +++ b/lib/virtual-io/src/interest.rs @@ -1,6 +1,9 @@ use std::{ - collections::HashSet, - sync::{Arc, Mutex}, + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, task::{Context, RawWaker, RawWakerVTable, Waker}, }; @@ -24,7 +27,7 @@ impl From<&Waker> for Box { waker: Waker, } impl InterestHandler for WakerHandler { - fn interest(&mut self, _: InterestType) { + fn interest(&mut self, _interest: InterestType) { self.waker.wake_by_ref(); } } @@ -94,28 +97,94 @@ const VTABLE: RawWakerVTable = unsafe { ) }; +#[derive(Derivative, Default, Clone)] +#[derivative(Debug)] +pub struct FilteredHandlerSubscriptions { + #[derivative(Debug = "ignore")] + mapping: Arc>>>, +} +impl FilteredHandlerSubscriptions { + pub fn add_interest( + &self, + interest: InterestType, + handler: Box, + ) { + let mut guard = self.mapping.lock().unwrap(); + guard.insert(interest, handler); + } +} + pub struct FilteredHandler { - handler: Box, - types: HashSet, + subs: FilteredHandlerSubscriptions, } impl FilteredHandler { - pub fn new(handler: Box) -> Box { + pub fn new() -> Box { Box::new(Self { - handler, - types: HashSet::default(), + subs: Default::default(), }) } - pub fn add_interest(mut self: Box, interest: InterestType) -> Box { - self.types.insert(interest); + pub fn add_interest( + self: Box, + interest: InterestType, + handler: Box, + ) -> Box { + self.subs.add_interest(interest, handler); self } + pub fn subscriptions(&self) -> &FilteredHandlerSubscriptions { + &self.subs + } } impl InterestHandler for FilteredHandler { fn interest(&mut self, interest: InterestType) { - if self.types.contains(&interest) { - self.handler.interest(interest); + let mut guard = self.subs.mapping.lock().unwrap(); + if let Some(handler) = guard.get_mut(&interest) { + handler.interest(interest); + } + } +} + +#[derive(Debug, Clone)] +pub struct StatefulHandlerValue { + value: Arc, +} + +impl StatefulHandlerValue { + pub fn new() -> Self { + Self { + value: Arc::new(AtomicBool::new(false)), } } + pub fn value(&self) -> bool { + self.value.load(Ordering::SeqCst) + } + pub fn set(&self) { + self.value.store(true, Ordering::SeqCst); + } +} + +pub struct StatefulHandler { + handler: Box, + triggered: StatefulHandlerValue, +} + +impl StatefulHandler { + pub fn new(handler: Box) -> Box { + Box::new(Self { + handler, + triggered: StatefulHandlerValue::new(), + }) + } + pub fn triggered(&self) -> &StatefulHandlerValue { + &self.triggered + } +} + +impl InterestHandler for StatefulHandler { + fn interest(&mut self, interest: InterestType) { + self.triggered.set(); + self.handler.interest(interest) + } } diff --git a/lib/wasix/src/fs/inode_guard.rs b/lib/wasix/src/fs/inode_guard.rs index 7fdae0a9186..1e6362450a6 100644 --- a/lib/wasix/src/fs/inode_guard.rs +++ b/lib/wasix/src/fs/inode_guard.rs @@ -11,7 +11,7 @@ use std::{ use futures::future::BoxFuture; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; use virtual_fs::{FsError, Pipe as VirtualPipe, VirtualFile}; -use virtual_io::{FilteredHandler, InterestType}; +use virtual_io::{InterestType, StatefulHandler, StatefulHandlerValue}; use virtual_net::NetworkError; use wasmer_wasix_types::{ types::Eventtype, @@ -124,6 +124,8 @@ pub struct InodeValFilePollGuardJoin { fd: u32, peb: PollEventSet, subscription: Subscription, + handler_read: Option, + handler_write: Option, } impl InodeValFilePollGuardJoin { @@ -133,6 +135,8 @@ impl InodeValFilePollGuardJoin { fd: guard.fd, peb: guard.peb, subscription: guard.subscription, + handler_read: None, + handler_write: None, } } pub(crate) fn fd(&self) -> u32 { @@ -194,34 +198,44 @@ impl Future for InodeValFilePollGuardJoin { } InodeValFilePollGuardMode::EventNotifications(inner) => inner.poll(waker).map(Ok), InodeValFilePollGuardMode::Socket { ref inner } => { - let mut guard = inner.protected.write().unwrap(); - let res = guard - .set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Readable), - ) - .map_err(net_error_into_io_err); - match res { - Err(err) if is_err_closed(&err) => { - tracing::trace!("socket read ready error (fd={}) - {}", fd, err); - if !replace(&mut guard.notifications.closed, true) { - Poll::Ready(Ok(0)) - } else { - Poll::Pending + let inner = { + let i = inner.clone(); + drop(inner); + i + }; + + if self.handler_read.as_ref().iter().any(|t| t.value()) { + Poll::Ready(Ok(1)) + } else { + let handler = StatefulHandler::new(cx.waker().into()); + self.handler_read.replace(handler.triggered().clone()); + + let mut guard = inner.protected.write().unwrap(); + let res = guard + .add_handler(handler, InterestType::Readable) + .map_err(net_error_into_io_err); + match res { + Err(err) if is_err_closed(&err) => { + tracing::trace!("socket read ready error (fd={}) - {}", fd, err); + if !replace(&mut guard.notifications.closed, true) { + Poll::Ready(Ok(0)) + } else { + Poll::Pending + } } - } - Err(err) => { - tracing::debug!("poll socket error - {}", err); - if !replace(&mut guard.notifications.failed, true) { - Poll::Ready(Ok(0)) - } else { + Err(err) => { + tracing::debug!("poll socket error - {}", err); + if !replace(&mut guard.notifications.failed, true) { + Poll::Ready(Ok(0)) + } else { + Poll::Pending + } + } + Ok(()) => { + drop(guard); Poll::Pending } } - Ok(()) => { - drop(guard); - Poll::Pending - } } } InodeValFilePollGuardMode::Pipe { pipe } => { @@ -303,34 +317,48 @@ impl Future for InodeValFilePollGuardJoin { } InodeValFilePollGuardMode::EventNotifications(inner) => inner.poll(waker).map(Ok), InodeValFilePollGuardMode::Socket { ref inner } => { - let mut guard = inner.protected.write().unwrap(); - let res = guard - .set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Writable), - ) - .map_err(net_error_into_io_err); - match res { - Err(err) if is_err_closed(&err) => { - tracing::trace!("socket write ready error (fd={}) - err={}", fd, err); - if !replace(&mut guard.notifications.closed, true) { - Poll::Ready(Ok(0)) - } else { - Poll::Pending + let inner = { + let i = inner.clone(); + drop(inner); + i + }; + + if self.handler_write.as_ref().iter().any(|t| t.value()) { + Poll::Ready(Ok(1)) + } else { + let handler = StatefulHandler::new(cx.waker().into()); + self.handler_write.replace(handler.triggered().clone()); + + let mut guard = inner.protected.write().unwrap(); + let res = guard + .add_handler(cx.waker().into(), InterestType::Writable) + .map_err(net_error_into_io_err); + match res { + Err(err) if is_err_closed(&err) => { + tracing::trace!( + "socket write ready error (fd={}) - err={}", + fd, + err + ); + if !replace(&mut guard.notifications.closed, true) { + Poll::Ready(Ok(0)) + } else { + Poll::Pending + } } - } - Err(err) => { - tracing::debug!("poll socket error - {}", err); - if !replace(&mut guard.notifications.failed, true) { - Poll::Ready(Ok(0)) - } else { + Err(err) => { + tracing::debug!("poll socket error - {}", err); + if !replace(&mut guard.notifications.failed, true) { + Poll::Ready(Ok(0)) + } else { + Poll::Pending + } + } + Ok(()) => { + drop(guard); Poll::Pending } } - Ok(()) => { - drop(guard); - Poll::Pending - } } } InodeValFilePollGuardMode::Pipe { pipe } => { diff --git a/lib/wasix/src/net/socket.rs b/lib/wasix/src/net/socket.rs index 7e98809a41f..0277770ef4a 100644 --- a/lib/wasix/src/net/socket.rs +++ b/lib/wasix/src/net/socket.rs @@ -10,7 +10,7 @@ use std::{ #[cfg(feature = "enable-serde")] use serde_derive::{Deserialize, Serialize}; -use virtual_io::{FilteredHandler, InterestHandler, InterestType}; +use virtual_io::{FilteredHandler, FilteredHandlerSubscriptions, InterestHandler, InterestType}; use virtual_net::{ NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpListener, VirtualTcpSocket, VirtualUdpSocket, @@ -154,6 +154,7 @@ pub enum TimeType { pub(crate) struct InodeSocketProtected { pub kind: InodeSocketKind, pub notifications: InodeSocketNotifications, + pub aggregate_handler: Option, } #[derive(Debug, Default)] @@ -182,6 +183,7 @@ impl InodeSocket { protected: RwLock::new(InodeSocketProtected { kind, notifications: Default::default(), + aggregate_handler: None, }), }), } @@ -352,10 +354,7 @@ impl InodeSocket { Poll::Ready(Err(Errno::Again)) } Err(NetworkError::WouldBlock) if self.handler_registered == false => { - let res = socket.set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Readable), - ); + let res = socket.set_handler(cx.waker().into()); if let Err(err) = res { return Poll::Ready(Err(net_error_into_wasi_err(err))); } @@ -932,10 +931,7 @@ impl InodeSocket { Poll::Ready(Err(Errno::Again)) } Err(NetworkError::WouldBlock) if self.handler_registered == false => { - let res = inner.set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Writable), - ); + let res = inner.set_handler(cx.waker().into()); if let Err(err) = res { return Poll::Ready(Err(net_error_into_wasi_err(err))); } @@ -1013,10 +1009,7 @@ impl InodeSocket { Poll::Ready(Err(Errno::Again)) } Err(NetworkError::WouldBlock) if self.handler_registered == false => { - let res = inner.set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Writable), - ); + let res = inner.set_handler(cx.waker().into()); if let Err(err) = res { return Poll::Ready(Err(net_error_into_wasi_err(err))); } @@ -1102,10 +1095,7 @@ impl InodeSocket { Poll::Ready(Err(Errno::Again)) } Err(NetworkError::WouldBlock) if self.handler_registered == false => { - let res = inner.set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Readable), - ); + let res = inner.set_handler(cx.waker().into()); if let Err(err) = res { return Poll::Ready(Err(net_error_into_wasi_err(err))); } @@ -1182,10 +1172,7 @@ impl InodeSocket { Poll::Ready(Err(Errno::Again)) } Err(NetworkError::WouldBlock) if self.handler_registered == false => { - let res = inner.set_handler( - FilteredHandler::new(cx.waker().into()) - .add_interest(InterestType::Readable), - ); + let res = inner.set_handler(cx.waker().into()); if let Err(err) = res { return Poll::Ready(Err(net_error_into_wasi_err(err))); } @@ -1267,6 +1254,23 @@ impl InodeSocketProtected { InodeSocketKind::PreSocket { .. } => Err(virtual_net::NetworkError::NotConnected), } } + + pub fn add_handler( + &mut self, + handler: Box, + interest: InterestType, + ) -> virtual_net::Result<()> { + if self.aggregate_handler.is_none() { + let upper = FilteredHandler::new(); + let subs = upper.subscriptions().clone(); + + self.set_handler(upper)?; + self.aggregate_handler.replace(subs); + } + let upper = self.aggregate_handler.as_mut().unwrap(); + upper.add_interest(interest, handler); + Ok(()) + } } #[derive(Default)]