Skip to content

Commit

Permalink
feat(plugin): add async io for the plugin
Browse files Browse the repository at this point in the history
Adding the support of the async io for reading to the
std io.

Link: #98
Signed-off-by: Vincenzo Palazzo <[email protected]>
  • Loading branch information
vincenzopalazzo committed Dec 25, 2023
1 parent 020cc5f commit 90f17b3
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 20 deletions.
3 changes: 2 additions & 1 deletion plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ keywords = [ "plugin", "cln", "rpc", "lightning", "bitcoin" ]
readme = "README.md"

[dependencies]
clightningrpc-common = { version = "0.3.0-beta.4" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
clightningrpc-common = { version = "0.3.0-beta.4" }
log = { version = "0.4.17", optional = true }
mio = "0.8.10"

[dev-dependencies]
rstest = "0.10.0"
Expand Down
138 changes: 138 additions & 0 deletions plugin/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//! async io module of the plugin io.
//!
//! Vincenzo Palazzo <[email protected]>
use std::{
fs::File,
io::{self, Read},
os::{
fd::{AsRawFd, FromRawFd},
unix::prelude::FileExt,
},
sync::Arc,
};

const STDOUT: mio::Token = mio::Token(0);
const STDIN: mio::Token = mio::Token(1);

pub(crate) struct AsyncIO {
poll: mio::Poll,
write_ready: bool,
read_ready: bool,
stdin: Option<Arc<File>>,
stdout: Option<Arc<File>>,
}

impl AsyncIO {
/// Create a new instance of an AsyncIO
pub fn new() -> io::Result<Self> {
Ok(Self {
poll: mio::Poll::new()?,
write_ready: false,
read_ready: false,
stdin: None,
stdout: None,
})
}

pub fn register(&mut self) -> io::Result<()> {
let stdin = std::io::stdin().as_raw_fd();
let mut stdin = mio::unix::SourceFd(&stdin);
let stdin_file = unsafe { File::from_raw_fd(*stdin.0) };

let stdout = std::io::stdout().as_raw_fd();
let mut stdout = mio::unix::SourceFd(&stdout);
let stdout_file = unsafe { File::from_raw_fd(*stdout.0) };

self.poll
.registry()
.register(&mut stdin, STDIN, mio::Interest::READABLE)?;
self.poll
.registry()
.register(&mut stdout, STDOUT, mio::Interest::WRITABLE)?;
self.stdout = Some(Arc::new(stdout_file));
self.stdin = Some(Arc::new(stdin_file));
Ok(())
}

fn stdin(&self) -> Arc<File> {
self.stdin.clone().unwrap()
}

fn stdout(&self) -> Arc<File> {
self.stdout.clone().unwrap()
}

pub fn into_loop<F: FnMut(String) -> String>(&mut self, mut async_callback: F) {

Check warning on line 65 in plugin/src/io.rs

View workflow job for this annotation

GitHub Actions / clippy

methods called `into_*` usually take `self` by value

warning: methods called `into_*` usually take `self` by value --> plugin/src/io.rs:65:50 | 65 | pub fn into_loop<F: FnMut(String) -> String>(&mut self, mut async_callback: F) { | ^^^^^^^^^ | = help: consider choosing a less ambiguous name = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#wrong_self_convention = note: `#[warn(clippy::wrong_self_convention)]` on by default
let mut events = mio::Events::with_capacity(1024);
let mut buf = vec![];
let mut cursor = 0;
'outer: loop {
let ret = self.poll.poll(&mut events, None);
if let Err(x) = ret {
if x.kind() == io::ErrorKind::Interrupted {
continue;
}
break;
}

for event in events.iter() {
match event.token() {
STDIN => {
self.read_ready = true;
}
STDOUT => {
self.write_ready = true;
}
_ => unreachable!(),
}
}

loop {
if self.read_ready {
#[allow(unused_assignments)]
match self.stdin().read_to_end(&mut buf) {
Ok(0) => {
buf = async_callback(String::from_utf8(buf).unwrap())
.as_bytes()
.to_vec();
break 'outer;
}
Ok(x) => {
cursor += x;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
continue;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.read_ready = false;
}
Err(_) => {
break 'outer;
}
}
} else if self.write_ready && cursor > 0 {
match self.stdout().write_at(&buf, cursor as u64) {
Ok(x) => {
unsafe {
buf.set_len(cursor);
}
let _ = buf.drain(0..x);
cursor -= x;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
continue;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.write_ready = false;
}
Err(_) => {
break 'outer;
}
}
} else {
break;
}
}
}
}
}
1 change: 1 addition & 0 deletions plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![crate_name = "clightningrpc_plugin"]
pub mod commands;
pub mod errors;
mod io;
pub mod macros;
pub mod plugin;
pub mod types;
29 changes: 10 additions & 19 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::commands::builtin::{InitRPC, ManifestRPC};
use crate::commands::types::{CLNConf, RPCHookInfo, RPCMethodInfo};
use crate::commands::RPCCommand;
use crate::errors::PluginError;
use crate::io::AsyncIO;
use crate::types::{LogLevel, RpcOption};

#[cfg(feature = "log")]
Expand Down Expand Up @@ -241,9 +242,6 @@ impl<'a, T: 'a + Clone> Plugin<T> {
}

pub fn start(mut self) {
let reader = io::stdin();
let mut writer = io::stdout();
let mut buffer = String::new();
#[cfg(feature = "log")]
{
use std::str::FromStr;
Expand All @@ -260,31 +258,24 @@ impl<'a, T: 'a + Clone> Plugin<T> {
on_init: self.on_init.clone(),
}),
);
// FIXME: core lightning end with the double endline, so this can cause
// problem for some input reader.
// we need to parse the writer, and avoid this while loop
loop {
let _ = reader.read_line(&mut buffer);
let req_str = buffer.to_string();
if req_str.trim().is_empty() {
continue;
}
buffer.clear();
let request: Request<serde_json::Value> = serde_json::from_str(&req_str).unwrap();

let mut asyncio = AsyncIO::new().unwrap();
asyncio.register().unwrap();
asyncio.into_loop(|buffer| {
self.log(LogLevel::Info, "looping around the string");
let request: Request<serde_json::Value> = serde_json::from_str(&buffer).unwrap();
if let Some(id) = request.id {
// when the id is specified this is a RPC or Hook, so we need to return a response
let response = self.call_rpc_method(&request.method, request.params);
let mut rpc_response = init_success_response(id);
self.write_respose(&response, &mut rpc_response);
writer
.write_all(serde_json::to_string(&rpc_response).unwrap().as_bytes())
.unwrap();
writer.flush().unwrap();
return serde_json::to_string(&rpc_response).unwrap();

Check warning on line 272 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> plugin/src/plugin.rs:272:17 | 272 | return serde_json::to_string(&rpc_response).unwrap(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return = note: `#[warn(clippy::needless_return)]` on by default help: remove `return` | 272 - return serde_json::to_string(&rpc_response).unwrap(); 272 + serde_json::to_string(&rpc_response).unwrap() |
} else {
// in case of the id is None, we are receiving the notification, so the server is not
// interested in the answer.
self.handle_notification(&request.method, request.params);
return String::new();

Check warning on line 277 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> plugin/src/plugin.rs:277:17 | 277 | return String::new(); | ^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return help: remove `return` | 277 - return String::new(); 277 + String::new() |
}
}
});
}
}

0 comments on commit 90f17b3

Please sign in to comment.