Skip to content
2 changes: 1 addition & 1 deletion ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ env_logger = "0.5"
lazy_static = "1.0"

[target.'cfg(not(windows))'.dev-dependencies]
tokio-uds = "0.1"
tokio-uds = "0.2"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the package helped me diagnose tests issues


[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
1 change: 1 addition & 0 deletions ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub extern crate jsonrpc_core;
#[cfg(test)] mod logger;

mod server;
mod select_with_weak;
mod meta;

use jsonrpc_core as jsonrpc;
Expand Down
77 changes: 77 additions & 0 deletions ipc/src/select_with_weak.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use jsonrpc::futures::{Poll, Async};
use jsonrpc::futures::stream::{Stream, Fuse};

pub trait SelectWithWeakExt: Stream {
fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized;
}

impl<T> SelectWithWeakExt for T where T: Stream {
fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized {
new(self, other)
}
}

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
///
/// Finishes when strong stream finishes
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SelectWithWeak<S1, S2> {
strong: Fuse<S1>,
weak: Fuse<S2>,
use_strong: bool,
}

fn new<S1, S2>(stream1: S1, stream2: S2) -> SelectWithWeak<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
SelectWithWeak {
strong: stream1.fuse(),
weak: stream2.fuse(),
use_strong: false,
}
}

impl<S1, S2> Stream for SelectWithWeak<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;

fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let mut checked_strong = false;
loop {
if self.use_strong {
match self.strong.poll()? {
Async::Ready(Some(item)) => {
self.use_strong = false;
return Ok(Some(item).into())
},
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => {
if !checked_strong {
self.use_strong = false;
} else {
return Ok(Async::NotReady)
}
}
}
checked_strong = true;
} else {
self.use_strong = true;
match self.weak.poll()? {
Async::Ready(Some(item)) => return Ok(Some(item).into()),
Async::Ready(None) | Async::NotReady => (),
}
}
}
}
}
125 changes: 71 additions & 54 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use server_utils::tokio_io::AsyncRead;
use server_utils::{reactor, session, codecs};

use meta::{MetaExtractor, NoopExtractor, RequestContext};
use select_with_weak::SelectWithWeakExt;

/// IPC server session
pub struct Service<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
Expand Down Expand Up @@ -131,7 +132,6 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
}
};

