Skip to content

Commit

Permalink
Make tests compile and use current_thread in tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
tikue committed Oct 15, 2018
1 parent e6741a2 commit 6273337
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 111 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ members = [
"tarpc",
"plugins",
]

[patch."https://github.com/rust-lang-nursery/futures-rs"]
futures-preview = { git = "https://github.com/andreasots/futures-rs", branch = "compat-stack-overflow", features = ["compat", "tokio-compat"] }
futures-test-preview = { git = "https://github.com/andreasots/futures-rs", branch = "compat-stack-overflow" }
6 changes: 3 additions & 3 deletions bincode-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut};
use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode};
use futures::{
Poll,
compat::{Compat, Future01CompatExt, Stream01CompatExt},
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
prelude::*,
ready, task,
};
Expand Down Expand Up @@ -100,13 +100,13 @@ where
/// A [`TcpListener`] that wraps connections in bincode transports.
#[derive(Debug)]
pub struct Incoming<Item, SinkItem> {
incoming: Compat<tokio_tcp::Incoming>,
incoming: Compat01As03<tokio_tcp::Incoming>,
local_addr: SocketAddr,
ghost: PhantomData<(Item, SinkItem)>,
}

impl<Item, SinkItem> Incoming<Item, SinkItem> {
unsafe_pinned!(incoming: Compat<tokio_tcp::Incoming>);
unsafe_pinned!(incoming: Compat01As03<tokio_tcp::Incoming>);

/// Returns the address being listened on.
pub fn local_addr(&self) -> SocketAddr {
Expand Down
13 changes: 9 additions & 4 deletions bincode-transport/tests/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
extern crate test;

use self::test::stats::Stats;
use futures::{compat::TokioDefaultSpawner, prelude::*, spawn};
use futures::{compat::TokioDefaultSpawner, prelude::*};
use rpc::{
client::{self, Client},
context,
Expand All @@ -32,15 +32,18 @@ async fn bench() -> io::Result<()> {
let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();

spawn!(
tokio_executor::spawn(
Server::<u32, u32>::new(server::Config::default())
.incoming(listener)
.take(1)
.respond_with(|_ctx, request| futures::future::ready(Ok(request)))
.unit_error()
.boxed()
.compat()
);

let conn = await!(bincode_transport::connect(&addr))?;
let client = &mut await!(Client::<u32, u32>::new(client::Config::default(), conn));
let client = &mut await!(Client::<u32, u32>::new(client::Config::default(), conn))?;

let total = 10_000usize;
let mut successful = 0u32;
Expand Down Expand Up @@ -98,11 +101,13 @@ async fn bench() -> io::Result<()> {
#[test]
fn bench_small_packet() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);

tokio::run(
bench()
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);
println!("done");

Expand Down
28 changes: 9 additions & 19 deletions bincode-transport/tests/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
prelude::*,
spawn, stream,
stream,
};
use log::{info, trace};
use rand::distributions::{Distribution, Normal};
Expand Down Expand Up @@ -73,21 +73,16 @@ async fn run() -> io::Result<()> {
Ok(request)
}
});
spawn!(handler);
tokio_executor::spawn(handler.unit_error().boxed().compat());
});

spawn!(server).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't spawn server: {:?}", e),
)
})?;
tokio_executor::spawn(server.unit_error().boxed().compat());

let conn = await!(bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(
client::Config::default(),
conn
));
))?;

// Proxy service
let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
Expand All @@ -109,16 +104,11 @@ async fn run() -> io::Result<()> {
let mut client = client.clone();
async move { await!(client.call(ctx, request)) }
});
spawn!(handler);
tokio_executor::spawn(handler.unit_error().boxed().compat());
}
});

spawn!(proxy_server).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't spawn server: {:?}", e),
)
})?;
tokio_executor::spawn(proxy_server.unit_error().boxed().compat());

let mut config = client::Config::default();
config.max_in_flight_requests = 10;
Expand All @@ -127,7 +117,7 @@ async fn run() -> io::Result<()> {
let client = await!(Client::<String, String>::new(
config,
await!(bincode_transport::connect(&addr))?
));
))?;

// Make 3 speculative requests, returning only the quickest.
let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
Expand All @@ -149,13 +139,13 @@ async fn run() -> io::Result<()> {
#[test]
fn cancel_slower() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);

tokio::run(
run()
.boxed()
.map_err(|e| panic!(e))
.compat(TokioDefaultSpawner),
.compat(),
);

Ok(())
}
30 changes: 9 additions & 21 deletions bincode-transport/tests/pushback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
use futures::{
compat::{Future01CompatExt, TokioDefaultSpawner},
prelude::*,
spawn,
};
use log::{error, info, trace, warn};
use log::{error, info, trace};
use rand::distributions::{Distribution, Normal};
use rpc::{
client::{self, Client},
Expand Down Expand Up @@ -73,43 +72,31 @@ async fn run() -> io::Result<()> {
Ok(request)
}
});
if let Err(e) = spawn!(handler) {
warn!("Couldn't spawn request handler: {:?}", e);
}
tokio_executor::spawn(handler.unit_error().boxed().compat());
});

