From 506d070af64f5df7eda90421eb1125f25d133843 Mon Sep 17 00:00:00 2001 From: Vincenzo Palazzo Date: Sun, 24 Mar 2024 16:57:41 +0100 Subject: [PATCH] io: manage would block event Signed-off-by: Vincenzo Palazzo --- plugin/src/io.rs | 41 +++++++++++++++++++++++++++++++++++++---- plugin/src/plugin.rs | 14 +++++++++----- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/plugin/src/io.rs b/plugin/src/io.rs index 9192d13..0ffc08d 100644 --- a/plugin/src/io.rs +++ b/plugin/src/io.rs @@ -7,6 +7,40 @@ use std::os::fd::AsRawFd; const IN: mio::Token = mio::Token(0); +#[macro_export] +macro_rules! pool_check { + ($expr:expr) => {{ + match $expr { + Ok(val) => Ok::<_, std::io::Error>(val), + Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => { + // Handle WouldBlock error + // For example, wait for readiness event and retry + // You may need to use mio's event loop to wait for readiness + // and then retry the operation + // For simplicity, we'll just continue the loop here + continue; + } + Err(err) => Err(err.into()), + } + }}; +} + +#[macro_export] +macro_rules! poll_loop { + ($code:block) => {{ + while let Err(ref err) = $code { + if err.kind() == std::io::ErrorKind::WouldBlock { + // Handle WouldBlock error + // For example, wait for readiness event and retry + // You may need to use mio's event loop to wait for readiness + // and then retry the operation + // For simplicity, we'll just continue the loop here + continue; + } + } + }}; +} + pub(crate) struct AsyncIO { poll: mio::Poll, } @@ -48,14 +82,13 @@ impl AsyncIO { let mut buffer = String::new(); loop { let mut byte = [0; 1]; - reader.read_exact(&mut byte).unwrap(); + pool_check!(reader.read_exact(&mut byte))?; // Append the byte to the buffer buffer.push(byte[0] as char); // Check if the buffer ends with double newline if buffer.ends_with("\n\n") { - drop(reader); break; // Exit the loop } } @@ -63,8 +96,8 @@ impl AsyncIO { continue; }; let mut writer = io::stdout().lock(); - writer.write_all(resp.as_bytes())?; - writer.flush()?; + pool_check!(writer.write_all(resp.as_bytes()))?; + pool_check!(writer.flush())?; } } _ => unreachable!(), diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 28c0d4d..8000557 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -79,8 +79,11 @@ impl log::Log for Log { method: "log".to_owned(), params: payload, }; - let _ = writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()); - let _ = writer.flush(); + + crate::poll_loop!({ + writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) + }); + crate::poll_loop!({ writer.flush() }); } } @@ -122,9 +125,10 @@ impl<'a, T: 'a + Clone> Plugin { method: "log".to_owned(), params: payload, }; - // We do not like unwrap there - let _ = writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()); - let _ = writer.flush(); + crate::poll_loop!({ + writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) + }); + crate::poll_loop!({ writer.flush() }); } /// register the plugin option.