Skip to content

Commit

Permalink
Replace crossbeam_channel with inbox for ActorRefs
Browse files Browse the repository at this point in the history
Reduces the memory usage from ~7.08 GB to 2.09 GB for the stress_memory
example. Furthermore it reduces its startup time from ~10 seconds to ~5.

The main difference is that the inbox crate uses a bounded channel,
currently of size 8, where as we were using Crossbeam's unbounded
channel. Note that using Crossbeam's bounded channel increased the
memory usage to 8.90 GB for the stress_memory example.
  • Loading branch information
Thomasdezeeuw committed Nov 23, 2020
1 parent 00640b2 commit 9aa02a0
Show file tree
Hide file tree
Showing 35 changed files with 1,358 additions and 3,054 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ test = ["lazy_static", "rand"]
crossbeam-channel = { version = "0.5.0", default-features = false, features = ["std"] }
futures-core = { version = "0.3.6", default-features = false }
futures-io = { version = "0.3.6", default-features = false, features = ["std"] }
# FIXME: use released version.
inbox = { git = "https://github.com/Thomasdezeeuw/inbox", rev = "35f4269f07a5dd717c08e20cc5e260bd980462f0" }
libc = { version = "0.2.79", default-features = false }
log = { version = "0.4.8", default-features = false }
mio = { version = "0.7.5", default-features = false, features = ["os-poll", "tcp", "udp", "pipe"] }
Expand Down
5 changes: 3 additions & 2 deletions examples/1_hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ fn add_greeter_actor(mut runtime_ref: RuntimeRef) -> Result<(), !> {
async fn greeter_actor(mut ctx: actor::Context<&'static str>) -> Result<(), !> {
// All actors have an actor context, which give the actor access to, among
// other things, its inbox from which it can receive a message.
let name = ctx.receive_next().await;
println!("Hello {}", name);
while let Ok(name) = ctx.receive_next().await {
println!("Hello {}", name);
}
Ok(())
}
36 changes: 17 additions & 19 deletions examples/3_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,32 @@ fn add_rpc_actor(mut runtime_ref: RuntimeRef) -> Result<(), !> {
Ok(())
}

async fn ping_actor(mut ctx: actor::Context<!>, actor_ref: ActorRef<PongMessage>) -> Result<(), !> {
// Send our RPC request.
let rpc = actor_ref.rpc(&mut ctx, Ping).unwrap();

// Await until we get a response.
match rpc.await {
async fn ping_actor(_: actor::Context<!>, actor_ref: ActorRef<PongMessage>) -> Result<(), !> {
// Make a Remote Procedure Call (RPC) and await the response.
match actor_ref.rpc(Ping).await {
Ok(response) => println!("Got a RPC response: {}", response),
Err(no_response) => eprintln!("RPC request error: {}", no_response),
Err(err) => eprintln!("RPC request error: {}", err),
}

Ok(())
}

// Message type to support the ping-pong RPC call.
// Message type to support the ping-pong RPC.
type PongMessage = RpcMessage<Ping, Pong>;

async fn pong_actor(mut ctx: actor::Context<PongMessage>) -> Result<(), !> {
// Await a message, same as all other messages.
let msg = ctx.receive_next().await;

// Next we respond to the request.
let res = msg.handle(|request| {
println!("Got a RPC request: {}", request);
// Return a response.
Pong
});

if let Err(err) = res {
eprintln!("failed to respond to RPC: {}", err);
while let Ok(msg) = ctx.receive_next().await {
// Next we respond to the request.
let res = msg.handle(|request| {
println!("Got a RPC request: {}", request);
// Return a response.
Pong
});

if let Err(err) = res {
eprintln!("failed to respond to RPC: {}", err);
}
}

Ok(())
Expand Down
8 changes: 4 additions & 4 deletions examples/6_process_signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ fn sync_actor(mut ctx: SyncContext<Message>) -> Result<(), !> {
}

async fn thread_safe_actor(mut ctx: actor::Context<Message, ThreadSafe>) -> Result<(), !> {
loop {
match ctx.receive_next().await {
while let Ok(msg) = ctx.receive_next().await {
match msg {
Message::Print(msg) => println!("Got a message: {}", msg),
Message::Terminate => break,
}
Expand All @@ -93,8 +93,8 @@ async fn thread_safe_actor(mut ctx: actor::Context<Message, ThreadSafe>) -> Resu
}

async fn local_actor(mut ctx: actor::Context<Message>) -> Result<(), !> {
loop {
match ctx.receive_next().await {
while let Ok(msg) = ctx.receive_next().await {
match msg {
Message::Print(msg) => println!("Got a message: {}", msg),
Message::Terminate => break,
}
Expand Down
2 changes: 1 addition & 1 deletion examples/99_stress_memory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This is just a memory stress test of the runtime.
//!
//! Currently using 10 million "actors" this test uses 7.08 GB and takes ~26
//! Currently using 10 million "actors" this test uses 2.09 GB and takes ~5
//! seconds to spawn the actors.
#![feature(never_type)]
Expand Down
1 change: 0 additions & 1 deletion specs/one-shot/properties/Deadlock free.mcf

This file was deleted.

6 changes: 0 additions & 6 deletions specs/one-shot/specs.mcrl2proj

This file was deleted.

85 changes: 0 additions & 85 deletions specs/one-shot/specs_spec.mcrl2

This file was deleted.

Loading

0 comments on commit 9aa02a0

Please sign in to comment.