diff --git a/core/src/calls.rs b/core/src/calls.rs index 0c999ddf9..c55d79a23 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -7,6 +7,7 @@ use BoxFuture; /// Metadata trait pub trait Metadata: Clone + Send + 'static {} impl Metadata for () {} +impl Metadata for Arc {} /// Asynchronous Method pub trait RpcMethodSimple: Send + Sync + 'static { diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index bba6d6443..36855a36a 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -17,10 +17,11 @@ parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc" } [dev-dependencies] env_logger = "0.4" -lazy_static = "0.2" +lazy_static = "1.0" +parking_lot = "0.5" [target.'cfg(not(windows))'.dev-dependencies] -tokio-uds = "0.1" +tokio-uds = "0.2" [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index a25759559..c248d0c81 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -14,6 +14,7 @@ extern crate tokio_service; #[cfg(test)] mod logger; mod server; +mod select_with_weak; mod meta; pub use meta::{MetaExtractor, RequestContext}; pub use server::{Server, ServerBuilder}; diff --git a/ipc/src/select_with_weak.rs b/ipc/src/select_with_weak.rs new file mode 100644 index 000000000..204059c27 --- /dev/null +++ b/ipc/src/select_with_weak.rs @@ -0,0 +1,77 @@ +use jsonrpc::futures::{Poll, Async}; +use jsonrpc::futures::stream::{Stream, Fuse}; + +pub trait SelectWithWeakExt: Stream { + fn select_with_weak(self, other: S) -> SelectWithWeak + where S: Stream, Self: Sized; +} + +impl SelectWithWeakExt for T where T: Stream { + fn select_with_weak(self, other: S) -> SelectWithWeak + where S: Stream, Self: Sized { + new(self, other) + } +} + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from either of the underlying streams as +/// they become available, and the streams are polled in a round-robin fashion. +/// Errors, however, are not merged: you get at most one error at a time. +/// +/// Finishes when strong stream finishes +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct SelectWithWeak { + strong: Fuse, + weak: Fuse, + use_strong: bool, +} + +fn new(stream1: S1, stream2: S2) -> SelectWithWeak + where S1: Stream, + S2: Stream +{ + SelectWithWeak { + strong: stream1.fuse(), + weak: stream2.fuse(), + use_strong: false, + } +} + +impl Stream for SelectWithWeak + where S1: Stream, + S2: Stream +{ + type Item = S1::Item; + type Error = S1::Error; + + fn poll(&mut self) -> Poll, S1::Error> { + let mut checked_strong = false; + loop { + if self.use_strong { + match self.strong.poll()? { + Async::Ready(Some(item)) => { + self.use_strong = false; + return Ok(Some(item).into()) + }, + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => { + if !checked_strong { + self.use_strong = false; + } else { + return Ok(Async::NotReady) + } + } + } + checked_strong = true; + } else { + self.use_strong = true; + match self.weak.poll()? { + Async::Ready(Some(item)) => return Ok(Some(item).into()), + Async::Ready(None) | Async::NotReady => (), + } + } + } + } +} diff --git a/ipc/src/server.rs b/ipc/src/server.rs index ad4341c24..97df6d8cd 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -11,6 +11,7 @@ use server_utils::tokio_io::AsyncRead; use server_utils::{reactor, session, codecs}; use meta::{MetaExtractor, NoopExtractor, RequestContext}; +use select_with_weak::SelectWithWeakExt; /// IPC server session pub struct Service = NoopMiddleware> { @@ -131,7 +132,6 @@ impl> ServerBuilder { } }; - start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving"); let remote = handle.remote().clone(); let connections = listener.incoming(); let mut id = 0u64; @@ -172,7 +172,9 @@ impl> ServerBuilder { }) }) .filter_map(|x| x) - .select(receiver.map_err(|e| { + // we use `select_with_weak` here, instead of `select`, to close the stream + // as soon as the ipc pipe is closed + .select_with_weak(receiver.map_err(|e| { warn!(target: "ipc", "Notification error: {:?}", e); std::io::ErrorKind::Other.into() })); @@ -187,6 +189,7 @@ impl> ServerBuilder { Ok(()) }); + start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving"); let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into()); future::Either::B( @@ -242,15 +245,19 @@ impl Drop for Server { #[cfg(not(windows))] mod tests { extern crate tokio_uds; + extern crate parking_lot; use std::thread; + use std::sync::Arc; use super::{ServerBuilder, Server}; use jsonrpc::{MetaIoHandler, Value}; use jsonrpc::futures::{Future, future, Stream, Sink}; + use jsonrpc::futures::sync::{mpsc, oneshot}; use self::tokio_uds::UnixStream; - use server_utils::tokio_core::reactor::Core; + use self::parking_lot::Mutex; use server_utils::tokio_io::AsyncRead; use server_utils::codecs; + use meta::{MetaExtractor, RequestContext}; fn server_builder() -> ServerBuilder { let mut io = MetaIoHandler::<()>::default(); @@ -263,25 +270,25 @@ mod tests { fn run(path: &str) -> Server { let builder = server_builder(); let server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(50)); server } fn dummy_request_str(path: &str, data: &str) -> String { - let mut core = Core::new().expect("Tokio Core should be created with no errors"); - - let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server"); - let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); - let reply = writer - .send(data.to_owned()) - .and_then(move |_| { - reader.into_future().map_err(|(err, _)| err) - }) - .and_then(|(reply, _)| { - future::ok(reply.expect("there should be one reply")) - }); + let stream_future = UnixStream::connect(path); + let reply = stream_future.and_then(|stream| { + let stream= stream.framed(codecs::StreamCodec::stream_incoming()); + let reply = stream + .send(data.to_owned()) + .and_then(move |stream| { + stream.into_future().map_err(|(err, _)| err) + }) + .and_then(|(reply, _)| { + future::ok(reply.expect("there should be one reply")) + }); + reply + }); - core.run(reply).unwrap() + reply.wait().expect("wait for reply") } #[test] @@ -304,39 +311,46 @@ mod tests { let path = "/tmp/test-ipc-30000"; let _server = run(path); - let core = Core::new().expect("Tokio Core should be created with no errors"); - UnixStream::connect(path, &core.handle()).expect("Socket should connect"); + UnixStream::connect(path).wait().expect("Socket should connect"); } #[test] fn request() { ::logger::init_log(); let path = "/tmp/test-ipc-40000"; - let _server = run(path); + let server = run(path); + let (stop_signal, stop_receiver) = oneshot::channel(); - let result = dummy_request_str( - path, - "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", - ); + let t = thread::spawn(move || { + let result = dummy_request_str( + path, + "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", + ); + stop_signal.send(result).unwrap(); + }); + t.join().unwrap(); - assert_eq!( - result, - "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", - "Response does not exactly match the expected response", + let _ = stop_receiver.map(move |result: String| { + assert_eq!( + result, + "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", + "Response does not exactly match the expected response", ); + server.close(); + }).wait(); } #[test] fn req_parallel() { - use std::thread; - ::logger::init_log(); let path = "/tmp/test-ipc-45000"; - let _server = run(path); + let server = run(path); + let (stop_signal, stop_receiver) = mpsc::channel(400); let mut handles = Vec::new(); for _ in 0..4 { let path = path.clone(); + let mut stop_signal = stop_signal.clone(); handles.push( thread::spawn(move || { for _ in 0..100 { @@ -344,14 +358,7 @@ mod tests { &path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", ); - - assert_eq!( - result, - "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", - "Response does not exactly match the expected response", - ); - - ::std::thread::sleep(::std::time::Duration::from_millis(10)); + stop_signal.try_send(result).unwrap(); } }) ); @@ -360,6 +367,15 @@ mod tests { for handle in handles.drain(..) { handle.join().unwrap(); } + + let _ = stop_receiver.map(|result| { + assert_eq!( + result, + "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", + "Response does not exactly match the expected response", + ); + }).take(400).collect().wait(); + server.close(); } #[test] @@ -370,8 +386,7 @@ mod tests { server.close(); assert!(::std::fs::metadata(path).is_err(), "There should be no socket file left"); - let core = Core::new().expect("Tokio Core should be created with no errors"); - assert!(UnixStream::connect(path, &core.handle()).is_err(), "Connection to the closed socket should fail"); + assert!(UnixStream::connect(path).wait().is_err(), "Connection to the closed socket should fail"); } fn huge_response_test_str() -> String { @@ -392,6 +407,7 @@ mod tests { #[test] fn test_huge_response() { + ::logger::init_log(); let path = "/tmp/test-ipc-60000"; let mut io = MetaIoHandler::<()>::default(); @@ -400,21 +416,75 @@ mod tests { }); let builder = ServerBuilder::new(io); - let _server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(50)); - - let result = dummy_request_str(&path, - "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", - ); + let server = builder.start(path).expect("Server must run with no issues"); + let (stop_signal, stop_receiver) = oneshot::channel(); - assert_eq!( - result, - huge_response_test_json(), - "Response does not exactly match the expected response", + let t = thread::spawn(move || { + let result = dummy_request_str( + &path, + "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", ); + stop_signal.send(result).unwrap(); + }); + t.join().unwrap(); + + let _ = stop_receiver.map(move |result: String| { + assert_eq!( + result, + huge_response_test_json(), + "Response does not exactly match the expected response", + ); + server.close(); + }).wait(); } + #[test] + fn test_session_end() { + struct SessionEndMeta { + drop_signal: Option>, + } + + impl Drop for SessionEndMeta { + fn drop(&mut self) { + trace!(target: "ipc", "Dropping session meta"); + self.drop_signal.take().unwrap().send(()).unwrap() + } + } + + struct SessionEndExtractor { + drop_receivers: Arc>>>, + } + + impl MetaExtractor> for SessionEndExtractor { + fn extract(&self, _context: &RequestContext) -> Arc { + let (signal, receiver) = oneshot::channel(); + self.drop_receivers.lock().try_send(receiver).unwrap(); + let meta = SessionEndMeta { + drop_signal: Some(signal), + }; + Arc::new(meta) + } + } + ::logger::init_log(); + let path = "/tmp/test-ipc-30009"; + let (signal, receiver) = mpsc::channel(16); + let session_metadata_extractor = SessionEndExtractor { + drop_receivers: Arc::new(Mutex::new(signal)) + }; + + let io = MetaIoHandler::>::default(); + let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor); + let server = builder.start(path).expect("Server must run with no issues"); + { + let _ = UnixStream::connect(path).wait().expect("Socket should connect"); + } + receiver.into_future() + .map_err(|_| ()) + .and_then(|drop_receiver| drop_receiver.0.unwrap().map_err(|_| ())) + .wait().unwrap(); + server.close(); + } }