Skip to content

Commit

Permalink
Added fixes for the existing poll_oneoff loop
Browse files Browse the repository at this point in the history
  • Loading branch information
john-sharratt committed Jun 27, 2023
1 parent fab5c7b commit a7f4460
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 83 deletions.
2 changes: 1 addition & 1 deletion lib/cli/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Output {
pub fn initialize_logging(&self) {
let fmt_layer = fmt::layer()
.with_target(true)
.with_span_events(fmt::format::FmtSpan::CLOSE)
.with_span_events(fmt::format::FmtSpan::CLOSE | fmt::format::FmtSpan::ENTER)
.with_ansi(self.should_emit_colors())
.with_thread_ids(true)
.with_writer(std::io::stderr)
Expand Down
93 changes: 81 additions & 12 deletions lib/virtual-io/src/interest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex},
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
task::{Context, RawWaker, RawWakerVTable, Waker},
};

Expand All @@ -24,7 +27,7 @@ impl From<&Waker> for Box<dyn InterestHandler + Send + Sync> {
waker: Waker,
}
impl InterestHandler for WakerHandler {
fn interest(&mut self, _: InterestType) {
fn interest(&mut self, _interest: InterestType) {
self.waker.wake_by_ref();
}
}
Expand Down Expand Up @@ -94,28 +97,94 @@ const VTABLE: RawWakerVTable = unsafe {
)
};

#[derive(Derivative, Default, Clone)]
#[derivative(Debug)]
pub struct FilteredHandlerSubscriptions {
#[derivative(Debug = "ignore")]
mapping: Arc<Mutex<HashMap<InterestType, Box<dyn InterestHandler + Send + Sync>>>>,
}
impl FilteredHandlerSubscriptions {
pub fn add_interest(
&self,
interest: InterestType,
handler: Box<dyn InterestHandler + Send + Sync>,
) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(interest, handler);
}
}

pub struct FilteredHandler {
handler: Box<dyn InterestHandler + Send + Sync>,
types: HashSet<InterestType>,
subs: FilteredHandlerSubscriptions,
}

