diff --git a/examples/1_hello_world.rs b/examples/1_hello_world.rs index 029e8cf4b..576c9a1cc 100644 --- a/examples/1_hello_world.rs +++ b/examples/1_hello_world.rs @@ -33,11 +33,10 @@ fn add_greeter_actor(mut runtime_ref: RuntimeRef) -> Result<(), !> { /// Our greeter actor. /// /// We'll receive a single message and print it. -async fn greeter_actor(mut ctx: actor::Context<&'static str>) -> Result<(), !> { +async fn greeter_actor(mut ctx: actor::Context<&'static str>) { // All actors have an actor context, which give the actor access to, among // other things, its inbox from which it can receive a message. while let Ok(name) = ctx.receive_next().await { println!("Hello {}", name); } - Ok(()) } diff --git a/examples/3_rpc.rs b/examples/3_rpc.rs index e3212267f..7a0e41f8b 100644 --- a/examples/3_rpc.rs +++ b/examples/3_rpc.rs @@ -22,20 +22,18 @@ fn add_rpc_actor(mut runtime_ref: RuntimeRef) -> Result<(), !> { Ok(()) } -async fn ping_actor(_: actor::Context, actor_ref: ActorRef) -> Result<(), !> { +async fn ping_actor(_: actor::Context, actor_ref: ActorRef) { // Make a Remote Procedure Call (RPC) and await the response. match actor_ref.rpc(Ping).await { Ok(response) => println!("Got a RPC response: {}", response), Err(err) => eprintln!("RPC request error: {}", err), } - - Ok(()) } // Message type to support the ping-pong RPC. type PongMessage = RpcMessage; -async fn pong_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn pong_actor(mut ctx: actor::Context) { // Await a message, same as all other messages. while let Ok(msg) = ctx.receive_next().await { // Next we respond to the request. @@ -49,8 +47,6 @@ async fn pong_actor(mut ctx: actor::Context) -> Result<(), !> { eprintln!("failed to respond to RPC: {}", err); } } - - Ok(()) } struct Ping; diff --git a/examples/6_process_signals.rs b/examples/6_process_signals.rs index 1b48b16cc..248892cb9 100644 --- a/examples/6_process_signals.rs +++ b/examples/6_process_signals.rs @@ -79,7 +79,7 @@ fn sync_actor(mut ctx: SyncContext) { println!("shutting down the synchronous actor"); } -async fn thread_safe_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn thread_safe_actor(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { Message::Print(msg) => println!("Got a message: {}", msg), @@ -88,10 +88,9 @@ async fn thread_safe_actor(mut ctx: actor::Context) -> Resu } println!("shutting down the thread local actor"); - Ok(()) } -async fn local_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn local_actor(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { Message::Print(msg) => println!("Got a message: {}", msg), @@ -100,5 +99,4 @@ async fn local_actor(mut ctx: actor::Context) -> Result<(), !> { } println!("shutting down the thread safe actor"); - Ok(()) } diff --git a/examples/8_tracing.rs b/examples/8_tracing.rs index 654b7918d..43f7d1b7b 100644 --- a/examples/8_tracing.rs +++ b/examples/8_tracing.rs @@ -77,7 +77,7 @@ async fn relay_actor( } /// Sync actor that prints all messages it receives. -fn print_actor(mut ctx: SyncContext<&'static str>) -> Result<(), !> { +fn print_actor(mut ctx: SyncContext<&'static str>) { loop { // Start timing of receiving a message. let timing = ctx.start_trace(); @@ -95,5 +95,4 @@ fn print_actor(mut ctx: SyncContext<&'static str>) -> Result<(), !> { println!("Received message: {}", msg); ctx.finish_trace(timing, "printing message", &[("message", &msg)]); } - Ok(()) } diff --git a/examples/99_stress_memory.rs b/examples/99_stress_memory.rs index bd7fc028d..0a7a41601 100644 --- a/examples/99_stress_memory.rs +++ b/examples/99_stress_memory.rs @@ -39,12 +39,11 @@ fn main() -> Result<(), rt::Error> { } /// Our "actor", but it doesn't do much. -async fn actor(_: actor::Context) -> Result<(), !> { - Ok(()) +async fn actor(_: actor::Context) { + /* Nothing. */ } -async fn control_actor(_: actor::Context) -> Result<(), !> { +async fn control_actor(_: actor::Context) { info!("Running, check the memory usage!"); info!("Send a signal (e.g. by pressing Ctrl-C) to stop."); - Ok(()) } diff --git a/src/actor/context.rs b/src/actor/context.rs index 8bd1a1e52..013969c91 100644 --- a/src/actor/context.rs +++ b/src/actor/context.rs @@ -82,13 +82,12 @@ impl Context { /// /// use heph::actor; /// - /// async fn greeter_actor(mut ctx: actor::Context) -> Result<(), !> { + /// async fn greeter_actor(mut ctx: actor::Context) { /// if let Ok(name) = ctx.try_receive_next() { /// println!("Hello: {}", name); /// } else { /// println!("Hello world"); /// } - /// Ok(()) /// } /// /// # // Use the `greeter_actor` function to silence dead code warning. @@ -111,11 +110,10 @@ impl Context { /// /// use heph::actor; /// - /// async fn print_actor(mut ctx: actor::Context) -> Result<(), !> { + /// async fn print_actor(mut ctx: actor::Context) { /// if let Ok(msg) = ctx.receive_next().await { /// println!("Got a message: {}", msg); /// } - /// Ok(()) /// } /// /// # // Use the `print_actor` function to silence dead code warning. @@ -134,7 +132,7 @@ impl Context { /// use heph::timer::Timer; /// use heph::util::either; /// - /// async fn print_actor(mut ctx: actor::Context) -> Result<(), !> { + /// async fn print_actor(mut ctx: actor::Context) { /// // Create a timer, this will be ready once the timeout has /// // passed. /// let timeout = Timer::timeout(&mut ctx, Duration::from_millis(100)); @@ -147,8 +145,6 @@ impl Context { /// Ok(Err(_)) => println!("No message"), /// Err(_) => println!("Timed out receiving message"), /// } - /// - /// Ok(()) /// } /// /// # // Use the `print_actor` function to silence dead code warning. diff --git a/src/actor/mod.rs b/src/actor/mod.rs index 0ac53b702..5b635de5d 100644 --- a/src/actor/mod.rs +++ b/src/actor/mod.rs @@ -183,12 +183,11 @@ pub trait NewActor { /// } /// /// /// Our actor implementation that prints all messages it receives. - /// async fn actor(mut ctx: actor::Context) -> Result<(), !> { + /// async fn actor(mut ctx: actor::Context) { /// if let Ok(msg) = ctx.receive_next().await { /// # assert_eq!(msg, Message::String("Hello world".to_owned())); /// println!("received message: {:?}", msg); /// } - /// Ok(()) /// } /// ``` type Message; @@ -463,13 +462,13 @@ impl_new_actor!( /// The `Actor` trait defines how the actor is run. /// /// Effectively an `Actor` is a [`Future`] which returns a `Result<(), Error>`, -/// where `Error` is defined on the trait. That is why there is a blanket -/// implementation for all `Future`s with a `Result<(), Error>` as `Output` -/// type. +/// where `Error` is defined on the trait. All `Future`s where the [`Output`] +/// type is `Result<(), Error>` or `()` implement the `Actor` trait. /// /// The easiest way to implement this by using an async function, see the /// [module level] documentation. /// +/// [`Output`]: Future::Output /// [module level]: crate::actor /// /// # Panics @@ -500,9 +499,13 @@ pub trait Actor { -> Poll>; } -impl Actor for Fut +/// Supported are [`Future`]s with `Result<(), E>` or `()` [`Output`]. +/// +/// [`Output`]: Future::Output +impl Actor for Fut where - Fut: Future>, + Fut: Future, + O: private::ActorResult, { type Error = E; @@ -510,7 +513,34 @@ where self: Pin<&mut Self>, ctx: &mut task::Context<'_>, ) -> Poll> { - self.poll(ctx) + self.poll(ctx).map(private::ActorResult::into) + } +} + +mod private { + /// Trait to support [`Actor`] for `Result<(), E>` and `()`. + pub trait ActorResult { + /// See [`Actor::Error`]. + type Error; + + /// Convert the return type in an `Result<(), Self::Error>`. + fn into(self) -> Result<(), Self::Error>; + } + + impl ActorResult for Result<(), E> { + type Error = E; + + fn into(self) -> Result<(), E> { + self + } + } + + impl ActorResult for () { + type Error = !; + + fn into(self) -> Result<(), !> { + Ok(()) + } } } diff --git a/src/actor_ref/mod.rs b/src/actor_ref/mod.rs index 4de1ac574..8fb1a3130 100644 --- a/src/actor_ref/mod.rs +++ b/src/actor_ref/mod.rs @@ -46,11 +46,10 @@ //! } //! //! /// Our actor. -//! async fn actor(mut ctx: actor::Context) -> Result<(), !> { +//! async fn actor(mut ctx: actor::Context) { //! if let Ok(msg) = ctx.receive_next().await { //! println!("got message: {}", msg); //! } -//! Ok(()) //! } //! ``` //! @@ -88,7 +87,7 @@ //! } //! //! /// Our actor. -//! async fn actor(mut ctx: actor::Context) -> Result<(), !> { +//! async fn actor(mut ctx: actor::Context) { //! if let Ok(msg) = ctx.receive_next().await { //! println!("First message: {}", msg); //! } @@ -96,7 +95,6 @@ //! if let Ok(msg) = ctx.receive_next().await { //! println!("Second message: {}", msg); //! } -//! Ok(()) //! } //! ``` diff --git a/src/actor_ref/rpc.rs b/src/actor_ref/rpc.rs index f5b5c0fe1..7c6be5958 100644 --- a/src/actor_ref/rpc.rs +++ b/src/actor_ref/rpc.rs @@ -37,7 +37,7 @@ //! } //! //! /// Receiving actor of the RPC. -//! async fn counter(mut ctx: actor::Context) -> Result<(), !> { +//! async fn counter(mut ctx: actor::Context) { //! // State of the counter. //! let mut count: usize = 0; //! // Receive a message like normal. @@ -46,12 +46,10 @@ //! // Send back the current state, ignoring any errors. //! let _ = response.respond(count); //! } -//! // And we're done. -//! Ok(()) //! } //! //! /// Sending actor of the RPC. -//! async fn requester(_: actor::Context, actor_ref: ActorRef) -> Result<(), !> { +//! async fn requester(_: actor::Context, actor_ref: ActorRef) { //! // Make the procedure call. //! let response = actor_ref.rpc(10).await; //! # assert!(response.is_ok()); @@ -61,7 +59,6 @@ //! // Actor failed to respond. //! Err(err) => eprintln!("Counter didn't reply: {}", err), //! } -//! Ok(()) //! } //! //! # fn main() -> Result<(), rt::Error> { @@ -133,7 +130,7 @@ //! } //! //! /// Sending actor of the RPC. -//! async fn requester(_: actor::Context, actor_ref: ActorRef) -> Result<(), !> { +//! async fn requester(_: actor::Context, actor_ref: ActorRef) { //! // Increase the counter by ten. //! // NOTE: do handle the errors correctly in practice, this is just an //! // example. @@ -144,7 +141,6 @@ //! let count = actor_ref.rpc(()).await.unwrap(); //! # assert_eq!(count, 10); //! println!("Current count {}", count); -//! Ok(()) //! } //! //! # fn main() -> Result<(), rt::Error> { diff --git a/src/lib.rs b/src/lib.rs index 2c361cff3..0b928050a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,14 +16,12 @@ //! # //! # use heph::actor; //! # -//! async fn actor(mut ctx: actor::Context) -> Result<(), !> { +//! async fn actor(mut ctx: actor::Context) { //! // Receive a message. //! if let Ok(msg) = ctx.receive_next().await { //! // Print the message. //! println!("got a message: {}", msg); //! } -//! // And we're done. -//! Ok(()) //! } //! # //! # drop(actor); // Silence dead code warnings. diff --git a/src/log.rs b/src/log.rs index 542c5d6ec..2ca0b8cc7 100644 --- a/src/log.rs +++ b/src/log.rs @@ -36,11 +36,9 @@ //! Ok(()) //! } //! -//! async fn greeter_actor(_: actor::Context) -> Result<(), !> { +//! async fn greeter_actor(_: actor::Context) { //! // Log an informational message. //! info!("Hello world"); -//! -//! Ok(()) //! } //! ``` diff --git a/src/rt/mod.rs b/src/rt/mod.rs index 7444ddeb2..441e329d7 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -162,7 +162,7 @@ fn sync_worker_id() { /// } /// /// /// Our actor that greets people. -/// async fn actor(mut ctx: actor::Context<&'static str>, msg: &'static str) -> Result<(), !> { +/// async fn actor(mut ctx: actor::Context<&'static str>, msg: &'static str) { /// // `msg` is the argument passed to `spawn` in the `setup` function /// // above, in this example it was "Hello". /// @@ -172,7 +172,6 @@ fn sync_worker_id() { /// // This should print "Hello world"! /// println!("{} {}", msg, name); /// } -/// Ok(()) /// } /// ``` pub struct Runtime { diff --git a/src/rt/process/tests.rs b/src/rt/process/tests.rs index 59d5af2aa..1e8385cff 100644 --- a/src/rt/process/tests.rs +++ b/src/rt/process/tests.rs @@ -156,9 +156,8 @@ fn process_data_runtime_increase() { assert!(process.fair_runtime >= SLEEP_TIME); } -async fn ok_actor(mut ctx: actor::Context<()>) -> Result<(), !> { +async fn ok_actor(mut ctx: actor::Context<()>) { assert_eq!(ctx.receive_next().await, Ok(())); - Ok(()) } #[test] diff --git a/src/rt/scheduler/tests.rs b/src/rt/scheduler/tests.rs index 337535da2..013bac3f5 100644 --- a/src/rt/scheduler/tests.rs +++ b/src/rt/scheduler/tests.rs @@ -50,9 +50,7 @@ fn has_process() { assert!(!scheduler.has_ready_process()); } -async fn simple_actor(_: actor::Context) -> Result<(), !> { - Ok(()) -} +async fn simple_actor(_: actor::Context) {} #[test] fn add_actor() { @@ -239,13 +237,8 @@ fn scheduler_run_order() { // The order in which the processes have been run. let run_order = Rc::new(RefCell::new(Vec::new())); - async fn order_actor( - _: actor::Context, - id: usize, - order: Rc>>, - ) -> Result<(), !> { + async fn order_actor(_: actor::Context, id: usize, order: Rc>>) { order.borrow_mut().push(id); - Ok(()) } // Add our processes. diff --git a/src/rt/shared/scheduler/tests.rs b/src/rt/shared/scheduler/tests.rs index 6577a36c9..adccff2dd 100644 --- a/src/rt/shared/scheduler/tests.rs +++ b/src/rt/shared/scheduler/tests.rs @@ -25,9 +25,7 @@ fn is_send() { assert_send::(); } -async fn simple_actor(_: actor::Context) -> Result<(), !> { - Ok(()) -} +async fn simple_actor(_: actor::Context) {} #[test] fn adding_actor() { @@ -113,9 +111,8 @@ fn scheduler_run_order() { _: actor::Context, id: usize, order: Arc>>, - ) -> Result<(), !> { + ) { order.lock().unwrap().push(id); - Ok(()) } // Add our processes. diff --git a/src/supervisor.rs b/src/supervisor.rs index b48e264f4..edaa9fb0e 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -234,9 +234,8 @@ where /// } /// /// /// Our actor that never returns an error. -/// async fn actor(ctx: actor::Context<&'static str>) -> Result<(), !> { +/// async fn actor(ctx: actor::Context<&'static str>) { /// # drop(ctx); // Silence dead code warnings. -/// Ok(()) /// } /// ``` #[derive(Copy, Clone, Debug)] diff --git a/src/test.rs b/src/test.rs index d4ccc038f..1f6187d22 100644 --- a/src/test.rs +++ b/src/test.rs @@ -243,14 +243,12 @@ where /// use heph::actor; /// use heph::test::size_of_actor_val; /// -/// async fn actor(mut ctx: actor::Context) -> Result<(), !> { +/// async fn actor(mut ctx: actor::Context) { /// // Receive a message. /// if let Ok(msg) = ctx.receive_next().await { /// // Print the message. /// println!("got a message: {}", msg); /// } -/// // And we're done. -/// Ok(()) /// } /// /// assert_eq!(size_of_actor_val(&(actor as fn(_) -> _)), 64); @@ -266,8 +264,8 @@ where fn test_size_of_actor() { use crate::actor::context::ThreadLocal; - async fn actor1(_: actor::Context) -> Result<(), !> { - Ok(()) + async fn actor1(_: actor::Context) { + /* Nothing. */ } #[allow(trivial_casts)] diff --git a/src/timer.rs b/src/timer.rs index cf41fba20..432f92a61 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -64,7 +64,7 @@ impl From for io::Error { /// # Ok(()) /// # } /// # -/// async fn actor(mut ctx: actor::Context) -> Result<(), !> { +/// async fn actor(mut ctx: actor::Context) { /// # let start = Instant::now(); /// // Create a timer, this will be ready once the timeout has passed. /// let timeout = Timer::timeout(&mut ctx, Duration::from_millis(200)); @@ -74,7 +74,6 @@ impl From for io::Error { /// timeout.await; /// # assert!(Instant::now() >= start + Duration::from_millis(200)); /// println!("One second has passed!"); -/// Ok(()) /// } /// ``` #[derive(Debug)] @@ -200,7 +199,7 @@ impl actor::Bound for Timer { /// # } /// # } /// # -/// async fn actor(mut ctx: actor::Context) -> Result<(), !> { +/// async fn actor(mut ctx: actor::Context) { /// // `OtherFuture` is a type that implements `Future`. /// let future = IoFuture; /// // Create our deadline. @@ -214,7 +213,6 @@ impl actor::Bound for Timer { /// // However the other future is rather slow, so the timeout will pass. /// assert!(result.is_err()); /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::TimedOut); -/// Ok(()) /// } /// ``` #[derive(Debug)] @@ -373,7 +371,7 @@ impl actor::Bound for Deadline { /// # Ok(()) /// # } /// -/// async fn actor(mut ctx: actor::Context) -> Result<(), !> { +/// async fn actor(mut ctx: actor::Context) { /// # let start = Instant::now(); /// let mut interval = Interval::new(&mut ctx, Duration::from_millis(200)); /// # assert!(interval.next_deadline() >= start + Duration::from_millis(200)); @@ -384,7 +382,6 @@ impl actor::Bound for Deadline { /// println!("Hello world"); /// # break; /// } -/// Ok(()) /// } /// ``` #[derive(Debug)] diff --git a/tests/functional.rs b/tests/functional.rs index 084c129ec..86183fa19 100644 --- a/tests/functional.rs +++ b/tests/functional.rs @@ -8,6 +8,7 @@ mod util; #[path = "functional"] // rustfmt can't find the files. mod functional { + mod actor; mod actor_context; mod actor_group; mod actor_ref; diff --git a/tests/functional/actor.rs b/tests/functional/actor.rs new file mode 100644 index 000000000..33c8f7eb2 --- /dev/null +++ b/tests/functional/actor.rs @@ -0,0 +1,21 @@ +//! Tests for the [`Actor`] trait. + +use heph::actor::{self, NewActor}; + +#[test] +fn future_output_result() { + // Actor is implemented for `Future>`. + async fn actor(_: actor::Context<()>) -> Result<(), ()> { + Ok(()) + } + is_new_actor(actor as fn(_) -> _); +} + +#[test] +fn future_output_tuple() { + // Actor is implemented for `Future`. + async fn actor(_: actor::Context<()>) {} + is_new_actor(actor as fn(_) -> _); +} + +fn is_new_actor(_: NA) {} diff --git a/tests/functional/actor_context.rs b/tests/functional/actor_context.rs index 7e461e1b9..7e2d7748c 100644 --- a/tests/functional/actor_context.rs +++ b/tests/functional/actor_context.rs @@ -19,7 +19,7 @@ fn thread_safe_is_send_sync() { assert_sync::>(); } -async fn local_actor_context_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn local_actor_context_actor(mut ctx: actor::Context) { assert_eq!(ctx.try_receive_next(), Err(RecvError::Empty)); let msg = ctx.receive_next().await.unwrap(); @@ -27,7 +27,6 @@ async fn local_actor_context_actor(mut ctx: actor::Context) -> Result<(), assert_eq!(ctx.receive_next().await, Err(NoMessages)); assert_eq!(ctx.try_receive_next(), Err(RecvError::Disconnected)); - Ok(()) } #[test] @@ -48,7 +47,7 @@ fn local_actor_context() { assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(()))); } -async fn actor_ref_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn actor_ref_actor(mut ctx: actor::Context) { assert_eq!(ctx.receive_next().await, Err(NoMessages)); // Send a message to ourselves. @@ -56,8 +55,6 @@ async fn actor_ref_actor(mut ctx: actor::Context) -> Result<(), !> { self_ref.send(123usize).await.unwrap(); let msg = ctx.receive_next().await.unwrap(); assert_eq!(msg, 123); - - Ok(()) } #[test] @@ -70,10 +67,9 @@ fn actor_ref() { assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(()))); } -async fn runtime_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn runtime_actor(mut ctx: actor::Context) { let actor_ref = ctx.actor_ref(); ctx.runtime().receive_signals(actor_ref); - Ok(()) } #[test] @@ -86,7 +82,7 @@ fn runtime() { assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(()))); } -async fn thread_safe_try_spawn_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn thread_safe_try_spawn_actor(mut ctx: actor::Context) { let actor_ref1 = ctx .try_spawn( NoSupervisor, @@ -104,14 +100,11 @@ async fn thread_safe_try_spawn_actor(mut ctx: actor::Context) actor_ref1.send(123usize).await.unwrap(); actor_ref2.send(123usize).await.unwrap(); - - Ok(()) } -async fn spawned_actor1(mut ctx: actor::Context) -> Result<(), !> { +async fn spawned_actor1(mut ctx: actor::Context) { let msg = ctx.receive_next().await.unwrap(); assert_eq!(msg, 123); - Ok(()) } #[test] diff --git a/tests/functional/actor_group.rs b/tests/functional/actor_group.rs index 870d7eb6a..1405568da 100644 --- a/tests/functional/actor_group.rs +++ b/tests/functional/actor_group.rs @@ -37,7 +37,7 @@ fn empty() { assert!(group.is_empty()); } -async fn expect_msgs(mut ctx: actor::Context, expected: Vec) -> Result<(), !> +async fn expect_msgs(mut ctx: actor::Context, expected: Vec) where M: Eq + fmt::Debug, { @@ -45,7 +45,6 @@ where let got = ctx.receive_next().await.expect("missing message"); assert_eq!(got, expected); } - Ok(()) } #[test] diff --git a/tests/functional/actor_ref.rs b/tests/functional/actor_ref.rs index 445975163..7bc6cac8e 100644 --- a/tests/functional/actor_ref.rs +++ b/tests/functional/actor_ref.rs @@ -43,7 +43,7 @@ fn send_value_future_is_send_sync() { assert_sync::>(); } -async fn expect_msgs(mut ctx: actor::Context, expected: Vec) -> Result<(), !> +async fn expect_msgs(mut ctx: actor::Context, expected: Vec) where M: Eq + fmt::Debug, { @@ -51,7 +51,6 @@ where let got = ctx.receive_next().await.expect("missing message"); assert_eq!(got, expected); } - Ok(()) } #[test] @@ -95,14 +94,13 @@ fn try_send_disconnected() { assert_eq!(actor_ref.try_send(1usize), Err(SendError)); } -async fn relay_msgs(_: actor::Context, relay_ref: ActorRef, msgs: Vec) -> Result<(), !> +async fn relay_msgs(_: actor::Context, relay_ref: ActorRef, msgs: Vec) where M: Eq + fmt::Debug + Unpin, { for msg in msgs { relay_ref.send(msg).await.unwrap() } - Ok(()) } #[test] @@ -146,9 +144,8 @@ fn send_full_inbox() { assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(()))); } -async fn relay_error(_: actor::Context, relay_ref: ActorRef) -> Result<(), !> { +async fn relay_error(_: actor::Context, relay_ref: ActorRef) { assert_eq!(relay_ref.send(1usize).await, Err(SendError)); - Ok(()) } #[test] @@ -322,10 +319,9 @@ fn try_mapped_send() { // Sends zero to a `NonZeroUsize` mapped actor reference causing a conversion // error. -async fn send_error(_: actor::Context, relay_ref: ActorRef) -> Result<(), !> { +async fn send_error(_: actor::Context, relay_ref: ActorRef) { let res = relay_ref.send(0usize).await; assert_eq!(res, Err(SendError)); - Ok(()) } #[test] @@ -399,20 +395,18 @@ fn send_error_format() { assert_eq!(format!("{}", SendError), "unable to send message"); } -async fn wake_on_send(_: actor::Context, relay_ref: ActorRef) -> Result<(), !> { +async fn wake_on_send(_: actor::Context, relay_ref: ActorRef) { relay_ref .send(123usize) .await .expect("failed to send message"); - Ok(()) } -async fn wake_on_receive(mut ctx: actor::Context, expected: Vec) -> Result<(), !> { +async fn wake_on_receive(mut ctx: actor::Context, expected: Vec) { for expected in expected { let got = ctx.receive_next().await.expect("missing message"); assert_eq!(got, expected); } - Ok(()) } #[test] @@ -456,11 +450,10 @@ struct Ping; #[derive(Copy, Clone, Debug, Eq, PartialEq)] struct Pong; -async fn ping(_: actor::Context, relay_ref: ActorRef) -> Result<(), !> { +async fn ping(_: actor::Context, relay_ref: ActorRef) { let rpc = relay_ref.rpc(Ping); let res = rpc.await; assert_eq!(res, Ok(Pong)); - Ok(()) } enum RpcTestMessage { @@ -474,14 +467,13 @@ impl From> for RpcTestMessage { } } -async fn pong(mut ctx: actor::Context) -> Result<(), !> { +async fn pong(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { RpcTestMessage::Ping(msg) => msg.handle(|_| Pong).unwrap(), RpcTestMessage::Check => {} } } - Ok(()) } #[test] @@ -511,14 +503,10 @@ fn rpc() { ); } -async fn rpc_send_error_actor( - _: actor::Context, - relay_ref: ActorRef, -) -> Result<(), !> { +async fn rpc_send_error_actor(_: actor::Context, relay_ref: ActorRef) { let rpc = relay_ref.rpc(Ping); let res = rpc.await; assert_eq!(res, Err(RpcError::SendError)); - Ok(()) } #[test] @@ -574,14 +562,10 @@ fn rpc_full_inbox() { ); } -async fn ping_no_response( - _: actor::Context, - relay_ref: ActorRef, -) -> Result<(), !> { +async fn ping_no_response(_: actor::Context, relay_ref: ActorRef) { let rpc = relay_ref.rpc(Ping); let res = rpc.await; assert_eq!(res, Err(RpcError::NoResponse)); - Ok(()) } #[test] @@ -604,7 +588,7 @@ fn rpc_no_response() { ); } -async fn pong_respond_error(mut ctx: actor::Context) -> Result<(), !> { +async fn pong_respond_error(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { RpcTestMessage::Ping(RpcMessage { response, .. }) => { @@ -613,7 +597,6 @@ async fn pong_respond_error(mut ctx: actor::Context) -> Result<( RpcTestMessage::Check => {} } } - Ok(()) } #[test] @@ -637,7 +620,7 @@ fn rpc_respond_error() { ); } -async fn pong_is_connected(mut ctx: actor::Context) -> Result<(), !> { +async fn pong_is_connected(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { RpcTestMessage::Ping(RpcMessage { response, .. }) => { @@ -651,7 +634,6 @@ async fn pong_is_connected(mut ctx: actor::Context) -> Result<() RpcTestMessage::Check => {} } } - Ok(()) } #[test] @@ -683,24 +665,19 @@ fn rpc_error_format() { assert_eq!(format!("{}", RpcError::NoResponse), "no RPC response"); } -async fn wake_on_response( - _: actor::Context, - relay_ref: ActorRef, -) -> Result<(), !> { +async fn wake_on_response(_: actor::Context, relay_ref: ActorRef) { let rpc = relay_ref.rpc(Ping); let res = rpc.await; assert_eq!(res, Ok(Pong)); - Ok(()) } -async fn wake_on_rpc_receive(mut ctx: actor::Context) -> Result<(), !> { +async fn wake_on_rpc_receive(mut ctx: actor::Context) { while let Ok(msg) = ctx.receive_next().await { match msg { RpcTestMessage::Ping(msg) => msg.handle(|_| Pong).unwrap(), RpcTestMessage::Check => {} } } - Ok(()) } #[test] diff --git a/tests/functional/from_message.rs b/tests/functional/from_message.rs index bc189d7ab..dd240917c 100644 --- a/tests/functional/from_message.rs +++ b/tests/functional/from_message.rs @@ -53,7 +53,7 @@ fn from_message() { ); } -async fn ping_actor(_: actor::Context, actor_ref: ActorRef) -> Result<(), !> { +async fn ping_actor(_: actor::Context, actor_ref: ActorRef) { actor_ref.send("Hello!".to_owned()).await.unwrap(); let response = actor_ref.rpc("Rpc".to_owned()).await.unwrap(); @@ -61,11 +61,9 @@ async fn ping_actor(_: actor::Context, actor_ref: ActorRef) -> Resul let response = actor_ref.rpc(("Rpc2".to_owned(), 2)).await.unwrap(); assert_eq!(response, (1, 2)); - - Ok(()) } -async fn pong_actor(mut ctx: actor::Context) -> Result<(), !> { +async fn pong_actor(mut ctx: actor::Context) { let msg = ctx.receive_next().await.unwrap(); assert!(matches!(msg, Message::Msg(msg) if msg == "Hello!")); @@ -84,5 +82,4 @@ async fn pong_actor(mut ctx: actor::Context) -> Result<(), !> { } assert_eq!(count, 3); - Ok(()) } diff --git a/tests/functional/tcp/listener.rs b/tests/functional/tcp/listener.rs index bae3a36aa..b96fc9e23 100644 --- a/tests/functional/tcp/listener.rs +++ b/tests/functional/tcp/listener.rs @@ -17,7 +17,7 @@ use crate::util::{any_local_address, any_local_ipv6_address, run_actors}; #[test] fn local_addr() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let address = "127.0.0.1:12345".parse().unwrap(); let mut listener = TcpListener::bind(&mut ctx, address).unwrap(); assert_eq!(listener.local_addr().unwrap(), address); @@ -26,8 +26,6 @@ fn local_addr() { let address = "[::1]:12345".parse().unwrap(); let mut listener = TcpListener::bind(&mut ctx, address).unwrap(); assert_eq!(listener.local_addr().unwrap(), address); - - Ok(()) } let actor = actor as fn(_) -> _; @@ -38,7 +36,7 @@ fn local_addr() { #[test] fn local_addr_port_zero() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let address = any_local_address(); let mut listener = TcpListener::bind(&mut ctx, address).unwrap(); let got = listener.local_addr().unwrap(); @@ -51,8 +49,6 @@ fn local_addr_port_zero() { let got = listener.local_addr().unwrap(); assert_eq!(got.ip(), address.ip()); assert!(got.port() != 0); - - Ok(()) } let actor = actor as fn(_) -> _; @@ -63,15 +59,13 @@ fn local_addr_port_zero() { #[test] fn ttl() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let initial = listener.ttl().unwrap(); let expected = initial + 10; listener.set_ttl(expected).unwrap(); assert_eq!(listener.ttl().unwrap(), expected); - - Ok(()) } let actor = actor as fn(_) -> _; @@ -82,7 +76,7 @@ fn ttl() { const DATA: &[u8] = b"Hello world"; -async fn stream_actor(mut ctx: actor::Context) -> Result<(), !> +async fn stream_actor(mut ctx: actor::Context) where actor::Context: rt::Access, { @@ -94,7 +88,6 @@ where let n = stream.send(DATA).await.unwrap(); assert_eq!(n, DATA.len()); - Ok(()) } #[test] @@ -117,10 +110,7 @@ fn try_accept() { YieldOnce(false) } - async fn listener_actor( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn listener_actor(mut ctx: actor::Context, actor_ref: ActorRef) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let address = listener.local_addr().unwrap(); @@ -143,8 +133,6 @@ fn try_accept() { let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - - Ok(()) } let stream_actor = stream_actor as fn(_) -> _; @@ -160,10 +148,7 @@ fn try_accept() { #[test] fn accept() { - async fn listener_actor( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn listener_actor(mut ctx: actor::Context, actor_ref: ActorRef) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let address = listener.local_addr().unwrap(); @@ -177,8 +162,6 @@ fn accept() { let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - - Ok(()) } let stream_actor = stream_actor as fn(_) -> _; @@ -194,10 +177,7 @@ fn accept() { #[test] fn incoming() { - async fn listener_actor( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn listener_actor(mut ctx: actor::Context, actor_ref: ActorRef) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let address = listener.local_addr().unwrap(); @@ -212,8 +192,6 @@ fn incoming() { let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - - Ok(()) } let stream_actor = stream_actor as fn(_) -> _; @@ -229,23 +207,18 @@ fn incoming() { #[test] fn actor_bound() { - async fn listener_actor1( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> + async fn listener_actor1(mut ctx: actor::Context, actor_ref: ActorRef) where actor::Context: rt::Access, { let listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); actor_ref.send(listener).await.unwrap(); - Ok(()) } async fn listener_actor2( mut ctx: actor::Context, actor_ref: ActorRef, - ) -> Result<(), !> - where + ) where actor::Context: rt::Access, { let mut listener = ctx.receive_next().await.unwrap(); @@ -265,7 +238,6 @@ fn actor_bound() { let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - Ok(()) } fn setup(mut runtime_ref: RuntimeRef) -> Result<(), !> { diff --git a/tests/functional/tcp/server.rs b/tests/functional/tcp/server.rs index 97ad11372..d86c01c64 100644 --- a/tests/functional/tcp/server.rs +++ b/tests/functional/tcp/server.rs @@ -55,7 +55,7 @@ where } } -async fn actor(_: actor::Context, mut stream: TcpStream, _: SocketAddr) -> Result<(), !> +async fn actor(_: actor::Context, mut stream: TcpStream, _: SocketAddr) where actor::Context: rt::Access, { @@ -63,7 +63,6 @@ where let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - Ok(()) } const DATA: &[u8] = b"Hello world"; @@ -72,8 +71,7 @@ async fn stream_actor( mut ctx: actor::Context, address: SocketAddr, actor_ref: ActorRef, -) -> Result<(), !> -where +) where actor::Context: rt::Access, { let mut stream = TcpStream::connect(&mut ctx, address) @@ -86,8 +84,6 @@ where // Send a message to stop the listener. actor_ref.send(Terminate).await.unwrap(); - - Ok(()) } #[test] @@ -258,7 +254,7 @@ fn new_actor_error() { let (server_actor, _) = init_local_actor(server, ()).unwrap(); let server_actor: Box> = Box::new(ServerWrapper(server_actor)); - async fn stream_actor(mut ctx: actor::Context, address: SocketAddr) -> Result<(), !> + async fn stream_actor(mut ctx: actor::Context, address: SocketAddr) where actor::Context: rt::Access, { @@ -269,8 +265,6 @@ fn new_actor_error() { // Just need to create the connection. drop(stream); - - Ok(()) } let stream_actor = stream_actor as fn(_, _) -> _; diff --git a/tests/functional/tcp/stream.rs b/tests/functional/tcp/stream.rs index 92bde481f..237ba8f2d 100644 --- a/tests/functional/tcp/stream.rs +++ b/tests/functional/tcp/stream.rs @@ -1049,10 +1049,7 @@ fn peek_vectored() { #[test] fn shutdown_read() { - async fn listener_actor( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn listener_actor(mut ctx: actor::Context, actor_ref: ActorRef) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let address = listener.local_addr().unwrap(); @@ -1068,11 +1065,9 @@ fn shutdown_read() { let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - - Ok(()) } - async fn stream_actor(mut ctx: actor::Context) -> Result<(), !> { + async fn stream_actor(mut ctx: actor::Context) { let address = ctx.receive_next().await.unwrap(); let mut stream = TcpStream::connect(&mut ctx, address) .unwrap() @@ -1086,8 +1081,6 @@ fn shutdown_read() { assert_eq!(n, 0); stream.send_all(DATA).await.unwrap(); - - Ok(()) } let stream_actor = stream_actor as fn(_) -> _; @@ -1103,10 +1096,7 @@ fn shutdown_read() { #[test] fn shutdown_write() { - async fn listener_actor( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn listener_actor(mut ctx: actor::Context, actor_ref: ActorRef) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let address = listener.local_addr().unwrap(); @@ -1122,10 +1112,9 @@ fn shutdown_write() { assert_eq!(n, 0); stream.send_all(DATA).await.unwrap(); - Ok(()) } - async fn stream_actor(mut ctx: actor::Context) -> Result<(), !> { + async fn stream_actor(mut ctx: actor::Context) { let address = ctx.receive_next().await.unwrap(); let mut stream = TcpStream::connect(&mut ctx, address) .unwrap() @@ -1141,8 +1130,6 @@ fn shutdown_write() { let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, DATA.len()); assert_eq!(buf, DATA); - - Ok(()) } let stream_actor = stream_actor as fn(_) -> _; @@ -1158,10 +1145,7 @@ fn shutdown_write() { #[test] fn shutdown_both() { - async fn listener_actor( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn listener_actor(mut ctx: actor::Context, actor_ref: ActorRef) { let mut listener = TcpListener::bind(&mut ctx, any_local_address()).unwrap(); let address = listener.local_addr().unwrap(); @@ -1174,11 +1158,9 @@ fn shutdown_both() { let mut buf = Vec::with_capacity(2); let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, 0); - - Ok(()) } - async fn stream_actor(mut ctx: actor::Context) -> Result<(), !> { + async fn stream_actor(mut ctx: actor::Context) { let address = ctx.receive_next().await.unwrap(); let mut stream = TcpStream::connect(&mut ctx, address) .unwrap() @@ -1193,8 +1175,6 @@ fn shutdown_both() { let mut buf = Vec::with_capacity(2); let n = stream.recv(&mut buf).await.unwrap(); assert_eq!(n, 0); - - Ok(()) } let stream_actor = stream_actor as fn(_) -> _; diff --git a/tests/functional/timer.rs b/tests/functional/timer.rs index 2eb5f2ba7..834dc2de1 100644 --- a/tests/functional/timer.rs +++ b/tests/functional/timer.rs @@ -24,7 +24,7 @@ fn deadline_passed_into_io_error() { #[test] fn timer() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let start = Instant::now(); let mut timer = Timer::timeout(&mut ctx, TIMEOUT); assert!(timer.deadline() >= start + TIMEOUT); @@ -33,7 +33,6 @@ fn timer() { let _ = (&mut timer).await; assert!(timer.deadline() >= start + TIMEOUT); assert!(timer.has_passed()); - Ok(()) } let actor = actor as fn(_) -> _; @@ -58,7 +57,7 @@ impl Future for Pending { #[test] fn timer_wrap() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let start = Instant::now(); let future = Pending(123); let mut deadline = Timer::timeout(&mut ctx, TIMEOUT).wrap(future); @@ -69,7 +68,6 @@ fn timer_wrap() { assert_eq!(res, Err(DeadlinePassed)); assert!(deadline.deadline() >= start + TIMEOUT); assert!(deadline.has_passed()); - Ok(()) } let actor = actor as fn(_) -> _; @@ -83,7 +81,7 @@ fn timer_wrap() { #[test] fn deadline() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let start = Instant::now(); let future = Pending(123); let mut deadline = Deadline::timeout(&mut ctx, TIMEOUT, future.clone()); @@ -99,7 +97,6 @@ fn deadline() { assert_eq!(*deadline.get_ref(), future); assert_eq!(*deadline.get_mut(), future); assert_eq!(deadline.into_inner(), future); - Ok(()) } let actor = actor as fn(_) -> _; @@ -113,12 +110,11 @@ fn deadline() { #[test] fn interval() { - async fn actor(mut ctx: actor::Context) -> Result<(), !> { + async fn actor(mut ctx: actor::Context) { let start = Instant::now(); let mut interval = Interval::new(&mut ctx, TIMEOUT); assert!(interval.next_deadline() >= start + TIMEOUT); let _ = next(&mut interval).await; - Ok(()) } let actor = actor as fn(_) -> _; @@ -132,16 +128,15 @@ fn interval() { #[test] fn triggered_timers_run_actors() { - async fn timer_actor(mut ctx: actor::Context) -> Result<(), !> + async fn timer_actor(mut ctx: actor::Context) where actor::Context: rt::Access, { let timer = Timer::timeout(&mut ctx, TIMEOUT); let _ = timer.await; - Ok(()) } - async fn deadline_actor(mut ctx: actor::Context) -> Result<(), !> + async fn deadline_actor(mut ctx: actor::Context) where actor::Context: rt::Access, { @@ -149,13 +144,11 @@ fn triggered_timers_run_actors() { let deadline = Deadline::timeout(&mut ctx, TIMEOUT, future); let res: Result<(), DeadlinePassed> = deadline.await; assert_eq!(res, Err(DeadlinePassed)); - Ok(()) } - async fn interval_actor(mut ctx: actor::Context) -> Result<(), !> { + async fn interval_actor(mut ctx: actor::Context) { let mut interval = Interval::new(&mut ctx, TIMEOUT); let _ = next(&mut interval).await; - Ok(()) } fn setup(mut runtime_ref: RuntimeRef) -> Result<(), !> { @@ -202,42 +195,35 @@ fn triggered_timers_run_actors() { #[test] fn timers_actor_bound() { - async fn timer_actor1( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> + async fn timer_actor1(mut ctx: actor::Context, actor_ref: ActorRef) where actor::Context: rt::Access, { let timer = Timer::timeout(&mut ctx, TIMEOUT); actor_ref.send(timer).await.unwrap(); - Ok(()) } - async fn timer_actor2(mut ctx: actor::Context) -> Result<(), !> + async fn timer_actor2(mut ctx: actor::Context) where actor::Context: rt::Access, { let mut timer = ctx.receive_next().await.unwrap(); timer.bind_to(&mut ctx).unwrap(); let _ = timer.await; - Ok(()) } async fn deadline_actor1( mut ctx: actor::Context, actor_ref: ActorRef>, - ) -> Result<(), !> - where + ) where actor::Context: rt::Access, { let future = Pending(123); let deadline = Deadline::timeout(&mut ctx, TIMEOUT, future); actor_ref.send(deadline).await.unwrap(); - Ok(()) } - async fn deadline_actor2(mut ctx: actor::Context, K>) -> Result<(), !> + async fn deadline_actor2(mut ctx: actor::Context, K>) where actor::Context, K>: rt::Access, { @@ -245,23 +231,17 @@ fn timers_actor_bound() { deadline.bind_to(&mut ctx).unwrap(); let res: Result<(), DeadlinePassed> = deadline.await; assert_eq!(res, Err(DeadlinePassed)); - Ok(()) } - async fn interval_actor1( - mut ctx: actor::Context, - actor_ref: ActorRef, - ) -> Result<(), !> { + async fn interval_actor1(mut ctx: actor::Context, actor_ref: ActorRef) { let interval = Interval::new(&mut ctx, TIMEOUT); actor_ref.send(interval).await.unwrap(); - Ok(()) } - async fn interval_actor2(mut ctx: actor::Context) -> Result<(), !> { + async fn interval_actor2(mut ctx: actor::Context) { let mut interval = ctx.receive_next().await.unwrap(); interval.bind_to(&mut ctx).unwrap(); let _ = next(&mut interval).await; - Ok(()) } fn setup(mut runtime_ref: RuntimeRef) -> Result<(), !> { diff --git a/tests/message_loss.rs b/tests/message_loss.rs index cd7bcd19d..41f16cca1 100644 --- a/tests/message_loss.rs +++ b/tests/message_loss.rs @@ -14,13 +14,13 @@ use std::task::Poll; use heph::actor::{self, NoMessages}; use heph::test::{init_local_actor, poll_actor, set_message_loss}; -async fn expect_1_messages(mut ctx: actor::Context) -> Result<(), !> { +async fn expect_1_messages(mut ctx: actor::Context) { let msg = ctx.receive_next().await.expect("missing first message"); assert_eq!(msg, 123); match ctx.receive_next().await { Ok(msg) => panic!("unexpected message: {:?}", msg), - Err(NoMessages) => Ok(()), + Err(NoMessages) => return, } }