Skip to content

Commit

Permalink
Make tests compile and use current_thread in tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
tikue committed Oct 15, 2018
1 parent e6741a2 commit 36e4846
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 66 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ members = [
"tarpc",
"plugins",
]

[patch."https://github.com/rust-lang-nursery/futures-rs"]
futures-preview = { git = "https://github.com/andreasots/futures-rs", branch = "compat-stack-overflow", features = ["compat", "tokio-compat"] }
6 changes: 3 additions & 3 deletions bincode-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut};
use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode};
use futures::{
Poll,
compat::{Compat, Future01CompatExt, Stream01CompatExt},
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
prelude::*,
ready, task,
};
Expand Down Expand Up @@ -100,13 +100,13 @@ where
/// A [`TcpListener`] that wraps connections in bincode transports.
#[derive(Debug)]
pub struct Incoming<Item, SinkItem> {
incoming: Compat<tokio_tcp::Incoming>,
incoming: Compat01As03<tokio_tcp::Incoming>,
local_addr: SocketAddr,
ghost: PhantomData<(Item, SinkItem)>,
}

impl<Item, SinkItem> Incoming<Item, SinkItem> {
unsafe_pinned!(incoming: Compat<tokio_tcp::Incoming>);
unsafe_pinned!(incoming: Compat01As03<tokio_tcp::Incoming>);

/// Returns the address being listened on.
pub fn local_addr(&self) -> SocketAddr {
Expand Down
7 changes: 3 additions & 4 deletions rpc/src/util/deadline_compat.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::{
compat::{Compat, Future01CompatExt},
compat::{Compat01As03, Future01CompatExt},
prelude::*,
ready, task::{Poll, LocalWaker},
};
use log::trace;
use pin_utils::unsafe_pinned;
use std::pin::Pin;
use std::time::Instant;
Expand All @@ -13,12 +12,12 @@ use tokio_timer::{timeout, Delay};
#[derive(Debug)]
pub struct Deadline<T> {
future: T,
delay: Compat<Delay>,
delay: Compat01As03<Delay>,
}

impl<T> Deadline<T> {
unsafe_pinned!(future: T);
unsafe_pinned!(delay: Compat<Delay>);
unsafe_pinned!(delay: Compat01As03<Delay>);

/// Create a new `Deadline` that completes when `future` completes or when
/// `deadline` is reached.
Expand Down
1 change: 1 addition & 0 deletions tarpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", fea
bincode-transport = { path = "../bincode-transport" }
env_logger = "0.5"
tokio = "0.1"
tokio-executor = "0.1"
34 changes: 15 additions & 19 deletions tarpc/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
#![plugin(tarpc_plugins)]

use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
spawn, Future,
Future,
};
use rpc::{
client, context,
Expand Down Expand Up @@ -65,17 +64,15 @@ impl Subscriber {
async fn listen(id: u32, config: server::Config) -> io::Result<SocketAddr> {
let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = incoming.local_addr();
spawn!(
tokio_executor::spawn(
Server::new(config)
.incoming(incoming)
.take(1)
.respond_with(subscriber::serve(Subscriber { id }))
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Could not spawn server: {:?}", e),
)
})?;
.unit_error()
.boxed()
.compat()
);
Ok(addr)
}
}
Expand Down Expand Up @@ -119,7 +116,7 @@ impl publisher::Service for Publisher {
addr: SocketAddr,
) -> io::Result<()> {
let conn = await!(bincode_transport::connect(&addr))?;
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn));
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn))?;
println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
Ok(())
Expand Down Expand Up @@ -147,17 +144,15 @@ async fn run() -> io::Result<()> {
env_logger::init();
let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let publisher_addr = transport.local_addr();
spawn!(
tokio_executor::spawn(
Server::new(server::Config::default())
.incoming(transport)
.take(1)
.respond_with(publisher::serve(Publisher::new()))
).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Could not spawn server: {:?}", e),
)
})?;
.unit_error()
.boxed()
.compat()
);

let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
let subscriber2 = await!(Subscriber::listen(1, server::Config::default()))?;
Expand All @@ -167,7 +162,7 @@ async fn run() -> io::Result<()> {
let mut publisher = await!(publisher::new_stub(
client::Config::default(),
publisher_conn
));
))?;