start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving");
let remote = handle.remote().clone();
let connections = listener.incoming();
let mut id = 0u64;
Expand Down Expand Up @@ -172,7 +172,9 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
})
})
.filter_map(|x| x)
.select(receiver.map_err(|e| {
// we use `select_with_weak` here, instead of `select`, to close the stream
// as soon as the ipc pipe is closed
.select_with_weak(receiver.map_err(|e| {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maintains the stream as long as the connection exists. the outgoing stream may be closed earlier

warn!(target: "ipc", "Notification error: {:?}", e);
std::io::ErrorKind::Other.into()
}));
Expand All @@ -187,6 +189,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {

Ok(())
});
start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start_signal was triggered before the server was fully set up. After fixing the order of initialisation, we no longer need thread::sleep in tests


let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into());
future::Either::B(
Expand Down Expand Up @@ -247,8 +250,8 @@ mod tests {
use super::{ServerBuilder, Server};
use jsonrpc::{MetaIoHandler, Value};
use jsonrpc::futures::{Future, future, Stream, Sink};
use jsonrpc::futures::sync::{mpsc, oneshot};
use self::tokio_uds::UnixStream;
use server_utils::tokio_core::reactor::Core;
use server_utils::tokio_io::AsyncRead;
use server_utils::codecs;

Expand All @@ -263,25 +266,25 @@ mod tests {
fn run(path: &str) -> Server {
let builder = server_builder();
let server = builder.start(path).expect("Server must run with no issues");
thread::sleep(::std::time::Duration::from_millis(50));
server
}

fn dummy_request_str(path: &str, data: &str) -> String {
let mut core = Core::new().expect("Tokio Core should be created with no errors");

let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server");
let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split();
let reply = writer
.send(data.to_owned())
.and_then(move |_| {
reader.into_future().map_err(|(err, _)| err)
})
.and_then(|(reply, _)| {
future::ok(reply.expect("there should be one reply"))
});
let stream_future = UnixStream::connect(path);
let reply = stream_future.and_then(|stream| {
let stream= stream.framed(codecs::StreamCodec::stream_incoming());
let reply = stream
.send(data.to_owned())
.and_then(move |stream| {
stream.into_future().map_err(|(err, _)| err)
})
.and_then(|(reply, _)| {
future::ok(reply.expect("there should be one reply"))
});
reply
});

core.run(reply).unwrap()
reply.wait().expect("wait for reply")
}

#[test]
Expand All @@ -304,54 +307,54 @@ mod tests {
let path = "/tmp/test-ipc-30000";
let _server = run(path);

let core = Core::new().expect("Tokio Core should be created with no errors");
UnixStream::connect(path, &core.handle()).expect("Socket should connect");
UnixStream::connect(path).wait().expect("Socket should connect");
}

#[test]
fn request() {
::logger::init_log();
let path = "/tmp/test-ipc-40000";
let _server = run(path);
let server = run(path);
let (stop_signal, stop_receiver) = oneshot::channel();

let result = dummy_request_str(
path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
);
let t = thread::spawn(move || {
let result = dummy_request_str(
path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
);
stop_signal.send(result).unwrap();
});
t.join().unwrap();

assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
let _ = stop_receiver.map(move |result: String| {
assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);
server.close();
}).wait();
}

#[test]
fn req_parallel() {
use std::thread;

::logger::init_log();
let path = "/tmp/test-ipc-45000";
let _server = run(path);
let server = run(path);
let (stop_signal, stop_receiver) = mpsc::channel(400);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we parametrize the nr of threads/loops so we can do mpsc::channel(TEST_THRDS * TEST_ITERS);?


let mut handles = Vec::new();
for _ in 0..4 {
let path = path.clone();
let mut stop_signal = stop_signal.clone();
handles.push(
thread::spawn(move || {
for _ in 0..100 {
let result = dummy_request_str(
&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
);

assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);

::std::thread::sleep(::std::time::Duration::from_millis(10));
stop_signal.try_send(result).unwrap();
}
})
);
Expand All @@ -360,6 +363,15 @@ mod tests {
for handle in handles.drain(..) {
handle.join().unwrap();
}

let _ = stop_receiver.map(|result| {
assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);
}).take(400).collect().wait();
server.close();
}

#[test]
Expand All @@ -370,8 +382,7 @@ mod tests {
server.close();

assert!(::std::fs::metadata(path).is_err(), "There should be no socket file left");
let core = Core::new().expect("Tokio Core should be created with no errors");
assert!(UnixStream::connect(path, &core.handle()).is_err(), "Connection to the closed socket should fail");
assert!(UnixStream::connect(path).wait().is_err(), "Connection to the closed socket should fail");
}

fn huge_response_test_str() -> String {
Expand All @@ -392,6 +403,7 @@ mod tests {

#[test]
fn test_huge_response() {
::logger::init_log();
let path = "/tmp/test-ipc-60000";

let mut io = MetaIoHandler::<()>::default();
Expand All @@ -400,21 +412,26 @@ mod tests {
});
let builder = ServerBuilder::new(io);

let _server = builder.start(path).expect("Server must run with no issues");
thread::sleep(::std::time::Duration::from_millis(50));

let result = dummy_request_str(&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
);
let server = builder.start(path).expect("Server must run with no issues");
let (stop_signal, stop_receiver) = oneshot::channel();

assert_eq!(
result,
huge_response_test_json(),
"Response does not exactly match the expected response",
let t = thread::spawn(move || {
let result = dummy_request_str(
&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
);

}


stop_signal.send(result).unwrap();
});
t.join().unwrap();

let _ = stop_receiver.map(move |result: String| {
assert_eq!(
result,
huge_response_test_json(),
"Response does not exactly match the expected response",
);
server.close();
});
}
}