spawn!(server).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't spawn server: {:?}", e),
)
})?;
tokio_executor::spawn(server.unit_error().boxed().compat());

let mut config = client::Config::default();
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;

let conn = await!(bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(config, conn));
let client = await!(Client::<String, String>::new(config, conn))?;

let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
let ctx = context::current();
spawn!(
tokio_executor::spawn(
async move {
let trace_id = *ctx.trace_id();
let response = client.call(ctx, "ping".into());
match await!(response) {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}
}
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't spawn server: {:?}", e),
)
})?;
}.unit_error().boxed().compat()
);
}

Ok(())
Expand All @@ -118,13 +105,14 @@ async fn run() -> io::Result<()> {
#[test]
fn ping_pong() -> io::Result<()> {
env_logger::init();
rpc::init(TokioDefaultSpawner);

tokio::run(
run()
.map_ok(|_| println!("done"))
.map_err(|e| panic!(e.to_string()))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);

Ok(())
Expand Down
1 change: 0 additions & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
gen_future,
decl_macro,
existential_type,
option_replace,
)]
#![deny(missing_docs, missing_debug_implementations)]

Expand Down
7 changes: 3 additions & 4 deletions rpc/src/util/deadline_compat.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::{
compat::{Compat, Future01CompatExt},
compat::{Compat01As03, Future01CompatExt},
prelude::*,
ready, task::{Poll, LocalWaker},
};
use log::trace;
use pin_utils::unsafe_pinned;
use std::pin::Pin;
use std::time::Instant;
Expand All @@ -13,12 +12,12 @@ use tokio_timer::{timeout, Delay};
#[derive(Debug)]
pub struct Deadline<T> {
future: T,
delay: Compat<Delay>,
delay: Compat01As03<Delay>,
}

impl<T> Deadline<T> {
unsafe_pinned!(future: T);
unsafe_pinned!(delay: Compat<Delay>);
unsafe_pinned!(delay: Compat01As03<Delay>);

/// Create a new `Deadline` that completes when `future` completes or when
/// `deadline` is reached.
Expand Down
1 change: 1 addition & 0 deletions tarpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", fea
bincode-transport = { path = "../bincode-transport" }
env_logger = "0.5"
tokio = "0.1"
tokio-executor = "0.1"
34 changes: 15 additions & 19 deletions tarpc/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
#![plugin(tarpc_plugins)]

use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
spawn, Future,
Future,
};
use rpc::{
client, context,
Expand Down Expand Up @@ -65,17 +64,15 @@ impl Subscriber {
async fn listen(id: u32, config: server::Config) -> io::Result<SocketAddr> {
let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = incoming.local_addr();
spawn!(
tokio_executor::spawn(
Server::new(config)
.incoming(incoming)
.take(1)
.respond_with(subscriber::serve(Subscriber { id }))
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Could not spawn server: {:?}", e),
)
})?;
.unit_error()
.boxed()
.compat()
);
Ok(addr)
}
}
Expand Down Expand Up @@ -119,7 +116,7 @@ impl publisher::Service for Publisher {
addr: SocketAddr,
) -> io::Result<()> {
let conn = await!(bincode_transport::connect(&addr))?;
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn));
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn))?;
println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
Ok(())
Expand Down Expand Up @@ -147,17 +144,15 @@ async fn run() -> io::Result<()> {
env_logger::init();
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let publisher_addr = transport.local_addr();
spawn!(
tokio_executor::spawn(
Server::new(server::Config::default())
.incoming(transport)
.take(1)
.respond_with(publisher::serve(Publisher::new()))
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Could not spawn server: {:?}", e),
)
})?;
.unit_error()
.boxed()
.compat()
);

let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
let subscriber2 = await!(Subscriber::listen(1, server::Config::default()))?;
Expand All @@ -167,7 +162,7 @@ async fn run() -> io::Result<()> {
let mut publisher = await!(publisher::new_stub(
client::Config::default(),
publisher_conn
));
))?;

if let Err(e) = await!(publisher.subscribe(context::current(), 0, subscriber1))? {
eprintln!("Couldn't subscribe subscriber 0: {}", e);
Expand All @@ -188,7 +183,8 @@ fn main() {
run()
.boxed()
.map_err(|e| panic!(e))
.compat(TokioDefaultSpawner),
.boxed()
.compat(),
);
thread::sleep(Duration::from_millis(100));
}
8 changes: 3 additions & 5 deletions tarpc/examples/readme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
#![plugin(tarpc_plugins)]

use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
spawn,
};
use rpc::{
client, context,
Expand Down Expand Up @@ -64,14 +62,14 @@ async fn run() -> io::Result<()> {
// the generated Service trait.
.respond_with(serve(HelloServer));

spawn!(server).unwrap();
tokio_executor::spawn(server.unit_error().boxed().compat());

let transport = await!(bincode_transport::connect(&addr))?;

// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(new_stub(client::Config::default(), transport));
let mut client = await!(new_stub(client::Config::default(), transport))?;

// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
Expand All @@ -88,6 +86,6 @@ fn main() {
run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);
}
Loading

0 comments on commit 6273337

Please sign in to comment.