Skip to content

Commit 02271f3

Browse files
feat(plugin): add async io for the plugin
Adding the support of the async io for reading to the std io. Link: #98 Signed-off-by: Vincenzo Palazzo <[email protected]>
1 parent 56b385f commit 02271f3

File tree

4 files changed

+85
-18
lines changed

4 files changed

+85
-18
lines changed

plugin/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ keywords = [ "plugin", "cln", "rpc", "lightning", "bitcoin" ]
1111
readme = "README.md"
1212

1313
[dependencies]
14+
clightningrpc-common = { version = "0.3.0-beta.4" }
1415
serde = { version = "1.0", features = ["derive"] }
1516
serde_json = "1.0"
16-
clightningrpc-common = { version = "0.3.0-beta.4" }
1717
log = { version = "0.4.17", optional = true }
18+
mio = "0.8.10"
1819

1920
[features]
2021
log = ["dep:log"]

plugin/src/io.rs

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
//! async io module of the plugin io.
2+
//!
3+
//! Vincenzo Palazzo <[email protected]>
4+
use std::io;
5+
use std::io::{Read, Write};
6+
use std::os::fd::AsRawFd;
7+
8+
const SERVER: mio::Token = mio::Token(0);
9+
10+
pub(crate) struct AsyncIO {
11+
poll: mio::Poll,
12+
}
13+
14+
impl AsyncIO {
15+
/// Create a new instance of an AsyncIO
16+
pub fn new() -> io::Result<Self> {
17+
Ok(Self {
18+
poll: mio::Poll::new()?,
19+
})
20+
}
21+
22+
pub fn register(&mut self) -> io::Result<()> {
23+
let stdin = std::io::stdin().as_raw_fd();
24+
let mut stdin = mio::unix::SourceFd(&stdin);
25+
let stdout = std::io::stdout().as_raw_fd();
26+
let mut stdout = mio::unix::SourceFd(&stdout);
27+
28+
self.poll
29+
.registry()
30+
.register(&mut stdin, SERVER, mio::Interest::READABLE)?;
31+
self.poll
32+
.registry()
33+
.register(&mut stdout, SERVER, mio::Interest::WRITABLE)?;
34+
Ok(())
35+
}
36+
37+
pub fn into_loop<F: FnMut(String) -> String>(&mut self, mut async_callback: F) {
38+
let mut events = mio::Events::with_capacity(1024);
39+
loop {
40+
self.poll.poll(&mut events, None).unwrap();
41+
42+
for event in events.iter() {
43+
match event.token() {
44+
SERVER => {
45+
if event.is_readable() {
46+
let mut reader = io::stdin().lock();
47+
let mut buffer = String::new();
48+
loop {
49+
let mut byte = [0; 1];
50+
reader.read_exact(&mut byte).unwrap();
51+
52+
// Append the byte to the buffer
53+
buffer.push(byte[0] as char);
54+
55+
// Check if the buffer ends with double newline
56+
if buffer.ends_with("\n\n") {
57+
break; // Exit the loop
58+
}
59+
}
60+
let resp = async_callback(buffer.clone());
61+
io::stdout().write_all(resp.as_bytes()).unwrap();
62+
io::stdout().flush().unwrap();
63+
}
64+
}
65+
_ => unreachable!(),
66+
}
67+
}
68+
}
69+
}
70+
}

plugin/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#![crate_name = "clightningrpc_plugin"]
1010
pub mod commands;
1111
pub mod errors;
12+
mod io;
1213
pub mod macros;
1314
pub mod plugin;
1415
pub mod types;

plugin/src/plugin.rs

+12-17
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::commands::builtin::{InitRPC, ManifestRPC};
1515
use crate::commands::types::{CLNConf, RPCHookInfo, RPCMethodInfo};
1616
use crate::commands::RPCCommand;
1717
use crate::errors::PluginError;
18+
use crate::io::AsyncIO;
1819
use crate::types::{LogLevel, RpcOption};
1920

2021
#[cfg(feature = "log")]
@@ -257,9 +258,6 @@ impl<'a, T: 'a + Clone> Plugin<T> {
257258
}
258259

259260
pub fn start(mut self) {
260-
let reader = io::stdin();
261-
let mut writer = io::stdout();
262-
let mut buffer = String::new();
263261
#[cfg(feature = "log")]
264262
{
265263
use std::str::FromStr;
@@ -276,29 +274,26 @@ impl<'a, T: 'a + Clone> Plugin<T> {
276274
on_init: self.on_init.clone(),
277275
}),
278276
);
279-
// FIXME: core lightning end with the double endline, so this can cause
280-
// problem for some input reader.
281-
// we need to parse the writer, and avoid this while loop
282-
while let Ok(_) = reader.read_line(&mut buffer) {
283-
let req_str = buffer.to_string();
284-
buffer.clear();
285-
let Ok(request) = serde_json::from_str::<Request<serde_json::Value>>(&req_str) else {
286-
continue;
287-
};
277+
let mut asyncio = AsyncIO::new().unwrap();
278+
asyncio.register().unwrap();
279+
asyncio.into_loop(|buffer| {
280+
self.log(
281+
LogLevel::Info,
282+
&format!("looping around the string: {buffer}"),
283+
);
284+
let request: Request<serde_json::Value> = serde_json::from_str(&buffer).unwrap();
288285
if let Some(id) = request.id {
289286
// when the id is specified this is a RPC or Hook, so we need to return a response
290287
let response = self.call_rpc_method(&request.method, request.params);
291288
let mut rpc_response = init_success_response(id);
292289
self.write_respose(&response, &mut rpc_response);
293-
writer
294-
.write_all(serde_json::to_string(&rpc_response).unwrap().as_bytes())
295-
.unwrap();
296-
writer.flush().unwrap();
290+
return serde_json::to_string(&rpc_response).unwrap();
297291
} else {
298292
// in case of the id is None, we are receiving the notification, so the server is not
299293
// interested in the answer.
300294
self.handle_notification(&request.method, request.params);
295+
return String::new();
301296
}
302-
}
297+
});
303298
}
304299
}

0 commit comments

Comments
 (0)