diff --git a/Cargo.lock b/Cargo.lock index 70a0b37998a5..a56ec51970e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -415,6 +415,23 @@ dependencies = [ "tonic", ] +[[package]] +name = "cln-lsps" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "cln-plugin", + "cln-rpc", + "hex", + "log", + "rand 0.9.1", + "serde", + "serde_json", + "thiserror 2.0.11", + "tokio", +] + [[package]] name = "cln-plugin" version = "0.4.0" @@ -594,7 +611,7 @@ dependencies = [ "hyper 1.5.2", "hyper-util", "pin-project-lite", - "rand", + "rand 0.8.5", "serde", "serde_json", "smallvec", @@ -778,7 +795,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", ] [[package]] @@ -1223,6 +1252,16 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "lockfree-object-pool" version = "0.1.6" @@ -1309,7 +1348,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -1403,6 +1442,29 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "pem" version = "3.0.4" @@ -1557,6 +1619,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "rand" version = "0.8.5" @@ -1564,8 +1632,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -1575,7 +1653,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1584,7 +1672,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", ] [[package]] @@ -1601,6 +1698,15 @@ dependencies = [ "yasna", ] +[[package]] +name = "redox_syscall" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "regex" version = "1.11.1" @@ -1653,7 +1759,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -1805,6 +1911,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "sct" version = "0.7.1" @@ -1926,6 +2038,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "simd-adler32" version = "0.3.7" @@ -2067,7 +2188,7 @@ checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", "fastrand", - "getrandom", + "getrandom 0.2.15", "once_cell", "rustix", "windows-sys 0.59.0", @@ -2175,15 +2296,17 @@ dependencies = [ [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" dependencies = [ "backtrace", "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -2335,7 +2458,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -2466,7 +2589,7 @@ dependencies = [ "http 1.2.0", "httparse", "log", - "rand", + "rand 0.8.5", "sha1", "thiserror 1.0.69", "utf-8", @@ -2611,6 +2734,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2724,6 +2856,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index ae9afb7bcf6f..a24609f9f5f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,9 +4,10 @@ strip = "debuginfo" [workspace] resolver = "2" members = [ - "cln-rpc", - "cln-grpc", - "plugins", - "plugins/grpc-plugin", - "plugins/rest-plugin" + "cln-rpc", + "cln-grpc", + "plugins", + "plugins/grpc-plugin", + "plugins/rest-plugin", + "plugins/lsps-plugin", ] diff --git a/plugins/.gitignore b/plugins/.gitignore index ae3fd1fb98c6..6d09b413b503 100644 --- a/plugins/.gitignore +++ b/plugins/.gitignore @@ -20,3 +20,5 @@ cln-askrene recklessrpc exposesecret cln-xpay +cln-lsps-client +cln-lsps-service diff --git a/plugins/Makefile b/plugins/Makefile index 747f629e1a91..4c5cc84b4bb5 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -147,8 +147,12 @@ plugins/cln-grpc: target/${RUST_PROFILE}/cln-grpc @cp $< $@ plugins/clnrest: target/${RUST_PROFILE}/clnrest @cp $< $@ +plugins/cln-lsps-client: target/${RUST_PROFILE}/cln-lsps-client + @cp $< $@ +plugins/cln-lsps-service: target/${RUST_PROFILE}/cln-lsps-service + @cp $< $@ -PLUGINS += plugins/cln-grpc plugins/clnrest +PLUGINS += plugins/cln-grpc plugins/clnrest plugins/cln-lsps-client plugins/cln-lsps-service endif PLUGIN_COMMON_OBJS := \ @@ -300,15 +304,20 @@ CLN_PLUGIN_EXAMPLES := \ CLN_PLUGIN_SRC = $(shell find plugins/src -name "*.rs") CLN_GRPC_PLUGIN_SRC = $(shell find plugins/grpc-plugin/src -name "*.rs") CLN_REST_PLUGIN_SRC = $(shell find plugins/rest-plugin/src -name "*.rs") +CLN_LSPS_PLUGIN_SRC = $(shell find plugins/lsps-plugin/src -name "*.rs") target/${RUST_PROFILE}/cln-grpc: ${CLN_PLUGIN_SRC} ${CLN_GRPC_PLUGIN_SRC} $(MSGGEN_GENALL) $(MSGGEN_GEN_ALL) cargo build ${CARGO_OPTS} --bin cln-grpc target/${RUST_PROFILE}/clnrest: ${CLN_REST_PLUGIN_SRC} cargo build ${CARGO_OPTS} --bin clnrest +target/${RUST_PROFILE}/cln-lsps-client: ${CLN_LSPS_PLUGIN_SRC} + cargo build ${CARGO_OPTS} --bin cln-lsps-client +target/${RUST_PROFILE}/cln-lsps-service: ${CLN_LSPS_PLUGIN_SRC} + cargo build ${CARGO_OPTS} --bin cln-lsps-service ifneq ($(RUST),0) include plugins/rest-plugin/Makefile -DEFAULT_TARGETS += $(CLN_PLUGIN_EXAMPLES) plugins/cln-grpc plugins/clnrest +DEFAULT_TARGETS += $(CLN_PLUGIN_EXAMPLES) plugins/cln-grpc plugins/clnrest plugins/cln-lsps-client plugins/cln-lsps-service endif clean: plugins-clean diff --git a/plugins/lsps-plugin/Cargo.toml b/plugins/lsps-plugin/Cargo.toml new file mode 100644 index 000000000000..bbdebe9365d8 --- /dev/null +++ b/plugins/lsps-plugin/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "cln-lsps" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "cln-lsps-client" +path = "src/client.rs" + +[[bin]] +name = "cln-lsps-service" +path = "src/service.rs" + +[dependencies] +anyhow = "1.0" +async-trait = "0.1" +cln-plugin = { version = "0.4", path = "../" } +cln-rpc = { version = "0.4", path = "../../cln-rpc" } +hex = "0.4" +log = "0.4" +rand = "0.9" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "2.0" +tokio = { version = "1.44", features = ["full"] } diff --git a/plugins/lsps-plugin/src/client.rs b/plugins/lsps-plugin/src/client.rs new file mode 100644 index 000000000000..c54699c67e2b --- /dev/null +++ b/plugins/lsps-plugin/src/client.rs @@ -0,0 +1,64 @@ +use cln_lsps::jsonrpc::client::JsonRpcClient; +use cln_lsps::lsps0::{ + self, + transport::{Bolt8Transport, CustomMessageHookManager, WithCustomMessageHookManager}, +}; +use serde::Deserialize; +use std::path::Path; + +#[derive(Clone)] +struct State { + hook_manager: CustomMessageHookManager, +} + +impl WithCustomMessageHookManager for State { + fn get_custommsg_hook_manager(&self) -> &CustomMessageHookManager { + &self.hook_manager + } +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let hook_manager = CustomMessageHookManager::new(); + let state = State { hook_manager }; + + if let Some(plugin) = cln_plugin::Builder::new(tokio::io::stdin(), tokio::io::stdout()) + .hook("custommsg", CustomMessageHookManager::on_custommsg::) + .rpcmethod( + "lsps-listprotocols", + "list protocols supported by lsp", + on_lsps_listprotocols, + ) + .start(state) + .await? + { + plugin.join().await + } else { + Ok(()) + } +} + +async fn on_lsps_listprotocols( + p: cln_plugin::Plugin, + v: serde_json::Value, +) -> Result { + #[derive(Deserialize)] + struct Request { + peer: String, + } + let dir = p.configuration().lightning_dir; + let rpc_path = Path::new(&dir).join(&p.configuration().rpc_file); + + let req: Request = serde_json::from_value(v).unwrap(); + + let client = JsonRpcClient::new(Bolt8Transport::new( + &req.peer, + rpc_path, + p.state().hook_manager.clone(), + None, + )?); + let res: lsps0::model::Lsps0listProtocolsResponse = client + .call_typed(lsps0::model::Lsps0listProtocolsRequest {}) + .await?; + Ok(serde_json::to_value(res)?) +} diff --git a/plugins/lsps-plugin/src/jsonrpc/client.rs b/plugins/lsps-plugin/src/jsonrpc/client.rs new file mode 100644 index 000000000000..5e0fe167d66c --- /dev/null +++ b/plugins/lsps-plugin/src/jsonrpc/client.rs @@ -0,0 +1,351 @@ +use async_trait::async_trait; +use core::fmt::Debug; +use log::{debug, error}; +use rand::rngs::OsRng; +use rand::TryRngCore; +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::Value; +use std::sync::Arc; + +use crate::jsonrpc::{ + Error, JsonRpcRequest, JsonRpcResponse, RequestObject, ResponseObject, Result, +}; + +/// Defines the interface for transporting JSON-RPC messages. +/// +/// Implementors of this trait are responsible for actually sending the JSON-RPC +/// request over some transport mechanism (RPC, Bolt8, etc.) +#[async_trait] +pub trait Transport { + async fn send(&self, request: String) -> core::result::Result; + async fn notify(&self, request: String) -> core::result::Result<(), Error>; +} + +/// A typed JSON-RPC client that works with any transport implementation. +/// +/// This client handles the JSON-RPC protocol details including message +/// formatting, request ID generation, and response parsing. +#[derive(Clone)] +pub struct JsonRpcClient { + transport: Arc, +} + +impl JsonRpcClient { + pub fn new(transport: T) -> Self { + Self { + transport: Arc::new(transport), + } + } + + /// Makes a JSON-RPC method call with raw JSON parameters and returns a raw + /// JSON result. + pub async fn call_raw(&self, method: &str, params: Option) -> Result { + let id = generate_random_id(); + + debug!("Preparing request: method={}, id={}", method, id); + let request = RequestObject { + jsonrpc: "2.0".into(), + method: method.into(), + params, + id: Some(id.clone().into()), + }; + let res_obj = self.send_request(method, &request, id).await?; + Value::from_response(res_obj) + } + + /// Makes a typed JSON-RPC method call with a request object and returns a + /// typed response. + /// + /// This method provides type safety by using request and response types + /// that implement the necessary traits. + pub async fn call_typed(&self, request: RQ) -> Result + where + RQ: JsonRpcRequest + Serialize + Send + Sync, + RS: DeserializeOwned + Serialize + Debug + Send + Sync, + { + let method = RQ::METHOD; + let id = generate_random_id(); + + debug!("Preparing request: method={}, id={}", method, id); + let request = request.into_request(Some(id.clone().into())); + let res_obj = self.send_request(method, &request, id).await?; + RS::from_response(res_obj) + } + + /// Sends a notification with raw JSON parameters (no response expected). + pub async fn notify_raw(&self, method: &str, params: Option) -> Result<()> { + debug!("Preparing notification: method={}", method); + let request = RequestObject { + jsonrpc: "2.0".into(), + method: method.into(), + params, + id: None, + }; + Ok(self.send_notification(method, &request).await?) + } + + /// Sends a typed notification (no response expected). + pub async fn notify_typed(&self, request: RQ) -> Result<()> + where + RQ: JsonRpcRequest + Serialize + Send + Sync, + { + let method = RQ::METHOD; + + debug!("Preparing notification: method={}", method); + let request = request.into_request(None); + Ok(self.send_notification(method, &request).await?) + } + + async fn send_request( + &self, + method: &str, + payload: &RP, + id: String, + ) -> Result> + where + RP: Serialize + Send + Sync, + RS: DeserializeOwned + Serialize + Debug + Send + Sync, + { + let request_json = serde_json::to_string(&payload)?; + debug!( + "Sending request: method={}, id={}, request={:?}", + method, id, &request_json + ); + let start = tokio::time::Instant::now(); + let res_str = self.transport.send(request_json).await?; + let elapsed = start.elapsed(); + debug!( + "Received response: method={}, id={}, response={}, elapsed={}ms", + method, + id, + &res_str, + elapsed.as_millis() + ); + Ok(serde_json::from_str(&res_str)?) + } + + async fn send_notification(&self, method: &str, payload: &RP) -> Result<()> + where + RP: Serialize + Send + Sync, + { + let request_json = serde_json::to_string(&payload)?; + debug!("Sending notification: method={}", method); + let start = tokio::time::Instant::now(); + self.transport.notify(request_json).await?; + let elapsed = start.elapsed(); + debug!( + "Sent notification: method={}, elapsed={}ms", + method, + elapsed.as_millis() + ); + Ok(()) + } +} + +/// Generates a random ID for JSON-RPC requests. +/// +/// Uses a secure random number generator to create a hex-encoded ID. Falls back +/// to a timestamp-based ID if random generation fails. +fn generate_random_id() -> String { + let mut bytes = [0u8; 10]; + match OsRng.try_fill_bytes(&mut bytes) { + Ok(_) => hex::encode(bytes), + Err(e) => { + // Fallback to a timestamp-based ID if random generation fails + error!( + "Failed to generate random ID: {}, falling back to timestamp", + e + ); + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + format!("fallback-{}", timestamp) + } + } +} + +#[cfg(test)] + +mod test_json_rpc { + use serde::Deserialize; + use tokio::sync::OnceCell; + + use super::*; + use crate::jsonrpc::{self, RpcError}; + + #[derive(Clone)] + struct TestTransport { + req: Arc>, + res: Arc>, + err: Arc>, + } + + impl TestTransport { + // Get the last request as parsed JSON + fn last_request_json(&self) -> Option { + self.req + .get() + .and_then(|req_str| serde_json::from_str(req_str).ok()) + } + } + + #[async_trait] + impl Transport for TestTransport { + async fn send(&self, req: String) -> core::result::Result { + // Store the request + let _ = self.req.set(req); + + // Check for error first + if let Some(err) = &*self.err { + return Err(Error::Transport(jsonrpc::TransportError::Other(err.into()))); + } + + // Then check for response + if let Some(res) = &*self.res { + return Ok(res.clone()); + } + + panic!("TestTransport: neither result nor error is set."); + } + + async fn notify(&self, req: String) -> core::result::Result<(), Error> { + // Store the request + let _ = self.req.set(req); + + // Check for error + if let Some(err) = &*self.err { + return Err(Error::Transport(jsonrpc::TransportError::Other(err.into()))); + } + + Ok(()) + } + } + + #[derive(Default, Clone, Serialize, Deserialize, Debug)] + struct DummyCall { + foo: String, + bar: i32, + } + + impl JsonRpcRequest for DummyCall { + const METHOD: &'static str = "dummy_call"; + } + + #[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] + struct DummyResponse { + foo: String, + bar: i32, + } + + #[tokio::test] + async fn test_typed_call_w_response() { + let req = DummyCall { + foo: String::from("hello world!"), + bar: 13, + }; + + let expected_res = DummyResponse { + foo: String::from("hello client!"), + bar: 10, + }; + + let res_obj = expected_res + .clone() + .into_response(String::from("unique-id-123")); + let res_str = serde_json::to_string(&res_obj).unwrap(); + + let transport = TestTransport { + req: Arc::new(OnceCell::const_new()), + res: Arc::new(Some(res_str)), + err: Arc::new(None), + }; + + let client_1 = JsonRpcClient::new(transport.clone()); + let res = client_1 + .call_typed::<_, DummyResponse>(req.clone()) + .await + .expect("Should have an OK result"); + assert_eq!(res, expected_res); + let transport_req = transport + .last_request_json() + .expect("Transport should have gotten a request"); + assert_eq!( + transport_req + .get("jsonrpc") + .and_then(|v| v.as_str()) + .unwrap(), + "2.0" + ); + assert_eq!( + transport_req + .get("params") + .and_then(|v| v.as_object()) + .unwrap(), + serde_json::to_value(&req).unwrap().as_object().unwrap() + ); + } + + #[tokio::test] + async fn test_typed_call_w_rpc_error() { + let req = DummyCall { + foo: "hello world!".into(), + bar: 13, + }; + + let err_res = RpcError::custom_error_with_data( + -32099, + "got a custom error", + serde_json::json!({"got": "some"}), + ); + + let res_obj = err_res.clone().into_response("unique-id-123".into()); + let res_str = serde_json::to_string(&res_obj).unwrap(); + + let transport = TestTransport { + req: Arc::new(OnceCell::const_new()), + res: Arc::new(Some(res_str)), + err: Arc::new(None), + }; + + let client_1 = JsonRpcClient::new(transport); + let res = client_1 + .call_typed::<_, DummyResponse>(req) + .await + .expect_err("Expected error response"); + assert!(match res { + Error::Rpc(rpc_error) => { + assert_eq!(rpc_error, err_res); + true + } + _ => false, + }); + } + + #[tokio::test] + async fn test_typed_call_w_transport_error() { + let req = DummyCall { + foo: "hello world!".into(), + bar: 13, + }; + + let transport = TestTransport { + req: Arc::new(OnceCell::const_new()), + res: Arc::new(None), + err: Arc::new(Some(String::from("transport error"))), + }; + + let client_1 = JsonRpcClient::new(transport); + let res = client_1 + .call_typed::<_, DummyResponse>(req) + .await + .expect_err("Expected error response"); + assert!(match res { + Error::Transport(err) => { + assert_eq!(err.to_string(), "Other error: transport error"); + true + } + _ => false, + }); + } +} diff --git a/plugins/lsps-plugin/src/jsonrpc/mod.rs b/plugins/lsps-plugin/src/jsonrpc/mod.rs new file mode 100644 index 000000000000..78a4fa1a8514 --- /dev/null +++ b/plugins/lsps-plugin/src/jsonrpc/mod.rs @@ -0,0 +1,509 @@ +pub mod client; +use log::debug; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::{self, Value}; +pub mod server; +use std::fmt; +use thiserror::Error; + +// Constants for JSON-RPC error codes +const PARSE_ERROR: i64 = -32700; +const INVALID_REQUEST: i64 = -32600; +const METHOD_NOT_FOUND: i64 = -32601; +const INVALID_PARAMS: i64 = -32602; +const INTERNAL_ERROR: i64 = -32603; + +/// Error type for JSON-RPC related operations. +/// +/// Encapsulates various error conditions that may occur during JSON-RPC +/// operations, including serialization errors, transport issues, and +/// protocol-specific errors. +#[derive(Error, Debug)] +pub enum Error { + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + #[error("RPC error: {0}")] + Rpc(#[from] RpcError), + #[error("Transport error: {0}")] + Transport(#[from] TransportError), + #[error("Other error: {0}")] + Other(String), +} + +impl Error { + pub fn other(v: T) -> Self { + return Self::Other(v.to_string()); + } +} + +/// Transport-specific errors that may occur when sending or receiving JSON-RPC +/// messages. +#[derive(Error, Debug)] +pub enum TransportError { + #[error("Timeout")] + Timeout, + #[error("Other error: {0}")] + Other(String), +} + +/// Convenience type alias for Result with the JSON-RPC Error type. +pub type Result = std::result::Result; + +/// Trait for types that can be converted into JSON-RPC request objects. +/// +/// Implementing this trait allows a struct to be used as a typed JSON-RPC +/// request, with an associated method name and automatic conversion to the +/// request format. +pub trait JsonRpcRequest: Serialize { + const METHOD: &'static str; + fn into_request(self, id: impl Into>) -> RequestObject + where + Self: Sized, + { + RequestObject { + jsonrpc: "2.0".into(), + method: Self::METHOD.into(), + params: Some(self), + id: id.into(), + } + } +} + +/// Trait for types that can be converted from JSON-RPC response objects. +/// +/// This trait provides methods for converting between typed response objects +/// and JSON-RPC protocol response envelopes. +pub trait JsonRpcResponse +where + T: DeserializeOwned, +{ + fn into_response(self, id: String) -> ResponseObject + where + Self: Sized + DeserializeOwned, + { + ResponseObject { + jsonrpc: "2.0".into(), + id: id.into(), + result: Some(self), + error: None, + } + } + + fn from_response(resp: ResponseObject) -> Result + where + T: core::fmt::Debug, + { + match (resp.result, resp.error) { + (Some(result), None) => Ok(result), + (None, Some(error)) => Err(Error::Rpc(error)), + _ => { + debug!( + "Invalid JSON-RPC response - missing both result and error fields, or both set: id={}", + resp.id + ); + Err(Error::Rpc(RpcError::internal_error( + "not a valid json respone", + ))) + } + } + } +} + +/// Automatically implements the `JsonRpcResponse` trait for all types that +/// implement `DeserializeOwned`. This simplifies creating JSON-RPC services, +/// as you only need to define data structures that can be deserialized. +impl JsonRpcResponse for T where T: DeserializeOwned {} + +/// # RequestObject +/// +/// Represents a JSON-RPC 2.0 Request object, as defined in section 4 of the +/// specification. This structure encapsulates all necessary information for +/// a remote procedure call. +/// +/// # Type Parameters +/// +/// * `T`: The type of the `params` field. This *MUST* implement `Serialize` +/// to allow it to be encoded as JSON. Typically this will be a struct +/// implementing the `JsonRpcRequest` trait. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RequestObject +where + T: Serialize, +{ + /// **REQUIRED**. MUST be `"2.0"`. + pub jsonrpc: String, + /// **REQUIRED**. The method to be invoked. + pub method: String, + /// A struct containing the method parameters. + #[serde(skip_serializing_if = "is_none_or_null")] + pub params: Option, + /// An identifier established by the Client that MUST contain a String. + /// # Note: this is special to LSPS0, might change to match the more general + /// JSON-RPC 2.0 sepec if needed. + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, +} + +impl RequestObject +where + T: Serialize, +{ + /// Returns the inner data object contained by params for handling or future + /// processing. + pub fn into_inner(self) -> Option { + self.params + } +} + +/// Helper function to check if params is None or would serialize to null. +fn is_none_or_null(opt: &Option) -> bool { + match opt { + None => true, + Some(val) => match serde_json::to_value(&val) { + Ok(Value::Null) => true, + _ => false, + }, + } +} + +/// # ResponseObject +/// +/// Represents a JSON-RPC 2.0 Response object, as defined in section 5.0 of the +/// specification. This structure encapsulates either a successful result or +/// an error. +/// +/// # Type Parameters +/// +/// * `T`: The type of the `result` field, which will be returned upon a +/// succesful execution of the procedure. *MUST* implement both `Serialize` +/// (to allow construction of responses) and `DeserializeOwned` (to allow +/// receipt and parsing of responses). +#[derive(Debug, Serialize, Deserialize)] +#[serde(bound = "T: Serialize + DeserializeOwned")] +pub struct ResponseObject +where + T: DeserializeOwned, +{ + /// **REQUIRED**. MUST be `"2.0"`. + jsonrpc: String, + /// **REQUIRED**. The identifier of the original request this is a response. + id: String, + /// **REQUIRED on success**. The data if there is a request and non-errored. + /// MUST NOT exist if there was an error triggered during invocation. + #[serde(skip_serializing_if = "Option::is_none")] + result: Option, + /// **REQUIRED on error** An error type if there was a failure. + error: Option, +} + +impl ResponseObject +where + T: DeserializeOwned + Serialize + core::fmt::Debug, +{ + /// Returns a potential data (result) if the code execution passed else it + /// returns with RPC error, data (error details) if there was + pub fn into_inner(self) -> Result { + T::from_response(self) + } +} + +/// # RpcError +/// +/// Represents an error object in a JSON-RPC 2.0 Response object (section 5.1). +/// Provides structured information about an error that occurred during the +/// method invocation. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RpcError { + /// **REQUIRED**. An integer indicating the type of error. + pub code: i64, + /// **REQUIRED**. A string containing a short description of the error. + pub message: String, + /// A primitive that can be either Primitive or Structured type if there + /// were. + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +impl RpcError { + pub fn into_response(self, id: String) -> ResponseObject { + ResponseObject { + jsonrpc: "2.0".into(), + id: id.into(), + result: None, + error: Some(self), + } + } +} + +impl RpcError { + /// Reserved for implementation-defined server-errors. + pub fn custom_error(code: i64, message: T) -> Self { + RpcError { + code, + message: message.to_string(), + data: None, + } + } + + /// Reserved for implementation-defined server-errors. + pub fn custom_error_with_data( + code: i64, + message: T, + data: serde_json::Value, + ) -> Self { + RpcError { + code, + message: message.to_string(), + data: Some(data), + } + } + + /// Invalid JSON was received by the server. + /// An error occurred on the server while parsing the JSON text. + pub fn parse_error(message: T) -> Self { + Self::custom_error(PARSE_ERROR, message) + } + + /// Invalid JSON was received by the server. + /// An error occurred on the server while parsing the JSON text. + pub fn parse_error_with_data( + message: T, + data: serde_json::Value, + ) -> Self { + Self::custom_error_with_data(PARSE_ERROR, message, data) + } + + /// The JSON sent is not a valid Request object. + pub fn invalid_request(message: T) -> Self { + Self::custom_error(INVALID_REQUEST, message) + } + + /// The JSON sent is not a valid Request object. + pub fn invalid_request_with_data( + message: T, + data: serde_json::Value, + ) -> Self { + Self::custom_error_with_data(INVALID_REQUEST, message, data) + } + + /// The method does not exist / is not available. + pub fn method_not_found(message: T) -> Self { + Self::custom_error(METHOD_NOT_FOUND, message) + } + + /// The method does not exist / is not available. + pub fn method_not_found_with_data( + message: T, + data: serde_json::Value, + ) -> Self { + Self::custom_error_with_data(METHOD_NOT_FOUND, message, data) + } + + /// Invalid method parameter(s). + pub fn invalid_params(message: T) -> Self { + Self::custom_error(INVALID_PARAMS, message) + } + + /// Invalid method parameter(s). + pub fn invalid_params_with_data( + message: T, + data: serde_json::Value, + ) -> Self { + Self::custom_error_with_data(INVALID_PARAMS, message, data) + } + + /// Internal JSON-RPC error. + pub fn internal_error(message: T) -> Self { + Self::custom_error(INTERNAL_ERROR, message) + } + + /// Internal JSON-RPC error. + pub fn internal_error_with_data( + message: T, + data: serde_json::Value, + ) -> Self { + Self::custom_error_with_data(INTERNAL_ERROR, message, data) + } +} + +impl fmt::Display for RpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "JSON-RPC Error (code: {}, message: {}, data: {:?})", + self.code, self.message, self.data + ) + } +} + +impl std::error::Error for RpcError {} + +#[cfg(test)] +mod test_message_serialization { + use super::*; + use serde_json::json; + + #[test] + fn test_empty_params_serialization() { + // Empty params should serialize to `"params":{}` instead of + // `"params":null`. + #[derive(Debug, Serialize, Deserialize)] + pub struct SayHelloRequest; + impl JsonRpcRequest for SayHelloRequest { + const METHOD: &'static str = "say_hello"; + } + let rpc_request = SayHelloRequest.into_request(Some("unique-id-123".into())); + assert!(!serde_json::to_string(&rpc_request) + .expect("could not convert to json") + .contains("\"params\"")); + } + + #[test] + fn test_request_serialization_and_deserialization() { + // Ensure that we correctly serialize to a valid JSON-RPC 2.0 request. + #[derive(Default, Debug, Serialize, Deserialize)] + pub struct SayNameRequest { + name: String, + age: i32, + } + impl JsonRpcRequest for SayNameRequest { + const METHOD: &'static str = "say_name"; + } + let rpc_request = SayNameRequest { + name: "Satoshi".to_string(), + age: 99, + } + .into_request(Some("unique-id-123".into())); + + let json_value: serde_json::Value = serde_json::to_value(&rpc_request).unwrap(); + let expected_value: serde_json::Value = serde_json::json!({ + "jsonrpc": "2.0", + "method": "say_name", + "params": { + "name": "Satoshi", + "age": 99 + }, + "id": "unique-id-123" + }); + assert_eq!(json_value, expected_value); + + let request: RequestObject = serde_json::from_value(json_value).unwrap(); + assert_eq!(request.method, "say_name"); + assert_eq!(request.jsonrpc, "2.0"); + + let request: RequestObject = + serde_json::from_value(expected_value).unwrap(); + let inner = request.into_inner(); + assert_eq!(inner.unwrap().name, rpc_request.params.unwrap().name); + } + + #[test] + fn test_response_deserialization() { + // Check that we can convert a JSON-RPC response into a typed result. + #[derive(Debug, Serialize, Deserialize, PartialEq)] + pub struct SayNameResponse { + name: String, + age: i32, + message: String, + } + + let json_response = r#" + { + "jsonrpc": "2.0", + "result": { + "age": 99, + "message": "Hello Satoshi!", + "name": "Satoshi" + }, + "id": "unique-id-123" + }"#; + + let response_object: ResponseObject = + serde_json::from_str(json_response).unwrap(); + + let response: SayNameResponse = response_object.into_inner().unwrap(); + let expected_response = SayNameResponse { + name: "Satoshi".into(), + age: 99, + message: "Hello Satoshi!".into(), + }; + + assert_eq!(response, expected_response); + } + + #[test] + fn test_empty_result() { + // Check that we correctly deserialize an empty result. + #[derive(Debug, Serialize, Deserialize, PartialEq)] + pub struct DummyResponse {} + + let json_response = r#" + { + "jsonrpc": "2.0", + "result": {}, + "id": "unique-id-123" + }"#; + + let response_object: ResponseObject = + serde_json::from_str(json_response).unwrap(); + + let response: DummyResponse = response_object.into_inner().unwrap(); + let expected_response = DummyResponse {}; + + assert_eq!(response, expected_response); + } + #[test] + fn test_error_deserialization() { + // Check that we deserialize an error if we got one. + #[derive(Debug, Serialize, Deserialize, PartialEq)] + pub struct DummyResponse {} + + let json_response = r#" + { + "jsonrpc": "2.0", + "id": "unique-id-123", + "error": { + "code": -32099, + "message": "something bad happened", + "data": { + "f1": "v1", + "f2": 2 + } + } + }"#; + + let response_object: ResponseObject = + serde_json::from_str(json_response).unwrap(); + + let response = response_object.into_inner(); + let err = response.unwrap_err(); + match err { + Error::Rpc(err) => { + assert_eq!(err.code, -32099); + assert_eq!(err.message, "something bad happened"); + assert_eq!( + err.data, + serde_json::from_str("{\"f1\":\"v1\",\"f2\":2}").unwrap() + ); + } + _ => assert!(false), + } + } + + #[test] + fn test_error_serialization() { + let error = RpcError::invalid_request("Invalid request"); + let serialized = serde_json::to_string(&error).unwrap(); + assert_eq!(serialized, r#"{"code":-32600,"message":"Invalid request"}"#); + + let error_with_data = RpcError::internal_error_with_data( + "Internal server error", + json!({"details": "Something went wrong"}), + ); + let serialized_with_data = serde_json::to_string(&error_with_data).unwrap(); + assert_eq!( + serialized_with_data, + r#"{"code":-32603,"message":"Internal server error","data":{"details":"Something went wrong"}}"# + ); + } +} diff --git a/plugins/lsps-plugin/src/jsonrpc/server.rs b/plugins/lsps-plugin/src/jsonrpc/server.rs new file mode 100644 index 000000000000..f9e3334b0059 --- /dev/null +++ b/plugins/lsps-plugin/src/jsonrpc/server.rs @@ -0,0 +1,302 @@ +use crate::jsonrpc::{Result, RpcError}; +use async_trait::async_trait; +use log::{debug, trace}; +use std::{collections::HashMap, sync::Arc}; + +/// Responsible for writing JSON-RPC responses back to clients. +/// +/// This trait abstracts the mechanism for sending responses back to the client, +/// allowing handlers to remain transport-agnostic. Implementations of this +/// trait handle the actual transmission of response data over the underlying +/// transport. +#[async_trait] +pub trait JsonRpcResponseWriter: Send + 'static { + /// Writes the provided payload as a response. + async fn write(&mut self, payload: &[u8]) -> Result<()>; +} + +/// Processes JSON-RPC requests and produces responses. +/// +/// This trait defines the interface for handling specific JSON-RPC methods. +/// Each method supported by the server should have a corresponding handler +/// that implements this trait. +#[async_trait] +pub trait RequestHandler: Send + Sync + 'static { + /// Handles a JSON-RPC request. + async fn handle(&self, payload: &[u8]) -> core::result::Result, RpcError>; +} + +/// Builder for creating JSON-RPC servers. +pub struct JsonRpcServerBuilder { + handlers: HashMap>, +} + +impl JsonRpcServerBuilder { + pub fn new() -> Self { + Self { + handlers: HashMap::new(), + } + } + + /// Registers a handler for a specific JSON-RPC method. + pub fn with_handler(mut self, method: String, handler: Arc) -> Self { + self.handlers.insert(method, handler); + self + } + + /// Builds a JSON-RPC server with the configured handlers. + pub fn build(self) -> JsonRpcServer { + JsonRpcServer { + handlers: Arc::new(self.handlers), + } + } +} + +/// Server for handling JSON-RPC 2.0 requests. +/// +/// Dispatches incoming JSON-RPC requests to the appropriate handlers based on +/// the method name, and manages the response lifecycle. +#[derive(Clone)] +pub struct JsonRpcServer { + handlers: Arc>>, +} + +impl JsonRpcServer { + pub fn builder() -> JsonRpcServerBuilder { + JsonRpcServerBuilder::new() + } + + // Processes a JSON-RPC message and writes the response. + /// + /// This is the main entry point for handling JSON-RPC requests. It: + /// 1. Parses and validates the incoming request + /// 2. Routes the request to the appropriate handler + /// 3. Writes the response back to the client (if needed) + pub async fn handle_message( + &self, + payload: &[u8], + writer: &mut dyn JsonRpcResponseWriter, + ) -> Result<()> { + trace!("Handle request with payload: {:?}", payload); + let value: serde_json::Value = serde_json::from_slice(payload)?; + let id = value.get("id").and_then(|id| id.as_str()); + let method = value.get("method").and_then(|method| method.as_str()); + let jsonrpc = value.get("jsonrpc").and_then(|jrpc| jrpc.as_str()); + + trace!( + "Validate request: id={:?}, method={:?}, jsonrpc={:?}", + id, + method, + jsonrpc + ); + let method = match (jsonrpc, method) { + (Some(jrpc), Some(method)) if jrpc == "2.0" => method, + (_, _) => { + debug!("Got invalid request {}", value); + let err = RpcError { + code: -32600, + message: "Invalid request".into(), + data: None, + }; + return self.maybe_write_error(id, err, writer).await; + } + }; + + trace!("Get handler for id={:?}, method={:?}", id, method); + if let Some(handler) = self.handlers.get(method) { + trace!( + "Call handler for id={:?}, method={:?}, with payload={:?}", + id, + method, + payload + ); + match handler.handle(payload).await { + Ok(res) => return self.maybe_write(id, &res, writer).await, + Err(e) => { + debug!("Handler returned with error: {}", e); + return self.maybe_write_error(id, e, writer).await; + } + }; + } else { + debug!("No handler found for method: {}", method); + let err = RpcError { + code: -32601, + message: "Method not found".into(), + data: None, + }; + return self.maybe_write_error(id, err, writer).await; + } + } + + /// Writes a response if the request has an ID. + /// + /// For notifications (requests without an ID), no response is written. + async fn maybe_write( + &self, + id: Option<&str>, + payload: &[u8], + writer: &mut dyn JsonRpcResponseWriter, + ) -> Result<()> { + // No need to respond when we don't have an id - it's a notification + if id.is_some() { + return writer.write(payload).await; + } + Ok(()) + } + + /// Writes an error response if the request has an ID. + /// + /// For notifications (requests without an ID), no response is written. + async fn maybe_write_error( + &self, + id: Option<&str>, + err: RpcError, + writer: &mut dyn JsonRpcResponseWriter, + ) -> Result<()> { + // No need to respond when we don't have an id - it's a notification + if let Some(id) = id { + let err_res = err.clone().into_response(id.into()); + let err_vec = serde_json::to_vec(&err_res).map_err(|e| RpcError::internal_error(e))?; + return writer.write(&err_vec).await; + } + Ok(()) + } +} + +#[cfg(test)] +mod test_json_rpc_server { + use super::*; + + #[derive(Default)] + struct MockWriter { + log_content: String, + } + + #[async_trait] + impl JsonRpcResponseWriter for MockWriter { + async fn write(&mut self, payload: &[u8]) -> Result<()> { + println!("Write payload={:?}", &payload); + let byte_str = String::from_utf8(payload.to_vec()).unwrap(); + self.log_content = byte_str; + Ok(()) + } + } + + // Echo handler + pub struct Echo; + + #[async_trait] + impl RequestHandler for Echo { + async fn handle(&self, payload: &[u8]) -> core::result::Result, RpcError> { + println!("Called handler with payload: {:?}", &payload); + Ok(payload.to_vec()) + } + } + + #[tokio::test] + async fn test_notification() { + // A notification should not respond to the client so there is no need + // to write payload to the writer; + let server = JsonRpcServer::builder() + .with_handler("echo".to_string(), Arc::new(Echo)) + .build(); + + let mut writer = MockWriter { + log_content: String::default(), + }; + + let msg = r#"{"jsonrpc":"2.0","method":"echo","params":{"age":99,"name":"Satoshi"}}"#; // No id signals a notification. + let res = server.handle_message(msg.as_bytes(), &mut writer).await; + assert!(res.is_ok()); + assert!(writer.log_content.is_empty()); // Was a notification we don't expect a response; + } + + #[tokio::test] + async fn missing_method_field() { + // We verify the request data, check that we return an error when we + // don't understand the request. + let server = JsonRpcServer::builder() + .with_handler("echo".to_string(), Arc::new(Echo)) + .build(); + + let mut writer = MockWriter { + log_content: String::default(), + }; + + let msg = r#"{"jsonrpc":"2.0","params":{"age":99,"name":"Satoshi"},"id":"unique-id-123"}"#; + let res = server.handle_message(msg.as_bytes(), &mut writer).await; + assert!(res.is_ok()); + let expected = r#"{"jsonrpc":"2.0","id":"unique-id-123","error":{"code":-32600,"message":"Invalid request"}}"#; // Unknown method say_hello + assert_eq!(writer.log_content, expected); + } + + #[tokio::test] + async fn wrong_version() { + // We only accept requests that have jsonrpc version 2.0. + let server = JsonRpcServer::builder() + .with_handler("echo".to_string(), Arc::new(Echo)) + .build(); + + let mut writer = MockWriter { + log_content: String::default(), + }; + + let msg = r#"{"jsonrpc":"1.0","method":"echo","params":{"age":99,"name":"Satoshi"},"id":"unique-id-123"}"#; + let res = server.handle_message(msg.as_bytes(), &mut writer).await; + assert!(res.is_ok()); + let expected = r#"{"jsonrpc":"2.0","id":"unique-id-123","error":{"code":-32600,"message":"Invalid request"}}"#; // Unknown method say_hello + assert_eq!(writer.log_content, expected); + } + + #[tokio::test] + async fn propper_request() { + // Check that we call the handler and write back to the writer when + // processing a well-formed request. + let server = JsonRpcServer::builder() + .with_handler("echo".to_string(), Arc::new(Echo)) + .build(); + + let mut writer = MockWriter { + log_content: String::default(), + }; + + let msg = r#"{"jsonrpc":"2.0","method":"echo","params":{"age":99,"name":"Satoshi"},"id":"unique-id-123"}"#; + let res = server.handle_message(msg.as_bytes(), &mut writer).await; + assert!(res.is_ok()); + assert_eq!(writer.log_content, msg.to_string()); + } + + #[tokio::test] + async fn unknown_method() { + // We don't know the method and need to send back an error to the client. + let server = JsonRpcServer::builder() + .with_handler("echo".to_string(), Arc::new(Echo)) + .build(); + + let mut writer = MockWriter { + log_content: String::default(), + }; + + let msg = r#"{"jsonrpc":"2.0","method":"say_hello","params":{"age":99,"name":"Satoshi"},"id":"unique-id-123"}"#; // Unknown method say_hello + let res = server.handle_message(msg.as_bytes(), &mut writer).await; + assert!(res.is_ok()); + let expected = r#"{"jsonrpc":"2.0","id":"unique-id-123","error":{"code":-32601,"message":"Method not found"}}"#; // Unknown method say_hello + assert_eq!(writer.log_content, expected); + } + + #[tokio::test] + async fn test_handler() { + let server = JsonRpcServer::builder() + .with_handler("echo".to_string(), Arc::new(Echo)) + .build(); + + let mut writer = MockWriter { + log_content: String::default(), + }; + + let msg = r#"{"jsonrpc":"2.0","method":"echo","params":{"age":99,"name":"Satoshi"},"id":"unique-id-123"}"#; + let res = server.handle_message(msg.as_bytes(), &mut writer).await; + assert!(res.is_ok()); + assert_eq!(writer.log_content, msg.to_string()); + } +} diff --git a/plugins/lsps-plugin/src/lib.rs b/plugins/lsps-plugin/src/lib.rs new file mode 100644 index 000000000000..8d4044c193e0 --- /dev/null +++ b/plugins/lsps-plugin/src/lib.rs @@ -0,0 +1,2 @@ +pub mod jsonrpc; +pub mod lsps0; diff --git a/plugins/lsps-plugin/src/lsps0/mod.rs b/plugins/lsps-plugin/src/lsps0/mod.rs new file mode 100644 index 000000000000..e7716c921b42 --- /dev/null +++ b/plugins/lsps-plugin/src/lsps0/mod.rs @@ -0,0 +1,2 @@ +pub mod model; +pub mod transport; diff --git a/plugins/lsps-plugin/src/lsps0/model.rs b/plugins/lsps-plugin/src/lsps0/model.rs new file mode 100644 index 000000000000..0327120e08f9 --- /dev/null +++ b/plugins/lsps-plugin/src/lsps0/model.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; + +use crate::jsonrpc::JsonRpcRequest; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Lsps0listProtocolsRequest {} + +impl JsonRpcRequest for Lsps0listProtocolsRequest { + const METHOD: &'static str = "lsps0.list_protocols"; +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Lsps0listProtocolsResponse { + pub protocols: Vec, +} diff --git a/plugins/lsps-plugin/src/lsps0/transport.rs b/plugins/lsps-plugin/src/lsps0/transport.rs new file mode 100644 index 000000000000..b19974e7769d --- /dev/null +++ b/plugins/lsps-plugin/src/lsps0/transport.rs @@ -0,0 +1,533 @@ +use crate::jsonrpc::{client::Transport, Error, TransportError}; +use async_trait::async_trait; +use cln_plugin::Plugin; +use cln_rpc::{primitives::PublicKey, ClnRpc}; +use log::{debug, error, trace}; +use serde::{de::Visitor, Deserialize, Serialize}; +use std::{ + collections::HashMap, + path::PathBuf, + str::FromStr, + sync::{Arc, Weak}, +}; +use tokio::{ + sync::{mpsc, RwLock}, + time::Duration, +}; + +pub const LSPS0_MESSAGE_TYPE: u16 = 37913; +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); + +/// Trait that must be implemented by plugin state to access the custom message hook manager. +/// +/// This trait allows the hook handler to access the custom message hook manager +/// from the plugin state, enabling proper message routing. +pub trait WithCustomMessageHookManager { + fn get_custommsg_hook_manager(&self) -> &CustomMessageHookManager; +} + +// Manages subscriptions for the custom message hook. +/// +/// The `CustomMessageHookManager` is responsible for: +/// 1. Maintaining a registry of message ID to receiver mappings +/// 2. Processing incoming LSPS0 messages and routing them to subscribers +/// 3. Cleaning up expired subscriptions +/// +/// It uses weak references to avoid memory leaks when timeouts occ +#[derive(Clone)] +pub struct CustomMessageHookManager { + /// Maps message IDs to weak references of response channels + subs: Arc>>>>, +} + +impl CustomMessageHookManager { + /// Creates a new CustomMessageHookManager. + pub fn new() -> Self { + Self { + subs: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Subscribes to receive a message with a specific ID. + /// + /// Registers a weak reference to a channel that will receive the message + /// when it arrives. Using weak references allows for automatic cleanup if + /// the receiver is dropped due to timeout. + async fn subscribe_hook_once>( + &self, + id: I, + channel: Weak>, + ) { + let id = id.into(); + trace!("Subscribe to custom message hook for message id={}", id); + let mut sub_lock = self.subs.write().await; + sub_lock.insert(id, channel); + } + + /// Processes an incoming LSP message. + /// + /// Extracts the message ID from the payload, finds the corresponding + /// subscriber, and forwards the message to them if found. + async fn process_lsp_message(&self, payload: CustomMsg, peer_id: &str) -> bool { + // Convert the binary payload to a string + let lsps_msg_string = match String::from_utf8(payload.payload.clone()) { + Ok(v) => v, + Err(e) => { + error!("Failed to deserialize custommsg payload from {peer_id}: {e}"); + return false; + } + }; + + let id = match extract_message_id(&lsps_msg_string) { + Ok(v) => v, + Err(e) => { + error!("Failed to get id from lsps message from {peer_id}: {e}"); + return false; + } + }; + + let mut subs_lock = self.subs.write().await; + // Clean up any expired subscriptions + subs_lock.retain(|_, v| Weak::strong_count(v) > 0); + subs_lock.shrink_to_fit(); + + // Find send to, and remove the subscriber for this message ID + if let Some(tx) = subs_lock.remove(&id).map(|v| v.upgrade()).flatten() { + if let Err(e) = tx.send(payload).await { + error!("Failed to send custommsg to subscriber for id={}: {e}", id); + return false; + } + return true; + } + + debug!( + "No subscriber found for message with id={} from {peer_id}", + id + ); + false + } + + /// Handles the custommsg hook from Core Lightning. + /// + /// This method should be registered as a hook handler in a Core Lightning + /// plugin. It processes incoming custom messages and routes LSPS0 messages + /// to the appropriate subscribers. + pub async fn on_custommsg( + p: Plugin, + v: serde_json::Value, + ) -> Result + where + S: Clone + Sync + Send + 'static + WithCustomMessageHookManager, + { + // Default response is to continue processing. + let continue_response = Ok(serde_json::json!({ + "result": "continue" + })); + + // Parse the custom message hook return value. + let custommsg: CustomMsgHookReturn = match serde_json::from_value(v) { + Ok(v) => v, + Err(e) => { + error!("Failed to deserialize custommsg: {e}"); + return continue_response; + } + }; + + // Only process LSPS0 message types. + if custommsg.payload.message_type != LSPS0_MESSAGE_TYPE { + debug!( + "Custommsg is not of type LSPS0 (got {}), skipping", + custommsg.payload.message_type + ); + return continue_response; + } + + // Route the message to the appropriate handler. + // Can be moved into a separate task via tokio::spawn if needed; + let hook_watcher = p.state().get_custommsg_hook_manager(); + hook_watcher + .process_lsp_message(custommsg.payload, &custommsg.peer_id) + .await; + return continue_response; + } +} + +/// Transport implementation for JSON-RPC over Lightning Network using BOLT8 +/// and BOLT1 custom messages. +/// +/// The `Bolt8Transport` allows JSON-RPC requests to be transmitted as custom +/// messages between Lightning Network nodes. It uses Core Lightning's +/// `sendcustommsg` RPC call to send messages and the `custommsg` hook to +/// receive responses. +#[derive(Clone)] +pub struct Bolt8Transport { + /// The node ID of the destination node. + endpoint: cln_rpc::primitives::PublicKey, + /// Path to the Core Lightning RPC socket. + rpc_path: PathBuf, + /// Timeout for requests. + request_timeout: Duration, + /// Hook manager for routing messages. + hook_watcher: CustomMessageHookManager, +} + +impl Bolt8Transport { + /// Creates a new Bolt8Transport. + /// + /// # Arguments + /// * `endpoint` - Node ID of the destination node as a hex string + /// * `rpc_path` - Path to the Core Lightning socket + /// * `hook_watcher` - Hook manager to use for message routing + /// * `timeout` - Optional timeout for requests (defaults to DEFAULT_TIMEOUT) + /// + /// # Returns + /// A new `Bolt8Transport` instance or an error if the node ID is invalid + pub fn new( + endpoint: &str, + rpc_path: PathBuf, + hook_watcher: CustomMessageHookManager, + timeout: Option, + ) -> Result { + let endpoint = cln_rpc::primitives::PublicKey::from_str(endpoint) + .map_err(|e| TransportError::Other(e.to_string()))?; + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); + Ok(Self { + endpoint, + rpc_path, + request_timeout: timeout, + hook_watcher, + }) + } + + /// Connects to the Core Lightning node. + async fn connect_to_node(&self) -> Result { + ClnRpc::new(&self.rpc_path) + .await + .map_err(|e| Error::Transport(TransportError::Other(e.to_string()))) + } + + /// Sends a custom message to the destination node. + async fn send_custom_msg(&self, client: &mut ClnRpc, payload: Vec) -> Result<(), Error> { + send_custommsg(client, payload, self.endpoint).await + } + + /// Waits for a response with timeout. + async fn wait_for_response( + &self, + mut rx: mpsc::Receiver, + ) -> Result { + tokio::time::timeout(self.request_timeout, rx.recv()) + .await + .map_err(|_| Error::Transport(TransportError::Timeout))? + .ok_or(Error::Transport(TransportError::Other(String::from( + "Channel unexpectedly closed", + )))) + } +} + +/// Sends a custom message to the destination node. +pub async fn send_custommsg( + client: &mut ClnRpc, + payload: Vec, + peer: PublicKey, +) -> Result<(), Error> { + let msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload, + }; + + let request = cln_rpc::model::requests::SendcustommsgRequest { + msg: msg.to_string(), + node_id: peer, + }; + + client + .call_typed(&request) + .await + .map_err(|e| { + Error::Transport(TransportError::Other(format!( + "Failed to send custom msg: {e}" + ))) + }) + .map(|r| { + trace!("Successfully queued custom msg: {}", r.status); + () + }) +} + +#[async_trait] +impl Transport for Bolt8Transport { + /// Sends a JSON-RPC request and waits for a response. + async fn send(&self, request: String) -> core::result::Result { + let id = extract_message_id(&request)?; + let mut client = self.connect_to_node().await?; + + let (tx, rx) = mpsc::channel(1); + trace!( + "Subscribing to custom msg hook manager for request id={}", + id + ); + + // Create a strong reference that will be dropped after timeout. + let tx_arc = Arc::new(tx); + + self.hook_watcher + .subscribe_hook_once(id, Arc::downgrade(&tx_arc)) + .await; + self.send_custom_msg(&mut client, request.into_bytes()) + .await?; + + let res = self.wait_for_response(rx).await?; + + if res.message_type != LSPS0_MESSAGE_TYPE { + return Err(Error::Transport(TransportError::Other(format!( + "unexpected response message type: expected {}, got {}", + LSPS0_MESSAGE_TYPE, res.message_type + )))); + } + + core::str::from_utf8(&res.payload) + .map_err(|e| { + Error::Transport(TransportError::Other(format!( + "failed to decode msg payload {:?}: {}", + res.payload, e + ))) + }) + .map(|s| s.into()) + } + + /// Sends a notification without waiting for a response. + async fn notify(&self, request: String) -> core::result::Result<(), Error> { + let mut client = self.connect_to_node().await?; + self.send_custom_msg(&mut client, request.into_bytes()) + .await + } +} + +// Extracts the message ID from a JSON-RPC message. +fn extract_message_id(msg: &str) -> core::result::Result { + let id_only: IdOnly = serde_json::from_str(msg)?; + Ok(id_only.id) +} + +/// Represents a custom message with type and payload. +#[derive(Clone, Debug, PartialEq)] +pub struct CustomMsg { + pub message_type: u16, + pub payload: Vec, +} + +impl core::fmt::Display for CustomMsg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut bytes = Vec::with_capacity(2 + self.payload.len()); + bytes.extend_from_slice(&self.message_type.to_be_bytes()); + bytes.extend_from_slice(&self.payload); + write!(f, "{}", hex::encode(bytes)) + } +} + +impl FromStr for CustomMsg { + type Err = Error; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(Error::other)?; + + if bytes.len() < 2 { + return Err(Error::other( + "hex string too short to contain a valid message_type", + )); + } + + let message_type_bytes: [u8; 2] = bytes[..2].try_into().map_err(Error::other)?; + let message_type = u16::from_be_bytes(message_type_bytes); + let payload = bytes[2..].to_owned(); + Ok(CustomMsg { + message_type, + payload, + }) + } +} + +impl Serialize for CustomMsg { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +/// Visitor for deserializing CustomMsg from strings. +struct CustomMsgVisitor; + +impl<'de> Visitor<'de> for CustomMsgVisitor { + type Value = CustomMsg; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a hex string representing a CustomMsg") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + CustomMsg::from_str(v).map_err(E::custom) + } +} + +impl<'de> Deserialize<'de> for CustomMsg { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_str(CustomMsgVisitor) + } +} + +/// Struct to extract just the ID from a JSON-RPC message. +#[derive(Clone, Debug, Serialize, Deserialize)] +struct IdOnly { + id: String, +} + +/// Return type from custommsg hook. +#[derive(Clone, Debug, Serialize, Deserialize)] +struct CustomMsgHookReturn { + peer_id: String, + payload: CustomMsg, +} + +#[cfg(test)] +mod test_transport { + use super::*; + use serde_json::json; + + // Helper to create a test JSON-RPC request + fn create_test_request(id: &str) -> String { + serde_json::to_string(&json!({ + "jsonrpc": "2.0", + "method": "test_method", + "params": {"test": "value"}, + "id": id + })) + .unwrap() + } + + #[tokio::test] + async fn test_deserialize_custommsg() { + let hex_str = r#"94197b226a736f6e727063223a22322e30222c226d6574686f64223a226c737073302e6c6973745f70726f746f636f6c73222c22706172616d73223a7b7d2c226964223a226135633665613536366333383038313936346263227d"#; + let msg = CustomMsg::from_str(hex_str).unwrap(); + assert_eq!(msg.message_type, LSPS0_MESSAGE_TYPE); + } + + #[tokio::test] + async fn test_extract_message_id() { + // Test with string ID + let request = create_test_request("test-id-123"); + let id = extract_message_id(&request).unwrap(); + assert_eq!(id, "test-id-123"); + } + + #[tokio::test] + async fn custom_msg_serialization() { + let original = CustomMsg { + message_type: 0x1234, + payload: b"test payload".to_vec(), + }; + + // Test to_string and parsing from that string + let serialized = original.to_string(); + + // Convert hex to bytes + let bytes = hex::decode(&serialized).unwrap(); + + // Verify structure + assert_eq!(bytes[0], 0x12); + assert_eq!(bytes[1], 0x34); + assert_eq!(&bytes[2..], b"test payload"); + + // Test deserialization + let deserialized: CustomMsg = + serde_json::from_str(&serde_json::to_string(&serialized).unwrap()).unwrap(); + + assert_eq!(deserialized.message_type, original.message_type); + assert_eq!(deserialized.payload, original.payload); + } + + #[tokio::test] + async fn hook_manager_subscribe_and_process() { + let hook_manager = CustomMessageHookManager::new(); + + // Create test message + let test_id = "test-id-456"; + let test_request = create_test_request(test_id); + let test_msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload: test_request.as_bytes().to_vec(), + }; + + // Set up a subscription + let (tx, mut rx) = mpsc::channel(1); + let tx_arc = Arc::new(tx); + hook_manager + .subscribe_hook_once(test_id, Arc::downgrade(&tx_arc)) + .await; + + // Process the message + let processed = hook_manager + .process_lsp_message(test_msg.clone(), "peer123") + .await; + assert!(processed); + + // Verify the received message + let received_msg = rx.recv().await.unwrap(); + assert_eq!(received_msg.message_type, LSPS0_MESSAGE_TYPE); + assert_eq!(received_msg.payload, test_request.as_bytes()); + } + + #[tokio::test] + async fn hook_manager_no_subscriber() { + let hook_manager = CustomMessageHookManager::new(); + + // Create test message with ID that has no subscriber + let test_request = create_test_request("unknown-id"); + let test_msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload: test_request.as_bytes().to_vec(), + }; + + // Process the message + let processed = hook_manager.process_lsp_message(test_msg, "peer123").await; + assert!(!processed); + } + + #[tokio::test] + async fn hook_manager_clean_up_after_timeout() { + let hook_manager = CustomMessageHookManager::new(); + + // Create test message + let test_id = "test-id-456"; + let test_request = create_test_request(test_id); + let test_msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload: test_request.as_bytes().to_vec(), + }; + + // Set up a subscription + let (tx, _rx) = mpsc::channel(1); + let tx_arc = Arc::new(tx); + hook_manager + .subscribe_hook_once(test_id, Arc::downgrade(&tx_arc)) + .await; + + // drop the reference pointer here to simulate a timeout. + drop(tx_arc); + + // Should not process as the reference has been dropped. + let processed = hook_manager + .process_lsp_message(test_msg.clone(), "peer123") + .await; + assert!(!processed); + assert!(hook_manager.subs.read().await.is_empty()); + } +} diff --git a/plugins/lsps-plugin/src/service.rs b/plugins/lsps-plugin/src/service.rs new file mode 100644 index 000000000000..8078f00430ab --- /dev/null +++ b/plugins/lsps-plugin/src/service.rs @@ -0,0 +1,126 @@ +use anyhow::anyhow; +use async_trait::async_trait; +use cln_lsps::jsonrpc::server::{JsonRpcResponseWriter, RequestHandler}; +use cln_lsps::jsonrpc::{server::JsonRpcServer, JsonRpcRequest}; +use cln_lsps::jsonrpc::{JsonRpcResponse, RequestObject, RpcError, TransportError}; +use cln_lsps::lsps0; +use cln_lsps::lsps0::model::{Lsps0listProtocolsRequest, Lsps0listProtocolsResponse}; +use cln_lsps::lsps0::transport::{self, CustomMsg}; +use cln_plugin::options::ConfigOption; +use cln_plugin::{options, Plugin}; +use cln_rpc::notifications::CustomMsgNotification; +use cln_rpc::primitives::PublicKey; +use log::debug; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::sync::Arc; + +/// An option to enable this service. It defaults to `false` as we don't want a +/// node to be an LSP per default. +/// If a user want's to run an LSP service on their node this has to explicitly +/// set to true. We keep this as a dev option for now until it actually does +/// something. +const OPTION_ENABLED: options::DefaultBooleanConfigOption = ConfigOption::new_bool_with_default( + "dev-lsps-service", + false, + "Enables an LSPS service on the node.", +); + +#[derive(Clone)] +struct State { + lsps_service: JsonRpcServer, +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let lsps_service = JsonRpcServer::builder() + .with_handler( + Lsps0listProtocolsRequest::METHOD.to_string(), + Arc::new(Lsps0ListProtocolsHandler), + ) + .build(); + let state = State { lsps_service }; + + if let Some(plugin) = cln_plugin::Builder::new(tokio::io::stdin(), tokio::io::stdout()) + .option(OPTION_ENABLED) + .hook("custommsg", on_custommsg) + .configure() + .await? + { + if !plugin.option(&OPTION_ENABLED)? { + return plugin + .disable(&format!("`{}` not enabled", OPTION_ENABLED.name)) + .await; + } + + let plugin = plugin.start(state).await?; + plugin.join().await + } else { + Ok(()) + } +} + +async fn on_custommsg( + p: Plugin, + v: serde_json::Value, +) -> Result { + // All of this could be done async if needed. + let continue_response = Ok(serde_json::json!({ + "result": "continue" + })); + let msg: CustomMsgNotification = + serde_json::from_value(v).map_err(|e| anyhow!("invalid custommsg: {e}"))?; + + let req = CustomMsg::from_str(&msg.payload).map_err(|e| anyhow!("invalid payload {e}"))?; + if req.message_type != lsps0::transport::LSPS0_MESSAGE_TYPE { + // We don't care if this is not for us! + return continue_response; + } + + let dir = p.configuration().lightning_dir; + let rpc_path = Path::new(&dir).join(&p.configuration().rpc_file); + let mut writer = LspsResponseWriter { + peer_id: msg.peer_id, + rpc_path: rpc_path.try_into()?, + }; + + let service = p.state().lsps_service.clone(); + match service.handle_message(&req.payload, &mut writer).await { + Ok(_) => continue_response, + Err(e) => { + debug!("failed to handle lsps message: {}", e); + continue_response + } + } +} + +pub struct LspsResponseWriter { + peer_id: PublicKey, + rpc_path: PathBuf, +} + +#[async_trait] +impl JsonRpcResponseWriter for LspsResponseWriter { + async fn write(&mut self, payload: &[u8]) -> cln_lsps::jsonrpc::Result<()> { + let mut client = cln_rpc::ClnRpc::new(&self.rpc_path).await.map_err(|e| { + cln_lsps::jsonrpc::Error::Transport(TransportError::Other(e.to_string())) + })?; + transport::send_custommsg(&mut client, payload.to_vec(), self.peer_id).await + } +} + +pub struct Lsps0ListProtocolsHandler; + +#[async_trait] +impl RequestHandler for Lsps0ListProtocolsHandler { + async fn handle(&self, payload: &[u8]) -> core::result::Result, RpcError> { + let req: RequestObject = + serde_json::from_slice(payload).unwrap(); + if let Some(id) = req.id { + let res = Lsps0listProtocolsResponse { protocols: vec![] }.into_response(id); + let res_vec = serde_json::to_vec(&res).unwrap(); + return Ok(res_vec); + } + Ok(vec![]) + } +} diff --git a/tests/test_cln_lsps.py b/tests/test_cln_lsps.py new file mode 100644 index 000000000000..68ae13792b07 --- /dev/null +++ b/tests/test_cln_lsps.py @@ -0,0 +1,27 @@ +from fixtures import * # noqa: F401,F403 +import os + +RUST_PROFILE = os.environ.get("RUST_PROFILE", "debug") + + +def test_lsps_service_disabled(node_factory): + """By default we disable the LSPS service plugin. + + It should only be enabled if we explicitly set the config option + `lsps-service=True`. + """ + + l1 = node_factory.get_node(1) + l1.daemon.is_in_log("`lsps-service` not enabled") + + +def test_lsps0_listprotocols(node_factory): + l1, l2 = node_factory.get_nodes(2, opts=[ + {}, {"dev-lsps-service": True} + ]) + + # We don't need a channel to query for lsps services + node_factory.join_nodes([l1, l2], fundchannel=False) + + res = l1.rpc.lsps_listprotocols(peer=l2.info['id']) + assert res