Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Actor trait for Futures that return () #166

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions examples/1_hello_world.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![feature(async_await, await_macro, futures_api, never_type)]

use heph::actor::Context;
use heph::supervisor::NoSupervisor;
use heph::system::{ActorOptions, ActorSystem, ActorSystemRef, RuntimeError};

fn main() -> Result<(), RuntimeError> {
Expand All @@ -26,7 +25,7 @@ fn add_greeter_actor(mut system_ref: ActorSystemRef) -> Result<(), !> {
// We'll use the default actor options here, other examples expand on the
// options available.
let actor = greeter_actor as fn(_) -> _;
let mut actor_ref = system_ref.spawn(NoSupervisor, actor, (), ActorOptions::default());
let mut actor_ref = system_ref.spawn_unsupervised(actor, (), ActorOptions::default());

// By default actors don't do anything when added to the actor system. We
// need to wake them, for example by sending them a message. If we didn't
Expand All @@ -44,10 +43,9 @@ fn add_greeter_actor(mut system_ref: ActorSystemRef) -> Result<(), !> {
/// Our greeter actor.
///
/// We'll receive a single message and print it.
async fn greeter_actor(mut ctx: Context<&'static str>) -> Result<(), !> {
async fn greeter_actor(mut ctx: Context<&'static str>) {
// All actors have an actor context, which give the actor access to its
// inbox, from which we can `receive` a message.
let name = await!(ctx.receive_next());
println!("Hello {}", name);
Ok(())
}
6 changes: 2 additions & 4 deletions examples/1b_hello_world.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![feature(async_await, await_macro, futures_api, never_type)]

use heph::actor::Context;
use heph::supervisor::NoSupervisor;
use heph::system::{ActorOptions, ActorSystem, ActorSystemRef, RuntimeError};

// The creation and running of the actor system is the same as in example 1.
Expand All @@ -21,7 +20,7 @@ fn add_greeter_actor(mut system_ref: ActorSystemRef) -> Result<(), !> {
// that don't have any (initial) external wakers, for example our
// `greeter_actor`.
let actor = greeter_actor as fn(_) -> _;
system_ref.spawn(NoSupervisor, actor, (), ActorOptions {
system_ref.spawn_unsupervised(actor, (), ActorOptions {
schedule: true,
.. ActorOptions::default()
});
Expand All @@ -32,7 +31,6 @@ fn add_greeter_actor(mut system_ref: ActorSystemRef) -> Result<(), !> {
/// Our greeter actor.
///
/// Note: this needs the `schedule` options when adding it to the actor system.
async fn greeter_actor(_: Context<!>) -> Result<(), !> {
async fn greeter_actor(_: Context<!>) {
println!("Hello World");
Ok(())
}
6 changes: 3 additions & 3 deletions examples/3_actor_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures_util::{AsyncReadExt, TryFutureExt};
use heph::actor::Context;
use heph::log::{self, error, info};
use heph::net::{TcpListener, TcpListenerError, TcpStream};
use heph::supervisor::{NoSupervisor, SupervisorStrategy};
use heph::supervisor::SupervisorStrategy;
use heph::system::options::Priority;
use heph::system::{ActorOptions, ActorSystem, ActorSystemRef, RuntimeError};

Expand Down Expand Up @@ -39,7 +39,7 @@ fn setup(mut system_ref: ActorSystemRef) -> io::Result<()> {
// In this example we'll use the Actor Registry. This also actors to be
// lookup dynamically at runtime, see the `echo_actor` for an example of
// that.
system_ref.spawn(NoSupervisor, count_actor as fn(_) -> _, (), ActorOptions {
system_ref.spawn_unsupervised(count_actor as fn(_) -> _, (), ActorOptions {
// To add the actor to the Actor Registry we simply set the `register`
// option. This registers the actor in the Actor Registry and allows it
// to be looked up, see the `echo_actor` below.
Expand Down Expand Up @@ -67,7 +67,7 @@ struct Add;
/// that if you run this example it could display a total count of 1 twice. This
/// means that thread 1 handled the first request and thread 2 handled the
/// second, be careful of this when implementing a counter this way.
async fn count_actor(mut ctx: Context<Add>) -> Result<(), !> {
async fn count_actor(mut ctx: Context<Add>) {
let mut total = 0;
loop {
let _msg = await!(ctx.receive_next());
Expand Down
2 changes: 1 addition & 1 deletion src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::scheduler::ProcessId;
use crate::system::ActorSystemRef;
use crate::util::Shared;

pub use crate::mailbox::{First, MessageSelection, Messages, MessageSelector, Priority};
use crate::mailbox::{First, MessageSelector};

/// The context in which an actor is executed.
///
Expand Down
49 changes: 44 additions & 5 deletions src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub mod message_select {
//! Module containing the `MessageSelector` trait and related types.

#[doc(inline)]
pub use crate::actor::context::{First, MessageSelection, MessageSelector, Messages, Priority};
pub use crate::mailbox::{First, MessageSelection, MessageSelector, Messages, Priority};
}

#[doc(inline)]
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<M, Arg1, Arg2, Arg3, Arg4, Arg5, A> NewActor for fn(ctx: Context<M>, arg1:
/// Effectively an `Actor` is a [`Future`] which returns a `Result<(), Error>`,
/// where `Error` is defined on the trait. That is why there is a blanket
/// implementation for all `Future`s with a `Result<(), Error>` as `Output`
/// type.
/// type. There is also a blanket statement for `Future`s that return `()`.
///
/// The easiest way to implement this by using an async function, see the
/// [module level] documentation.
Expand Down Expand Up @@ -272,12 +272,51 @@ pub trait Actor {
fn try_poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::Error>>;
}

impl<Fut, E> Actor for Fut
where Fut: Future<Output = Result<(), E>>
impl<Fut, O, E> Actor for Fut
where Fut: Future<Output = O>,
O: IntoResult<Error = E>,
{
type Error = E;

fn try_poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::Error>> {
self.poll(waker)
self.poll(waker).map(IntoResult::into_result)
}
}

use private::Private;

/// This trait is an implementation detail to allow [`Actor`] to be implemented
/// for [`Future`]s that return `()` and `Result<(), E>`.
///
/// This will be removed in the future if
/// https://github.com/rust-lang/rfcs/pull/1672, or something like it is
/// implemented.
#[doc(hidden)]
pub trait IntoResult: Private {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the trait should be named ActorResult and should be public.

type Error;

fn into_result(self) -> Result<(), Self::Error>;
}

mod private {
pub trait Private { }

impl<E> Private for Result<(), E> { }
impl Private for () { }
}

impl<E> IntoResult for Result<(), E> {
type Error = E;

fn into_result(self) -> Result<(), E> {
self
}
}

impl IntoResult for () {
type Error = !;

fn into_result(self) -> Result<(), !> {
Ok(self)
}
}
16 changes: 15 additions & 1 deletion src/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::actor::{Actor, Context, NewActor};
use crate::actor_ref::{ActorRef, Local};
use crate::mailbox::MailBox;
use crate::scheduler::{ProcessId, Scheduler, SchedulerRef};
use crate::supervisor::Supervisor;
use crate::supervisor::{Supervisor, NoSupervisor};
use crate::util::Shared;

mod error;
Expand Down Expand Up @@ -463,6 +463,20 @@ impl ActorSystemRef {
.unwrap_or_else(|_: AddActorError<!, !>| unreachable!())
}

/// Spawn an actor that doesn't return an error.
///
/// This is a convenience method for `NewActor` and `Actor` implementations
/// that never return an error.
///
/// See [`ActorSystemRef::try_spawn`] for more information.
pub fn spawn_unsupervised<NA>(&mut self, new_actor: NA, arg: NA::Argument, options: ActorOptions) -> ActorRef<NA::Message>
where NA: NewActor<Error = !> + 'static,
NA::Actor: Actor<Error = !> + 'static,
{
self.try_spawn_setup(NoSupervisor, new_actor, |_, _| Ok(arg), options)
.unwrap_or_else(|_: AddActorError<!, !>| unreachable!())
}

/// Spawn an actor that needs to be initialised.
///
/// Just like `try_spawn` this requires a `supervisor`, `new_actor` and
Expand Down