impl FilteredHandler {
pub fn new(handler: Box<dyn InterestHandler + Send + Sync>) -> Box<Self> {
pub fn new() -> Box<Self> {
Box::new(Self {
handler,
types: HashSet::default(),
subs: Default::default(),
})
}
pub fn add_interest(mut self: Box<Self>, interest: InterestType) -> Box<Self> {
self.types.insert(interest);
pub fn add_interest(
self: Box<Self>,
interest: InterestType,
handler: Box<dyn InterestHandler + Send + Sync>,
) -> Box<Self> {
self.subs.add_interest(interest, handler);
self
}
pub fn subscriptions(&self) -> &FilteredHandlerSubscriptions {
&self.subs
}
}

impl InterestHandler for FilteredHandler {
fn interest(&mut self, interest: InterestType) {
if self.types.contains(&interest) {
self.handler.interest(interest);
let mut guard = self.subs.mapping.lock().unwrap();
if let Some(handler) = guard.get_mut(&interest) {
handler.interest(interest);
}
}
}

#[derive(Debug, Clone)]
pub struct StatefulHandlerValue {
value: Arc<AtomicBool>,
}

impl StatefulHandlerValue {
pub fn new() -> Self {
Self {
value: Arc::new(AtomicBool::new(false)),
}
}
pub fn value(&self) -> bool {
self.value.load(Ordering::SeqCst)
}
pub fn set(&self) {
self.value.store(true, Ordering::SeqCst);
}
}

pub struct StatefulHandler {
handler: Box<dyn InterestHandler + Send + Sync>,
triggered: StatefulHandlerValue,
}

impl StatefulHandler {
pub fn new(handler: Box<dyn InterestHandler + Send + Sync>) -> Box<Self> {
Box::new(Self {
handler,
triggered: StatefulHandlerValue::new(),
})
}
pub fn triggered(&self) -> &StatefulHandlerValue {
&self.triggered
}
}

impl InterestHandler for StatefulHandler {
fn interest(&mut self, interest: InterestType) {
self.triggered.set();
self.handler.interest(interest)
}
}
126 changes: 77 additions & 49 deletions lib/wasix/src/fs/inode_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use futures::future::BoxFuture;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
use virtual_fs::{FsError, Pipe as VirtualPipe, VirtualFile};
use virtual_io::{FilteredHandler, InterestType};
use virtual_io::{InterestType, StatefulHandler, StatefulHandlerValue};
use virtual_net::NetworkError;
use wasmer_wasix_types::{
types::Eventtype,
Expand Down Expand Up @@ -124,6 +124,8 @@ pub struct InodeValFilePollGuardJoin {
fd: u32,
peb: PollEventSet,
subscription: Subscription,
handler_read: Option<StatefulHandlerValue>,
handler_write: Option<StatefulHandlerValue>,
}

impl InodeValFilePollGuardJoin {
Expand All @@ -133,6 +135,8 @@ impl InodeValFilePollGuardJoin {
fd: guard.fd,
peb: guard.peb,
subscription: guard.subscription,
handler_read: None,
handler_write: None,
}
}
pub(crate) fn fd(&self) -> u32 {
Expand Down Expand Up @@ -194,34 +198,44 @@ impl Future for InodeValFilePollGuardJoin {
}
InodeValFilePollGuardMode::EventNotifications(inner) => inner.poll(waker).map(Ok),
InodeValFilePollGuardMode::Socket { ref inner } => {
let mut guard = inner.protected.write().unwrap();
let res = guard
.set_handler(
FilteredHandler::new(cx.waker().into())
.add_interest(InterestType::Readable),
)
.map_err(net_error_into_io_err);
match res {
Err(err) if is_err_closed(&err) => {
tracing::trace!("socket read ready error (fd={}) - {}", fd, err);
if !replace(&mut guard.notifications.closed, true) {
Poll::Ready(Ok(0))
} else {
Poll::Pending
let inner = {
let i = inner.clone();
drop(inner);
i
};

if self.handler_read.as_ref().iter().any(|t| t.value()) {
Poll::Ready(Ok(1))
} else {
let handler = StatefulHandler::new(cx.waker().into());
self.handler_read.replace(handler.triggered().clone());

let mut guard = inner.protected.write().unwrap();
let res = guard
.add_handler(handler, InterestType::Readable)
.map_err(net_error_into_io_err);
match res {
Err(err) if is_err_closed(&err) => {
tracing::trace!("socket read ready error (fd={}) - {}", fd, err);
if !replace(&mut guard.notifications.closed, true) {
Poll::Ready(Ok(0))
} else {
Poll::Pending
}
}
}
Err(err) => {
tracing::debug!("poll socket error - {}", err);
if !replace(&mut guard.notifications.failed, true) {
Poll::Ready(Ok(0))
} else {
Err(err) => {
tracing::debug!("poll socket error - {}", err);
if !replace(&mut guard.notifications.failed, true) {
Poll::Ready(Ok(0))
} else {
Poll::Pending
}
}
Ok(()) => {
drop(guard);
Poll::Pending
}
}
Ok(()) => {
drop(guard);
Poll::Pending
}
}
}
InodeValFilePollGuardMode::Pipe { pipe } => {
Expand Down Expand Up @@ -303,34 +317,48 @@ impl Future for InodeValFilePollGuardJoin {
}
InodeValFilePollGuardMode::EventNotifications(inner) => inner.poll(waker).map(Ok),
InodeValFilePollGuardMode::Socket { ref inner } => {
let mut guard = inner.protected.write().unwrap();
let res = guard
.set_handler(
FilteredHandler::new(cx.waker().into())
.add_interest(InterestType::Writable),
)
.map_err(net_error_into_io_err);
match res {
Err(err) if is_err_closed(&err) => {
tracing::trace!("socket write ready error (fd={}) - err={}", fd, err);
if !replace(&mut guard.notifications.closed, true) {
Poll::Ready(Ok(0))
} else {
Poll::Pending
let inner = {
let i = inner.clone();
drop(inner);
i
};

if self.handler_write.as_ref().iter().any(|t| t.value()) {
Poll::Ready(Ok(1))
} else {
let handler = StatefulHandler::new(cx.waker().into());
self.handler_write.replace(handler.triggered().clone());

let mut guard = inner.protected.write().unwrap();
let res = guard
.add_handler(cx.waker().into(), InterestType::Writable)
.map_err(net_error_into_io_err);
match res {
Err(err) if is_err_closed(&err) => {
tracing::trace!(
"socket write ready error (fd={}) - err={}",
fd,
err
);
if !replace(&mut guard.notifications.closed, true) {
Poll::Ready(Ok(0))
} else {
Poll::Pending
}
}
}
Err(err) => {
tracing::debug!("poll socket error - {}", err);
if !replace(&mut guard.notifications.failed, true) {
Poll::Ready(Ok(0))
} else {
Err(err) => {
tracing::debug!("poll socket error - {}", err);
if !replace(&mut guard.notifications.failed, true) {
Poll::Ready(Ok(0))
} else {
Poll::Pending
}
}
Ok(()) => {
drop(guard);
Poll::Pending
}
}
Ok(()) => {
drop(guard);
Poll::Pending
}
}
}
InodeValFilePollGuardMode::Pipe { pipe } => {
Expand Down
Loading

0 comments on commit a7f4460

Please sign in to comment.