if let Err(e) = await!(publisher.subscribe(context::current(), 0, subscriber1))? {
eprintln!("Couldn't subscribe subscriber 0: {}", e);
Expand All @@ -188,7 +183,8 @@ fn main() {
run()
.boxed()
.map_err(|e| panic!(e))
.compat(TokioDefaultSpawner),
.boxed()
.compat(),
);
thread::sleep(Duration::from_millis(100));
}
8 changes: 3 additions & 5 deletions tarpc/examples/readme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
#![plugin(tarpc_plugins)]

use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
spawn,
};
use rpc::{
client, context,
Expand Down Expand Up @@ -64,14 +62,14 @@ async fn run() -> io::Result<()> {
// the generated Service trait.
.respond_with(serve(HelloServer));

spawn!(server).unwrap();
tokio_executor::spawn(server.unit_error().boxed().compat());

let transport = await!(bincode_transport::connect(&addr))?;

// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(new_stub(client::Config::default(), transport));
let mut client = await!(new_stub(client::Config::default(), transport))?;

// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
Expand All @@ -88,6 +86,6 @@ fn main() {
run()
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);
}
12 changes: 5 additions & 7 deletions tarpc/examples/server_calling_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@

use crate::{add::Service as AddService, double::Service as DoubleService};
use futures::{
compat::TokioDefaultSpawner,
future::{self, Ready},
prelude::*,
spawn,
};
use rpc::{
client, context,
Expand Down Expand Up @@ -75,24 +73,24 @@ async fn run() -> io::Result<()> {
.incoming(add_listener)
.take(1)
.respond_with(add::serve(AddServer));
spawn!(add_server);
tokio_executor::spawn(add_server.unit_error().boxed().compat());

let to_add_server = await!(bincode_transport::connect(&addr))?;
let add_client = await!(add::new_stub(client::Config::default(), to_add_server));
let add_client = await!(add::new_stub(client::Config::default(), to_add_server))?;

let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = double_listener.local_addr();
let double_server = rpc::Server::new(server::Config::default())
.incoming(double_listener)
.take(1)
.respond_with(double::serve(DoubleServer { add_client }));
spawn!(double_server);
tokio_executor::spawn(double_server.unit_error().boxed().compat());

let to_double_server = await!(bincode_transport::connect(&addr))?;
let mut double_client = await!(double::new_stub(
client::Config::default(),
to_double_server
));
))?;

for i in 1..=5 {
println!("{:?}", await!(double_client.double(context::current(), i))?);
Expand All @@ -106,6 +104,6 @@ fn main() {
run()
.map_err(|e| panic!(e))
.boxed()
.compat(TokioDefaultSpawner),
.compat(),
);
}
14 changes: 7 additions & 7 deletions tarpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
//! #![feature(plugin, futures_api, pin, arbitrary_self_types, await_macro, async_await, existential_type)]
//! #![plugin(tarpc_plugins)]
//!
//!
//! use futures::{
//! compat::TokioDefaultSpawner,
//! future::{self, Ready},
//! prelude::*,
//! spawn,
//! };
//! use tarpc::rpc::{
//! use tarpc::{
//! client, context,
//! server::{self, Handler, Server},
//! };
Expand Down Expand Up @@ -83,14 +83,14 @@
//! // the generated Service trait.
//! .respond_with(serve(HelloServer));
//!
//! spawn!(server).unwrap();
//! tokio_executor::spawn(server.unit_error().boxed().compat());
//!
//! let transport = await!(bincode_transport::connect(&addr))?;
//!
//! // new_stub is generated by the service! macro. Like Server, it takes a config and any
//! // Transport as input, and returns a Client, also generated by the macro.
//! // by the service mcro.
//! let mut client = await!(new_stub(client::Config::default(), transport));
//! let mut client = await!(new_stub(client::Config::default(), transport))?;
//!
//! // The client has an RPC method for each RPC defined in service!. It takes the same args
//! // as defined, with the addition of a Context, which is always the first arg. The Context
Expand All @@ -103,10 +103,11 @@
//! }
//!
//! fn main() {
//! tarpc::init(TokioDefaultSpawner);
//! tokio::run(run()
//! .map_err(|e| eprintln!("Oh no: {}", e))
//! .boxed()
//! .compat(TokioDefaultSpawner),
//! .compat(),
//! );
//! }
//! ```
Expand All @@ -125,8 +126,7 @@

#[doc(hidden)]
pub use futures;
#[doc(hidden)]
pub use rpc;
pub use rpc::*;
#[cfg(feature = "serde")]
#[doc(hidden)]
pub use serde;
Expand Down
Loading

0 comments on commit 36e4846

Please sign in to comment.