diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index d7f4f0a..d21e696 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -11,10 +11,11 @@ keywords = [ "plugin", "cln", "rpc", "lightning", "bitcoin" ] readme = "README.md" [dependencies] +clightningrpc-common = { version = "0.3.0-beta.4" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -clightningrpc-common = { version = "0.3.0-beta.4" } log = { version = "0.4.17", optional = true } +mio = { version = "0.8.10", features = ["os-ext"] } [features] log = ["dep:log"] diff --git a/plugin/examples/foo_plugin.rs b/plugin/examples/foo_plugin.rs index 2c01ad6..42c1692 100644 --- a/plugin/examples/foo_plugin.rs +++ b/plugin/examples/foo_plugin.rs @@ -1,5 +1,7 @@ extern crate clightningrpc_plugin; +use std::io; + use clightningrpc_plugin::types::LogLevel; use clightningrpc_plugin::{commands::RPCCommand, errors::PluginError, plugin::Plugin}; use serde_json::{json, Value}; @@ -35,7 +37,7 @@ impl RPCCommand for OnShutdown { } } -fn main() { +fn main() -> io::Result<()> { let plugin = Plugin::::new(PluginState(()), true) .add_rpc_method( "hello", @@ -56,5 +58,6 @@ fn main() { json!({}) }) .clone(); - plugin.start(); + plugin.start()?; + Ok(()) } diff --git a/plugin/src/commands/builtin.rs b/plugin/src/commands/builtin.rs index c1eebe9..7dd2366 100644 --- a/plugin/src/commands/builtin.rs +++ b/plugin/src/commands/builtin.rs @@ -51,14 +51,21 @@ impl RPCCommand for ManifestRPC { #[derive(Clone)] /// Type to define the init method and its attributes, used in plugin pub struct InitRPC { + #[allow(clippy::type_complexity)] pub(crate) on_init: Option) -> Value>>, } impl InitRPC { fn parse_option(&self, plugin: &mut Plugin, options: &HashMap) { for option_name in options.keys() { + // SAFETY: We are iterating over the key this never None + #[allow(clippy::unwrap_used)] let option = options.get(option_name).unwrap(); - plugin.option.get_mut(option_name).unwrap().value = Some(option.to_owned()); + // SAFETY: we put them into it so it is safe to unwrap. + // If we panic this mean that there is a bug + #[allow(clippy::unwrap_used)] + let opt = plugin.option.get_mut(option_name).unwrap(); + opt.value = Some(option.to_owned()); } } } @@ -66,6 +73,8 @@ impl InitRPC { impl RPCCommand for InitRPC { fn call<'c>(&self, plugin: &mut Plugin, request: Value) -> Result { let mut response = init_payload(); + // SAFETY: Shouwl be valid json so should be safe to unwrap + #[allow(clippy::unwrap_used)] let init: InitConf = serde_json::from_value(request.to_owned()).unwrap(); plugin.configuration = Some(init.configuration); self.parse_option(plugin, &init.options); diff --git a/plugin/src/commands/mod.rs b/plugin/src/commands/mod.rs index 304c4dc..4caee2a 100644 --- a/plugin/src/commands/mod.rs +++ b/plugin/src/commands/mod.rs @@ -15,7 +15,7 @@ use crate::errors::PluginError; /// in contrast, it is more complex but the plugin_macros package will help to simplify the API. pub trait RPCCommand: RPCCommandClone { /// call is a generic method that it is used to simulate the callback. - fn call<'c>( + fn call( &self, _: &mut Plugin, _: serde_json::Value, @@ -24,7 +24,7 @@ pub trait RPCCommand: RPCCommandClone { } /// void call is a generic method that it is used to simulate a callback with a void return type - fn call_void<'c>(&self, _plugin: &mut Plugin, _request: &'c serde_json::Value) {} + fn call_void(&self, _plugin: &mut Plugin, _request: &serde_json::Value) {} } // Splitting RPCCommandClone into its own trait allows us to provide a blanket diff --git a/plugin/src/io.rs b/plugin/src/io.rs new file mode 100644 index 0000000..74f57df --- /dev/null +++ b/plugin/src/io.rs @@ -0,0 +1,116 @@ +//! async io module of the plugin io. +//! +//! Vincenzo Palazzo +use std::io::Write; +use std::io::{self, Read}; +use std::os::fd::AsRawFd; +use std::time::Duration; + +const SERVER_TOKEN: mio::Token = mio::Token(0); + +pub struct AsyncIO { + poll: mio::Poll, + events: mio::Events, +} + +impl AsyncIO { + pub fn new() -> io::Result { + Ok(Self { + poll: mio::Poll::new()?, + events: mio::Events::with_capacity(1024), + }) + } + + pub fn register(&self) -> io::Result<()> { + let stdin = std::io::stdin().as_raw_fd(); + let mut stdin = mio::unix::SourceFd(&stdin); + self.poll + .registry() + .register(&mut stdin, SERVER_TOKEN, mio::Interest::READABLE)?; + Ok(()) + } + + #[allow(clippy::wrong_self_convention)] + pub fn into_loop(&mut self, mut async_callback: F) -> io::Result<()> + where + F: FnMut(String) -> Option, + { + loop { + self.poll + .poll(&mut self.events, Some(Duration::from_millis(100)))?; + for event in self.events.iter() { + match event.token() { + SERVER_TOKEN => { + if event.is_readable() { + self.handle_connection(&mut async_callback)?; + } + } + _ => unreachable!(), + } + } + } + } + + fn handle_connection(&self, async_callback: &mut F) -> io::Result<()> + where + F: FnMut(String) -> Option, + { + loop { + let mut reader = io::stdin().lock(); + let mut buffer = String::new(); + loop { + let mut byte = [0; 1]; + crate::poll_check!(reader.read_exact(&mut byte))?; + + // Append the byte to the buffer + buffer.push(byte[0] as char); + + // Check if the buffer ends with double newline + if buffer.ends_with("\n\n") { + break; // Exit the loop + } + } + + if let Some(resp) = async_callback(buffer.clone()) { + let mut writer = io::stdout().lock(); + crate::poll_check!(writer.write_all(resp.as_bytes()))?; + crate::poll_check!(writer.flush())?; + } + } + Ok(()) + } +} + +#[macro_export] +macro_rules! poll_check { + ($expr:expr) => {{ + match $expr { + Ok(val) => Ok::<_, std::io::Error>(val), + Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => { + // Handle WouldBlock error + // For example, wait for readiness event and retry + // You may need to use mio's event loop to wait for readiness + // and then retry the operation + // For simplicity, we'll just continue the loop here + break; + } + Err(err) => Err(err.into()), + } + }}; +} + +#[macro_export] +macro_rules! poll_loop { + ($code:block) => {{ + while let Err(ref err) = $code { + if err.kind() == std::io::ErrorKind::WouldBlock { + // Handle WouldBlock error + // For example, wait for readiness event and retry + // You may need to use mio's event loop to wait for readiness + // and then retry the operation + // For simplicity, we'll just continue the loop here + continue; + } + } + }}; +} diff --git a/plugin/src/lib.rs b/plugin/src/lib.rs index c10d189..5bb3d2c 100644 --- a/plugin/src/lib.rs +++ b/plugin/src/lib.rs @@ -7,8 +7,10 @@ //! //! author and mantainer: Vincenzo Palazzo https://github.com/vincenzopalazzo #![crate_name = "clightningrpc_plugin"] +#![deny(clippy::unwrap_used)] pub mod commands; pub mod errors; +mod io; pub mod macros; pub mod plugin; pub mod types; diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index ccca087..ab6522f 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -7,14 +7,16 @@ use std::io::Write; use std::string::String; use std::sync::Arc; +use serde_json::Value; + use clightningrpc_common::json_utils::{add_str, init_payload, init_success_response}; use clightningrpc_common::types::Request; -use serde_json::Value; use crate::commands::builtin::{InitRPC, ManifestRPC}; use crate::commands::types::{CLNConf, RPCHookInfo, RPCMethodInfo}; use crate::commands::RPCCommand; use crate::errors::PluginError; +use crate::io::AsyncIO; use crate::types::{LogLevel, RpcOption}; #[cfg(feature = "log")] @@ -64,10 +66,10 @@ impl log::Log for Log { fn log(&self, record: &Record) { if self.enabled(record.metadata()) { + let mut writer = io::stdout().lock(); let level: LogLevel = record.level().into(); let msg = record.args(); - let mut writer = io::stdout(); let mut payload = init_payload(); add_str(&mut payload, "level", &level.to_string()); add_str(&mut payload, "message", &format!("{msg}")); @@ -77,10 +79,11 @@ impl log::Log for Log { method: "log".to_owned(), params: payload, }; - writer - .write_all(serde_json::to_string(&request).unwrap().as_bytes()) - .unwrap(); - writer.flush().unwrap(); + + crate::poll_loop!({ + writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) + }); + crate::poll_loop!({ writer.flush() }); } } @@ -112,7 +115,7 @@ impl<'a, T: 'a + Clone> Plugin { } pub fn log(&self, level: LogLevel, msg: &str) { - let mut writer = io::stdout(); + let mut writer = io::stdout().lock(); let mut payload = init_payload(); add_str(&mut payload, "level", &level.to_string()); add_str(&mut payload, "message", msg); @@ -122,10 +125,12 @@ impl<'a, T: 'a + Clone> Plugin { method: "log".to_owned(), params: payload, }; - writer - .write_all(serde_json::to_string(&request).unwrap().as_bytes()) - .unwrap(); - writer.flush().unwrap(); + crate::poll_loop!({ + // SAFETY: it is valid json and if we panic there is a buf somewhere + #[allow(clippy::unwrap_used)] + writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) + }); + crate::poll_loop!({ writer.flush() }); } /// register the plugin option. @@ -139,10 +144,14 @@ impl<'a, T: 'a + Clone> Plugin { ) -> &mut Self { let def_val = match opt_type { "flag" | "bool" => { - def_val.and_then(|val| Some(serde_json::json!(val.parse::().unwrap()))) + // FIXME: remove unwrap and return the error + #[allow(clippy::unwrap_used)] + def_val.map(|val| serde_json::json!(val.parse::().unwrap())) } - "int" => def_val.and_then(|val| Some(serde_json::json!(val.parse::().unwrap()))), - "string" => def_val.and_then(|val| Some(serde_json::json!(val))), + // FIXME: remove unwrap and return the error + #[allow(clippy::unwrap_used)] + "int" => def_val.map(|val| serde_json::json!(val.parse::().unwrap())), + "string" => def_val.map(|val| serde_json::json!(val)), _ => unreachable!("{opt_type} not supported"), }; self.option.insert( @@ -209,6 +218,9 @@ impl<'a, T: 'a + Clone> Plugin { } fn handle_notification(&'a mut self, name: &str, params: serde_json::Value) { + // SAFETY: we register the notification and if we do not have inside the map + // this is a bug. + #[allow(clippy::unwrap_used)] let notification = self.rpc_notification.get(name).unwrap().clone(); notification.call_void(self, ¶ms); } @@ -250,16 +262,15 @@ impl<'a, T: 'a + Clone> Plugin { match result { Ok(json_resp) => response["result"] = json_resp.to_owned(), Err(json_err) => { + // SAFETY: should be valud json + #[allow(clippy::unwrap_used)] let err_resp = serde_json::to_value(json_err).unwrap(); response["error"] = err_resp; } } } - pub fn start(mut self) { - let reader = io::stdin(); - let mut writer = io::stdout(); - let mut buffer = String::new(); + pub fn start(mut self) -> io::Result<()> { #[cfg(feature = "log")] { use std::str::FromStr; @@ -276,29 +287,37 @@ impl<'a, T: 'a + Clone> Plugin { on_init: self.on_init.clone(), }), ); - // FIXME: core lightning end with the double endline, so this can cause - // problem for some input reader. - // we need to parse the writer, and avoid this while loop - while let Ok(_) = reader.read_line(&mut buffer) { - let req_str = buffer.to_string(); - buffer.clear(); - let Ok(request) = serde_json::from_str::>(&req_str) else { - continue; - }; + let mut asyncio = AsyncIO::new()?; + asyncio.register()?; + asyncio.into_loop(|buffer| { + #[cfg(feature = "log")] + log::info!("looping around the string: {buffer}"); + // SAFETY: should be valud json + #[allow(clippy::unwrap_used)] + let request: Request = serde_json::from_str(&buffer).unwrap(); if let Some(id) = request.id { // when the id is specified this is a RPC or Hook, so we need to return a response let response = self.call_rpc_method(&request.method, request.params); let mut rpc_response = init_success_response(id); self.write_respose(&response, &mut rpc_response); - writer - .write_all(serde_json::to_string(&rpc_response).unwrap().as_bytes()) - .unwrap(); - writer.flush().unwrap(); + #[cfg(feature = "log")] + log::info!( + "rpc or hook: {} with reponse {:?}", + request.method, + rpc_response + ); + // SAFETY: should be valud json + #[allow(clippy::unwrap_used)] + Some(serde_json::to_string(&rpc_response).unwrap()) } else { // in case of the id is None, we are receiving the notification, so the server is not // interested in the answer. self.handle_notification(&request.method, request.params); + #[cfg(feature = "log")] + log::info!("notification: {}", request.method); + None } - } + })?; + Ok(()) } } diff --git a/plugin_macros/src/attr_parser.rs b/plugin_macros/src/attr_parser.rs index 3f2a0f6..8287c33 100644 --- a/plugin_macros/src/attr_parser.rs +++ b/plugin_macros/src/attr_parser.rs @@ -36,7 +36,7 @@ fn parse_key_values( trace!(tracer, "removing the `,` tok"); check!(",", stream.advance())?; } - let value = value.to_string().replace("\"", ""); + let value = value.to_string().replace('\"', ""); trace!(tracer, "key {key} = value {value}"); hash_map.insert(key.to_string(), value.to_string()); trace!(tracer, "map is {:?}", hash_map); diff --git a/plugin_macros/src/plugin.rs b/plugin_macros/src/plugin.rs index 2fc3d51..f8d89d1 100644 --- a/plugin_macros/src/plugin.rs +++ b/plugin_macros/src/plugin.rs @@ -5,7 +5,7 @@ use kproc_parser::kproc_macros::KTokenStream; use kproc_parser::proc_macro::{TokenStream, TokenTree}; use kproc_parser::{build_error, check, trace}; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct PluginDeclaration { pub state: Option, pub dynamic: Option, @@ -27,7 +27,7 @@ impl std::fmt::Display for PluginDeclaration { .map_or(String::from("false"), |val| val.to_string()) )?; if let Some(ref inner) = self.notificatios { - let mut inner = KTokenStream::new(&inner); + let mut inner = KTokenStream::new(inner); while !inner.is_end() { let notification = inner.advance(); writeln!(f, "let call = {}();", notification)?; @@ -42,7 +42,7 @@ impl std::fmt::Display for PluginDeclaration { } } if let Some(ref inner) = self.rpc_methods { - let mut inner = KTokenStream::new(&inner); + let mut inner = KTokenStream::new(inner); while !inner.is_end() { let rpc = inner.advance(); writeln!(f, "let call = {}();", rpc)?; @@ -58,18 +58,6 @@ impl std::fmt::Display for PluginDeclaration { } } -impl Default for PluginDeclaration { - fn default() -> Self { - Self { - state: None, - dynamic: None, - notificatios: None, - hooks: None, - rpc_methods: None, - } - } -} - /// proc macro syntax is something like this /// /// ```ignore diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 5daf845..b0a2fa3 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -36,14 +36,14 @@ pub mod fixtures { pub fn lightningd() -> cln::Node { init(); let pwd = std::env::var("PWD").unwrap(); - let cln = async_run!(cln::Node::with_params(&format!("--developer --plugin={pwd}/target/debug/examples/foo_plugin --plugin={pwd}/target/debug/examples/macros_ex"), "regtest")).unwrap(); - cln + + async_run!(cln::Node::with_params(&format!("--developer --plugin={pwd}/target/debug/examples/foo_plugin --plugin={pwd}/target/debug/examples/macros_ex"), "regtest")).unwrap() } #[fixture] pub fn lightningd_second() -> cln::Node { init(); - let cln = async_run!(cln::Node::with_params("--developer", "regtest")).unwrap(); - cln + + async_run!(cln::Node::with_params("--developer", "regtest")).unwrap() } }