-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial implementation of actor runtime (#99)
* actor implementation Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * nits Signed-off-by: Daniel Gerlag <[email protected]> * tests Signed-off-by: Daniel Gerlag <[email protected]> * make client cloneable Signed-off-by: Daniel Gerlag <[email protected]> * logs Signed-off-by: Daniel Gerlag <[email protected]> * logging Signed-off-by: Daniel Gerlag <[email protected]> * async methods Signed-off-by: Daniel Gerlag <[email protected]> * shutdown semantics Signed-off-by: Daniel Gerlag <[email protected]> * clone actor client context Signed-off-by: Daniel Gerlag <[email protected]> * actor implementation Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * move tests Signed-off-by: Daniel Gerlag <[email protected]> * actor factory Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * wip Signed-off-by: Daniel Gerlag <[email protected]> * readme Signed-off-by: Daniel Gerlag <[email protected]> * pr feedback Signed-off-by: Daniel Gerlag <[email protected]> Signed-off-by: Daniel Gerlag <[email protected]> * cargo fmt Signed-off-by: Daniel Gerlag <[email protected]> * cargo clippy --fix Signed-off-by: Daniel Gerlag <[email protected]> * proc macro Signed-off-by: Daniel Gerlag <[email protected]> * dependency shuffle Signed-off-by: Daniel Gerlag <[email protected]> * Update lib.rs Signed-off-by: Daniel Gerlag <[email protected]> * docs Signed-off-by: Daniel Gerlag <[email protected]> * enable decorating type alias Signed-off-by: Daniel Gerlag <[email protected]> * graceful shutdown Signed-off-by: Daniel Gerlag <[email protected]> * merge issues Signed-off-by: Daniel Gerlag <[email protected]> * cargo fmt Signed-off-by: Daniel Gerlag <[email protected]> * update rust version Signed-off-by: Daniel Gerlag <[email protected]> * publish macro crate Signed-off-by: Daniel Gerlag <[email protected]> * dependency issue Signed-off-by: Daniel Gerlag <[email protected]> * clippy warnings Signed-off-by: Daniel Gerlag <[email protected]> --------- Signed-off-by: Daniel Gerlag <[email protected]>
- Loading branch information
1 parent
6950787
commit 8db69d8
Showing
23 changed files
with
1,708 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,3 +8,4 @@ Cargo.lock | |
|
||
# These are backup files generated by rustfmt | ||
**/*.rs.bk | ||
.vscode/settings.json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# Actor Example | ||
|
||
This example demonstrates the Dapr actor framework. To author an actor, | ||
|
||
1. Create a struc decorated with the `#[dapr::actor]` macro to house your custom actor methods that map to [Axum handlers](https://docs.rs/axum/latest/axum/handler/index.html), use [Axum extractors](https://docs.rs/axum/latest/axum/extract/index.html) to access the incoming request and return an [`impl IntoResponse`](https://docs.rs/axum/latest/axum/response/trait.IntoResponse.html). | ||
Use the `DaprJson` extractor to deserialize the request from Json coming from a Dapr sidecar. | ||
```rust | ||
#[dapr::actor] | ||
struct MyActor { | ||
id: String, | ||
client: ActorContextClient | ||
} | ||
|
||
#[derive(Serialize, Deserialize)] | ||
pub struct MyRequest { | ||
pub name: String, | ||
} | ||
|
||
#[derive(Serialize, Deserialize)] | ||
pub struct MyResponse { | ||
pub available: bool, | ||
} | ||
|
||
impl MyActor { | ||
fn do_stuff(&self, DaprJson(data): DaprJson<MyRequest>) -> Json<MyResponse> { | ||
println!("doing stuff with {}", data.name); | ||
Json(MyResponse { | ||
available: true | ||
}) | ||
} | ||
} | ||
``` | ||
|
||
There are many ways to write your actor method signature, using Axum handlers, but you also have access to the actor instance via `self`. Here is a super simple example: | ||
```rust | ||
pub async fn method_2(&self) -> impl IntoResponse { | ||
StatusCode::OK | ||
} | ||
``` | ||
1. Implement the `Actor` trait. This trait exposes the following methods: | ||
- `on_activate` - Called when an actor is activated on a host | ||
- `on_deactivate` - Called when an actor is deactivated on a host | ||
- `on_reminder` - Called when a reminder is recieved from the Dapr sidecar | ||
- `on_timer` - Called when a timer is recieved from the Dapr sidecar | ||
|
||
|
||
```rust | ||
#[async_trait] | ||
impl Actor for MyActor { | ||
|
||
async fn on_activate(&self) -> Result<(), ActorError> { | ||
println!("on_activate {}", self.id); | ||
Ok(()) | ||
} | ||
|
||
async fn on_deactivate(&self) -> Result<(), ActorError> { | ||
println!("on_deactivate"); | ||
Ok(()) | ||
} | ||
} | ||
``` | ||
|
||
1. An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes an `ActorTypeRegistration` which specifies | ||
- The actor type name (used by Actor clients), and concrete struct | ||
- A factory to construct a new instance of that actor type when one is required to be activated by the runtime. The parameters passed to the factory will be the actor type, actor ID, and a Dapr client for managing state, timers and reminders for the actor. | ||
- The methods that you would like to expose to external clients. | ||
|
||
```rust | ||
let mut dapr_server = dapr::server::DaprHttpServer::new(); | ||
|
||
dapr_server.register_actor(ActorTypeRegistration::new::<MyActor>("MyActor", | ||
Box::new(|actor_type, id, client| Arc::new(MyActor{ | ||
actor_type, | ||
id, | ||
client | ||
}))) | ||
.register_method("do_stuff", MyActor::do_stuff) | ||
.register_method("do_other_stuff", MyActor::do_other_stuff)); | ||
|
||
dapr_server.start(None).await?; | ||
``` | ||
|
||
|
||
## Running | ||
|
||
> Before you run the example make sure local redis state store is running by executing: | ||
> ``` | ||
> docker ps | ||
> ``` | ||
|
||
To run this example: | ||
|
||
1. Start actor host (expose Http server receiver on port 50051): | ||
```bash | ||
dapr run --app-id actor-host --app-protocol http --app-port 50051 cargo run -- --example actor-server | ||
``` | ||
|
||
2. Start actor client: | ||
```bash | ||
dapr run --app-id actor-client --dapr-grpc-port 3502 cargo run -- --example actor-client | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
use serde::{Deserialize, Serialize}; | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyResponse { | ||
pub available: bool, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyRequest { | ||
pub name: String, | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
// TODO: Handle this issue in the sdk | ||
// Introduce delay so that dapr grpc port is assigned before app tries to connect | ||
std::thread::sleep(std::time::Duration::new(2, 0)); | ||
|
||
// Get the Dapr port and create a connection | ||
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?; | ||
let addr = format!("https://127.0.0.1:{}", port); | ||
|
||
// Create the client | ||
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?; | ||
|
||
let data = MyRequest { | ||
name: "test".to_string(), | ||
}; | ||
|
||
let resp: Result<MyResponse, dapr::error::Error> = client | ||
.invoke_actor("MyActor", "a1", "do_stuff", data, None) | ||
.await; | ||
|
||
println!("Response: {:#?}", resp); | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
use async_trait::async_trait; | ||
use axum::Json; | ||
use dapr::server::{ | ||
actor::{ | ||
context_client::ActorContextClient, runtime::ActorTypeRegistration, Actor, ActorError, | ||
}, | ||
utils::DaprJson, | ||
}; | ||
use dapr_macros::actor; | ||
use serde::{Deserialize, Serialize}; | ||
use std::{str::from_utf8, sync::Arc}; | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyResponse { | ||
pub available: bool, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyRequest { | ||
pub name: String, | ||
} | ||
|
||
#[actor] | ||
struct MyActor { | ||
id: String, | ||
client: ActorContextClient, | ||
} | ||
|
||
impl MyActor { | ||
async fn do_stuff(&self, DaprJson(req): DaprJson<MyRequest>) -> Json<MyResponse> { | ||
println!("doing stuff with {}", req.name); | ||
let mut dapr = self.client.clone(); | ||
let r = dapr.get_actor_state("key1").await.unwrap(); | ||
println!("get_actor_state {:?}", r); | ||
Json(MyResponse { available: true }) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Actor for MyActor { | ||
async fn on_activate(&self) -> Result<(), ActorError> { | ||
println!("on_activate {}", self.id); | ||
Ok(()) | ||
} | ||
|
||
async fn on_deactivate(&self) -> Result<(), ActorError> { | ||
println!("on_deactivate"); | ||
Ok(()) | ||
} | ||
|
||
async fn on_reminder(&self, reminder_name: &str, data: Vec<u8>) -> Result<(), ActorError> { | ||
println!("on_reminder {} {:?}", reminder_name, from_utf8(&data)); | ||
Ok(()) | ||
} | ||
|
||
async fn on_timer(&self, timer_name: &str, data: Vec<u8>) -> Result<(), ActorError> { | ||
println!("on_timer {} {:?}", timer_name, from_utf8(&data)); | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); | ||
let mut dapr_server = dapr::server::DaprHttpServer::new().await; | ||
|
||
dapr_server | ||
.register_actor( | ||
ActorTypeRegistration::new::<MyActor>( | ||
"MyActor", | ||
Box::new(|_actor_type, actor_id, context| { | ||
Arc::new(MyActor { | ||
id: actor_id.to_string(), | ||
client: context, | ||
}) | ||
}), | ||
) | ||
.register_method("do_stuff", MyActor::do_stuff) | ||
.register_method("do_stuff2", MyActor::do_stuff), | ||
) | ||
.await; | ||
|
||
dapr_server.start(None).await?; | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
target |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
[package] | ||
name = "dapr-macros" | ||
version = "0.14.0" | ||
edition = "2021" | ||
|
||
[lib] | ||
proc-macro = true | ||
|
||
[dependencies] | ||
async-trait = "0.1" | ||
log = "0.4" | ||
axum = "0.6.19" | ||
syn = {version="2.0.29",features=["full"]} | ||
quote = "1.0.8" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
use std::iter; | ||
|
||
use proc_macro::TokenStream; | ||
use quote::quote; | ||
|
||
#[proc_macro_attribute] | ||
pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream { | ||
let actor_struct_name = match syn::parse::<syn::ItemStruct>(item.clone()) { | ||
Ok(actor_struct) => actor_struct.ident.clone(), | ||
Err(_) => match syn::parse::<syn::ItemType>(item.clone()) { | ||
Ok(ty) => ty.ident.clone(), | ||
Err(e) => panic!("Error parsing actor struct: {}", e), | ||
}, | ||
}; | ||
|
||
let mut result = TokenStream::from(quote!( | ||
#[async_trait::async_trait] | ||
impl axum::extract::FromRequestParts<dapr::server::actor::runtime::ActorState> for &#actor_struct_name { | ||
type Rejection = dapr::server::actor::ActorRejection; | ||
|
||
async fn from_request_parts( | ||
parts: &mut axum::http::request::Parts, | ||
state: &dapr::server::actor::runtime::ActorState, | ||
) -> Result<Self, Self::Rejection> { | ||
let path = match axum::extract::Path::<dapr::server::actor::ActorPath>::from_request_parts(parts, state).await { | ||
Ok(path) => path, | ||
Err(e) => { | ||
log::error!("Error getting path: {}", e); | ||
return Err(dapr::server::actor::ActorRejection::Path(e)); | ||
} | ||
}; | ||
let actor_type = state.actor_type.clone(); | ||
let actor_id = path.actor_id.clone(); | ||
log::info!( | ||
"Request for actor_type: {}, actor_id: {}", | ||
actor_type, | ||
actor_id | ||
); | ||
let actor = match state | ||
.runtime | ||
.get_or_create_actor(&actor_type, &actor_id) | ||
.await | ||
{ | ||
Ok(actor) => actor, | ||
Err(e) => { | ||
log::error!("Error getting actor: {}", e); | ||
return Err(dapr::server::actor::ActorRejection::ActorError(e.to_string())); | ||
} | ||
}; | ||
let actor = actor.as_ref(); | ||
let well_known_actor = | ||
unsafe { &*(actor as *const dyn dapr::server::actor::Actor as *const #actor_struct_name) }; | ||
Ok(well_known_actor) | ||
} | ||
} | ||
)); | ||
|
||
result.extend(iter::once(item)); | ||
|
||
result | ||
} |
Oops, something went wrong.