Skip to content

Commit

Permalink
Implement Actor for async functions without result
Browse files Browse the repository at this point in the history
This allows us to drop Result<(), !> as return type from many simple
actors.
  • Loading branch information
Thomasdezeeuw committed Feb 12, 2021
1 parent 512fb98 commit fe0552f
Show file tree
Hide file tree
Showing 29 changed files with 147 additions and 244 deletions.
3 changes: 1 addition & 2 deletions examples/1_hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ fn add_greeter_actor(mut runtime_ref: RuntimeRef) -> Result<(), !> {
/// Our greeter actor.
///
/// We'll receive a single message and print it.
async fn greeter_actor(mut ctx: actor::Context<&'static str>) -> Result<(), !> {
async fn greeter_actor(mut ctx: actor::Context<&'static str>) {
// All actors have an actor context, which give the actor access to, among
// other things, its inbox from which it can receive a message.
while let Ok(name) = ctx.receive_next().await {
println!("Hello {}", name);
}
Ok(())
}
8 changes: 2 additions & 6 deletions examples/3_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ fn add_rpc_actor(mut runtime_ref: RuntimeRef) -> Result<(), !> {
Ok(())
}

async fn ping_actor(_: actor::Context<!>, actor_ref: ActorRef<PongMessage>) -> Result<(), !> {
async fn ping_actor(_: actor::Context<!>, actor_ref: ActorRef<PongMessage>) {
// Make a Remote Procedure Call (RPC) and await the response.
match actor_ref.rpc(Ping).await {
Ok(response) => println!("Got a RPC response: {}", response),
Err(err) => eprintln!("RPC request error: {}", err),
}

Ok(())
}

// Message type to support the ping-pong RPC.
type PongMessage = RpcMessage<Ping, Pong>;

async fn pong_actor(mut ctx: actor::Context<PongMessage>) -> Result<(), !> {
async fn pong_actor(mut ctx: actor::Context<PongMessage>) {
// Await a message, same as all other messages.
while let Ok(msg) = ctx.receive_next().await {
// Next we respond to the request.
Expand All @@ -49,8 +47,6 @@ async fn pong_actor(mut ctx: actor::Context<PongMessage>) -> Result<(), !> {
eprintln!("failed to respond to RPC: {}", err);
}
}

Ok(())
}

struct Ping;
Expand Down
6 changes: 2 additions & 4 deletions examples/6_process_signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn sync_actor(mut ctx: SyncContext<Message>) {
println!("shutting down the synchronous actor");
}

async fn thread_safe_actor(mut ctx: actor::Context<Message, ThreadSafe>) -> Result<(), !> {
async fn thread_safe_actor(mut ctx: actor::Context<Message, ThreadSafe>) {
while let Ok(msg) = ctx.receive_next().await {
match msg {
Message::Print(msg) => println!("Got a message: {}", msg),
Expand All @@ -88,10 +88,9 @@ async fn thread_safe_actor(mut ctx: actor::Context<Message, ThreadSafe>) -> Resu
}

println!("shutting down the thread local actor");
Ok(())
}

async fn local_actor(mut ctx: actor::Context<Message>) -> Result<(), !> {
async fn local_actor(mut ctx: actor::Context<Message>) {
while let Ok(msg) = ctx.receive_next().await {
match msg {
Message::Print(msg) => println!("Got a message: {}", msg),
Expand All @@ -100,5 +99,4 @@ async fn local_actor(mut ctx: actor::Context<Message>) -> Result<(), !> {
}

println!("shutting down the thread safe actor");
Ok(())
}
3 changes: 1 addition & 2 deletions examples/8_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn relay_actor(
}

/// Sync actor that prints all messages it receives.
fn print_actor(mut ctx: SyncContext<&'static str>) -> Result<(), !> {
fn print_actor(mut ctx: SyncContext<&'static str>) {
loop {
// Start timing of receiving a message.
let timing = ctx.start_trace();
Expand All @@ -95,5 +95,4 @@ fn print_actor(mut ctx: SyncContext<&'static str>) -> Result<(), !> {
println!("Received message: {}", msg);
ctx.finish_trace(timing, "printing message", &[("message", &msg)]);
}
Ok(())
}
7 changes: 3 additions & 4 deletions examples/99_stress_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ fn main() -> Result<(), rt::Error> {
}

/// Our "actor", but it doesn't do much.
async fn actor(_: actor::Context<!>) -> Result<(), !> {
Ok(())
async fn actor(_: actor::Context<!>) {
/* Nothing. */
}

async fn control_actor(_: actor::Context<!>) -> Result<(), !> {
async fn control_actor(_: actor::Context<!>) {
info!("Running, check the memory usage!");
info!("Send a signal (e.g. by pressing Ctrl-C) to stop.");
Ok(())
}
10 changes: 3 additions & 7 deletions src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,12 @@ impl<M, C> Context<M, C> {
///
/// use heph::actor;
///
/// async fn greeter_actor(mut ctx: actor::Context<String>) -> Result<(), !> {
/// async fn greeter_actor(mut ctx: actor::Context<String>) {
/// if let Ok(name) = ctx.try_receive_next() {
/// println!("Hello: {}", name);
/// } else {
/// println!("Hello world");
/// }
/// Ok(())
/// }
///
/// # // Use the `greeter_actor` function to silence dead code warning.
Expand All @@ -111,11 +110,10 @@ impl<M, C> Context<M, C> {
///
/// use heph::actor;
///
/// async fn print_actor(mut ctx: actor::Context<String>) -> Result<(), !> {
/// async fn print_actor(mut ctx: actor::Context<String>) {
/// if let Ok(msg) = ctx.receive_next().await {
/// println!("Got a message: {}", msg);
/// }
/// Ok(())
/// }
///
/// # // Use the `print_actor` function to silence dead code warning.
Expand All @@ -134,7 +132,7 @@ impl<M, C> Context<M, C> {
/// use heph::timer::Timer;
/// use heph::util::either;
///
/// async fn print_actor(mut ctx: actor::Context<String>) -> Result<(), !> {
/// async fn print_actor(mut ctx: actor::Context<String>) {
/// // Create a timer, this will be ready once the timeout has
/// // passed.
/// let timeout = Timer::timeout(&mut ctx, Duration::from_millis(100));
Expand All @@ -147,8 +145,6 @@ impl<M, C> Context<M, C> {
/// Ok(Err(_)) => println!("No message"),
/// Err(_) => println!("Timed out receiving message"),
/// }
///
/// Ok(())
/// }
///
/// # // Use the `print_actor` function to silence dead code warning.
Expand Down
46 changes: 38 additions & 8 deletions src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,11 @@ pub trait NewActor {
/// }
///
/// /// Our actor implementation that prints all messages it receives.
/// async fn actor(mut ctx: actor::Context<Message>) -> Result<(), !> {
/// async fn actor(mut ctx: actor::Context<Message>) {
/// if let Ok(msg) = ctx.receive_next().await {
/// # assert_eq!(msg, Message::String("Hello world".to_owned()));
/// println!("received message: {:?}", msg);
/// }
/// Ok(())
/// }
/// ```
type Message;
Expand Down Expand Up @@ -463,13 +462,13 @@ impl_new_actor!(
/// The `Actor` trait defines how the actor is run.
///
/// Effectively an `Actor` is a [`Future`] which returns a `Result<(), Error>`,
/// where `Error` is defined on the trait. That is why there is a blanket
/// implementation for all `Future`s with a `Result<(), Error>` as `Output`
/// type.
/// where `Error` is defined on the trait. All `Future`s where the [`Output`]
/// type is `Result<(), Error>` or `()` implement the `Actor` trait.
///
/// The easiest way to implement this by using an async function, see the
/// [module level] documentation.
///
/// [`Output`]: Future::Output
/// [module level]: crate::actor
///
/// # Panics
Expand Down Expand Up @@ -500,17 +499,48 @@ pub trait Actor {
-> Poll<Result<(), Self::Error>>;
}

impl<Fut, E> Actor for Fut
/// Supported are [`Future`]s with `Result<(), E>` or `()` [`Output`].
///
/// [`Output`]: Future::Output
impl<Fut, O, E> Actor for Fut
where
Fut: Future<Output = Result<(), E>>,
Fut: Future<Output = O>,
O: private::ActorResult<Error = E>,
{
type Error = E;

fn try_poll(
self: Pin<&mut Self>,
ctx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.poll(ctx)
self.poll(ctx).map(private::ActorResult::into)
}
}

mod private {
/// Trait to support [`Actor`] for `Result<(), E>` and `()`.
pub trait ActorResult {
/// See [`Actor::Error`].
type Error;

/// Convert the return type in an `Result<(), Self::Error>`.
fn into(self) -> Result<(), Self::Error>;
}

impl<E> ActorResult for Result<(), E> {
type Error = E;

fn into(self) -> Result<(), E> {
self
}
}

impl ActorResult for () {
type Error = !;

fn into(self) -> Result<(), !> {
Ok(())
}
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/actor_ref/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@
//! }
//!
//! /// Our actor.
//! async fn actor(mut ctx: actor::Context<String>) -> Result<(), !> {
//! async fn actor(mut ctx: actor::Context<String>) {
//! if let Ok(msg) = ctx.receive_next().await {
//! println!("got message: {}", msg);
//! }
//! Ok(())
//! }
//! ```
//!
Expand Down Expand Up @@ -88,15 +87,14 @@
//! }
//!
//! /// Our actor.
//! async fn actor(mut ctx: actor::Context<String>) -> Result<(), !> {
//! async fn actor(mut ctx: actor::Context<String>) {
//! if let Ok(msg) = ctx.receive_next().await {
//! println!("First message: {}", msg);
//! }
//!
//! if let Ok(msg) = ctx.receive_next().await {
//! println!("Second message: {}", msg);
//! }
//! Ok(())
//! }
//! ```

Expand Down
10 changes: 3 additions & 7 deletions src/actor_ref/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
//! }
//!
//! /// Receiving actor of the RPC.
//! async fn counter(mut ctx: actor::Context<Add>) -> Result<(), !> {
//! async fn counter(mut ctx: actor::Context<Add>) {
//! // State of the counter.
//! let mut count: usize = 0;
//! // Receive a message like normal.
Expand All @@ -46,12 +46,10 @@
//! // Send back the current state, ignoring any errors.
//! let _ = response.respond(count);
//! }
//! // And we're done.
//! Ok(())
//! }
//!
//! /// Sending actor of the RPC.
//! async fn requester(_: actor::Context<!>, actor_ref: ActorRef<Add>) -> Result<(), !> {
//! async fn requester(_: actor::Context<!>, actor_ref: ActorRef<Add>) {
//! // Make the procedure call.
//! let response = actor_ref.rpc(10).await;
//! # assert!(response.is_ok());
Expand All @@ -61,7 +59,6 @@
//! // Actor failed to respond.
//! Err(err) => eprintln!("Counter didn't reply: {}", err),
//! }
//! Ok(())
//! }
//!
//! # fn main() -> Result<(), rt::Error> {
Expand Down Expand Up @@ -133,7 +130,7 @@
//! }
//!
//! /// Sending actor of the RPC.
//! async fn requester(_: actor::Context<!>, actor_ref: ActorRef<Message>) -> Result<(), !> {
//! async fn requester(_: actor::Context<!>, actor_ref: ActorRef<Message>) {
//! // Increase the counter by ten.
//! // NOTE: do handle the errors correctly in practice, this is just an
//! // example.
Expand All @@ -144,7 +141,6 @@
//! let count = actor_ref.rpc(()).await.unwrap();
//! # assert_eq!(count, 10);
//! println!("Current count {}", count);
//! Ok(())
//! }
//!
//! # fn main() -> Result<(), rt::Error> {
Expand Down
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
//! #
//! # use heph::actor;
//! #
//! async fn actor(mut ctx: actor::Context<String>) -> Result<(), !> {
//! async fn actor(mut ctx: actor::Context<String>) {
//! // Receive a message.
//! if let Ok(msg) = ctx.receive_next().await {
//! // Print the message.
//! println!("got a message: {}", msg);
//! }
//! // And we're done.
//! Ok(())
//! }
//! #
//! # drop(actor); // Silence dead code warnings.
Expand Down
4 changes: 1 addition & 3 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@
//! Ok(())
//! }
//!
//! async fn greeter_actor(_: actor::Context<!>) -> Result<(), !> {
//! async fn greeter_actor(_: actor::Context<!>) {
//! // Log an informational message.
//! info!("Hello world");
//!
//! Ok(())
//! }
//! ```

Expand Down
3 changes: 1 addition & 2 deletions src/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ fn sync_worker_id() {
/// }
///
/// /// Our actor that greets people.
/// async fn actor(mut ctx: actor::Context<&'static str>, msg: &'static str) -> Result<(), !> {
/// async fn actor(mut ctx: actor::Context<&'static str>, msg: &'static str) {
/// // `msg` is the argument passed to `spawn` in the `setup` function
/// // above, in this example it was "Hello".
///
Expand All @@ -172,7 +172,6 @@ fn sync_worker_id() {
/// // This should print "Hello world"!
/// println!("{} {}", msg, name);
/// }
/// Ok(())
/// }
/// ```
pub struct Runtime<S = !> {
Expand Down
3 changes: 1 addition & 2 deletions src/rt/process/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ fn process_data_runtime_increase() {
assert!(process.fair_runtime >= SLEEP_TIME);
}

async fn ok_actor(mut ctx: actor::Context<()>) -> Result<(), !> {
async fn ok_actor(mut ctx: actor::Context<()>) {
assert_eq!(ctx.receive_next().await, Ok(()));
Ok(())
}

#[test]
Expand Down
11 changes: 2 additions & 9 deletions src/rt/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ fn has_process() {
assert!(!scheduler.has_ready_process());
}

async fn simple_actor(_: actor::Context<!>) -> Result<(), !> {
Ok(())
}
async fn simple_actor(_: actor::Context<!>) {}

#[test]
fn add_actor() {
Expand Down Expand Up @@ -239,13 +237,8 @@ fn scheduler_run_order() {
// The order in which the processes have been run.
let run_order = Rc::new(RefCell::new(Vec::new()));

async fn order_actor(
_: actor::Context<!>,
id: usize,
order: Rc<RefCell<Vec<usize>>>,
) -> Result<(), !> {
async fn order_actor(_: actor::Context<!>, id: usize, order: Rc<RefCell<Vec<usize>>>) {
order.borrow_mut().push(id);
Ok(())
}

// Add our processes.
Expand Down
Loading

0 comments on commit fe0552f

Please sign in to comment.