From 2b4cccaa9172b3ad89360fd0f17377dcd100ac0c Mon Sep 17 00:00:00 2001 From: Vincenzo Palazzo Date: Sun, 24 Mar 2024 16:57:41 +0100 Subject: [PATCH] io: manage would block event Signed-off-by: Vincenzo Palazzo --- plugin/src/io.rs | 128 ++++++++++++++++++++++++++++--------------- plugin/src/plugin.rs | 14 +++-- 2 files changed, 92 insertions(+), 50 deletions(-) diff --git a/plugin/src/io.rs b/plugin/src/io.rs index 9192d13..9a2bcfc 100644 --- a/plugin/src/io.rs +++ b/plugin/src/io.rs @@ -1,77 +1,115 @@ //! async io module of the plugin io. //! //! Vincenzo Palazzo -use std::io; -use std::io::{Read, Write}; +use std::io::Write; +use std::io::{self, Read}; use std::os::fd::AsRawFd; +use std::time::Duration; -const IN: mio::Token = mio::Token(0); +const SERVER_TOKEN: mio::Token = mio::Token(0); -pub(crate) struct AsyncIO { +pub struct AsyncIO { poll: mio::Poll, + events: mio::Events, } impl AsyncIO { - /// Create a new instance of an AsyncIO pub fn new() -> io::Result { Ok(Self { poll: mio::Poll::new()?, + events: mio::Events::with_capacity(1024), }) } - pub fn register(&mut self) -> io::Result<()> { + pub fn register(&self) -> io::Result<()> { let stdin = std::io::stdin().as_raw_fd(); let mut stdin = mio::unix::SourceFd(&stdin); - - self.poll.registry().register( - &mut stdin, - IN, - mio::Interest::READABLE | mio::Interest::WRITABLE, - )?; + self.poll + .registry() + .register(&mut stdin, SERVER_TOKEN, mio::Interest::READABLE)?; Ok(()) } - pub fn into_loop Option>( - &mut self, - mut async_callback: F, - ) -> io::Result<()> { - let mut events = mio::Events::with_capacity(1024); + pub fn into_loop(&mut self, mut async_callback: F) -> io::Result<()> + where + F: FnMut(String) -> Option, + { loop { - self.poll.poll(&mut events, None)?; - for event in events.iter() { - #[cfg(feature = "log")] - log::info!("getting the event: {:?}", event); + self.poll + .poll(&mut self.events, Some(Duration::from_millis(100)))?; + for event in self.events.iter() { match event.token() { - IN => { + SERVER_TOKEN => { if event.is_readable() { - let mut reader = io::stdin().lock(); - let mut buffer = String::new(); - loop { - let mut byte = [0; 1]; - reader.read_exact(&mut byte).unwrap(); - - // Append the byte to the buffer - buffer.push(byte[0] as char); - - // Check if the buffer ends with double newline - if buffer.ends_with("\n\n") { - drop(reader); - break; // Exit the loop - } - } - let Some(resp) = async_callback(buffer.clone()) else { - continue; - }; - let mut writer = io::stdout().lock(); - writer.write_all(resp.as_bytes())?; - writer.flush()?; + self.handle_connection(&mut async_callback)?; } } _ => unreachable!(), } - #[cfg(feature = "log")] - log::info!("event handled: {:?}", event); } } } + + fn handle_connection(&self, async_callback: &mut F) -> io::Result<()> + where + F: FnMut(String) -> Option, + { + loop { + let mut reader = io::stdin().lock(); + let mut buffer = String::new(); + loop { + let mut byte = [0; 1]; + crate::poll_check!(reader.read_exact(&mut byte))?; + + // Append the byte to the buffer + buffer.push(byte[0] as char); + + // Check if the buffer ends with double newline + if buffer.ends_with("\n\n") { + break; // Exit the loop + } + } + + if let Some(resp) = async_callback(buffer.clone()) { + let mut writer = io::stdout().lock(); + crate::poll_check!(writer.write_all(resp.as_bytes()))?; + crate::poll_check!(writer.flush())?; + } + } + Ok(()) + } +} + +#[macro_export] +macro_rules! poll_check { + ($expr:expr) => {{ + match $expr { + Ok(val) => Ok::<_, std::io::Error>(val), + Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => { + // Handle WouldBlock error + // For example, wait for readiness event and retry + // You may need to use mio's event loop to wait for readiness + // and then retry the operation + // For simplicity, we'll just continue the loop here + break; + } + Err(err) => Err(err.into()), + } + }}; +} + +#[macro_export] +macro_rules! poll_loop { + ($code:block) => {{ + while let Err(ref err) = $code { + if err.kind() == std::io::ErrorKind::WouldBlock { + // Handle WouldBlock error + // For example, wait for readiness event and retry + // You may need to use mio's event loop to wait for readiness + // and then retry the operation + // For simplicity, we'll just continue the loop here + continue; + } + } + }}; } diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 28c0d4d..8000557 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -79,8 +79,11 @@ impl log::Log for Log { method: "log".to_owned(), params: payload, }; - let _ = writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()); - let _ = writer.flush(); + + crate::poll_loop!({ + writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) + }); + crate::poll_loop!({ writer.flush() }); } } @@ -122,9 +125,10 @@ impl<'a, T: 'a + Clone> Plugin { method: "log".to_owned(), params: payload, }; - // We do not like unwrap there - let _ = writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()); - let _ = writer.flush(); + crate::poll_loop!({ + writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) + }); + crate::poll_loop!({ writer.flush() }); } /// register the plugin option.