-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial implementation of actor runtime #99
Changes from all commits
e455e80
ae7aef7
7db861e
2bd95e1
ebf701e
2e34b22
c95a9c0
45eab14
3f286d5
dc6156f
7981005
513b334
6dbe2b7
7abc466
0a0ff9b
188753d
b9cd1f6
8581e82
d18f355
b6b6fe3
6971ab5
a4241d5
8e77624
a58192b
6fc06ac
029f482
a908fac
877efed
a979a1f
6a3c6d8
8769c21
0030ddd
d0e5384
0c0ee4a
f32f4cd
a2d3494
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,3 +8,4 @@ Cargo.lock | |
|
||
# These are backup files generated by rustfmt | ||
**/*.rs.bk | ||
.vscode/settings.json |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# Actor Example | ||
|
||
This example demonstrates the Dapr actor framework. To author an actor, | ||
|
||
1. Create a struc decorated with the `#[dapr::actor]` macro to house your custom actor methods that map to [Axum handlers](https://docs.rs/axum/latest/axum/handler/index.html), use [Axum extractors](https://docs.rs/axum/latest/axum/extract/index.html) to access the incoming request and return an [`impl IntoResponse`](https://docs.rs/axum/latest/axum/response/trait.IntoResponse.html). | ||
Use the `DaprJson` extractor to deserialize the request from Json coming from a Dapr sidecar. | ||
```rust | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: I think the example should also include the |
||
#[dapr::actor] | ||
struct MyActor { | ||
id: String, | ||
client: ActorContextClient | ||
} | ||
|
||
#[derive(Serialize, Deserialize)] | ||
pub struct MyRequest { | ||
pub name: String, | ||
} | ||
|
||
#[derive(Serialize, Deserialize)] | ||
pub struct MyResponse { | ||
pub available: bool, | ||
} | ||
|
||
impl MyActor { | ||
fn do_stuff(&self, DaprJson(data): DaprJson<MyRequest>) -> Json<MyResponse> { | ||
println!("doing stuff with {}", data.name); | ||
Json(MyResponse { | ||
available: true | ||
}) | ||
} | ||
} | ||
``` | ||
|
||
There are many ways to write your actor method signature, using Axum handlers, but you also have access to the actor instance via `self`. Here is a super simple example: | ||
```rust | ||
pub async fn method_2(&self) -> impl IntoResponse { | ||
StatusCode::OK | ||
} | ||
``` | ||
1. Implement the `Actor` trait. This trait exposes the following methods: | ||
- `on_activate` - Called when an actor is activated on a host | ||
- `on_deactivate` - Called when an actor is deactivated on a host | ||
- `on_reminder` - Called when a reminder is recieved from the Dapr sidecar | ||
- `on_timer` - Called when a timer is recieved from the Dapr sidecar | ||
|
||
|
||
```rust | ||
#[async_trait] | ||
impl Actor for MyActor { | ||
|
||
async fn on_activate(&self) -> Result<(), ActorError> { | ||
println!("on_activate {}", self.id); | ||
Ok(()) | ||
} | ||
|
||
async fn on_deactivate(&self) -> Result<(), ActorError> { | ||
println!("on_deactivate"); | ||
Ok(()) | ||
} | ||
} | ||
``` | ||
|
||
1. An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes an `ActorTypeRegistration` which specifies | ||
- The actor type name (used by Actor clients), and concrete struct | ||
- A factory to construct a new instance of that actor type when one is required to be activated by the runtime. The parameters passed to the factory will be the actor type, actor ID, and a Dapr client for managing state, timers and reminders for the actor. | ||
- The methods that you would like to expose to external clients. | ||
|
||
```rust | ||
let mut dapr_server = dapr::server::DaprHttpServer::new(); | ||
|
||
dapr_server.register_actor(ActorTypeRegistration::new::<MyActor>("MyActor", | ||
Box::new(|actor_type, id, client| Arc::new(MyActor{ | ||
actor_type, | ||
id, | ||
client | ||
}))) | ||
.register_method("do_stuff", MyActor::do_stuff) | ||
.register_method("do_other_stuff", MyActor::do_other_stuff)); | ||
|
||
dapr_server.start(None).await?; | ||
``` | ||
|
||
|
||
## Running | ||
|
||
> Before you run the example make sure local redis state store is running by executing: | ||
> ``` | ||
> docker ps | ||
> ``` | ||
|
||
To run this example: | ||
|
||
1. Start actor host (expose Http server receiver on port 50051): | ||
```bash | ||
dapr run --app-id actor-host --app-protocol http --app-port 50051 cargo run -- --example actor-server | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be worth pointing out that only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One day, but not today :) |
||
``` | ||
|
||
2. Start actor client: | ||
```bash | ||
dapr run --app-id actor-client --dapr-grpc-port 3502 cargo run -- --example actor-client | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
use serde::{Deserialize, Serialize}; | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyResponse { | ||
pub available: bool, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyRequest { | ||
pub name: String, | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
// TODO: Handle this issue in the sdk | ||
// Introduce delay so that dapr grpc port is assigned before app tries to connect | ||
std::thread::sleep(std::time::Duration::new(2, 0)); | ||
|
||
// Get the Dapr port and create a connection | ||
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?; | ||
let addr = format!("https://127.0.0.1:{}", port); | ||
|
||
// Create the client | ||
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?; | ||
|
||
let data = MyRequest { | ||
name: "test".to_string(), | ||
}; | ||
|
||
let resp: Result<MyResponse, dapr::error::Error> = client | ||
.invoke_actor("MyActor", "a1", "do_stuff", data, None) | ||
.await; | ||
|
||
println!("Response: {:#?}", resp); | ||
|
||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
use async_trait::async_trait; | ||
use axum::Json; | ||
use dapr::server::{ | ||
actor::{ | ||
context_client::ActorContextClient, runtime::ActorTypeRegistration, Actor, ActorError, | ||
}, | ||
utils::DaprJson, | ||
}; | ||
use dapr_macros::actor; | ||
use serde::{Deserialize, Serialize}; | ||
use std::{str::from_utf8, sync::Arc}; | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyResponse { | ||
pub available: bool, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct MyRequest { | ||
pub name: String, | ||
} | ||
|
||
#[actor] | ||
struct MyActor { | ||
id: String, | ||
client: ActorContextClient, | ||
} | ||
|
||
impl MyActor { | ||
async fn do_stuff(&self, DaprJson(req): DaprJson<MyRequest>) -> Json<MyResponse> { | ||
println!("doing stuff with {}", req.name); | ||
let mut dapr = self.client.clone(); | ||
let r = dapr.get_actor_state("key1").await.unwrap(); | ||
println!("get_actor_state {:?}", r); | ||
Json(MyResponse { available: true }) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Actor for MyActor { | ||
async fn on_activate(&self) -> Result<(), ActorError> { | ||
println!("on_activate {}", self.id); | ||
Ok(()) | ||
} | ||
|
||
async fn on_deactivate(&self) -> Result<(), ActorError> { | ||
println!("on_deactivate"); | ||
Ok(()) | ||
} | ||
|
||
async fn on_reminder(&self, reminder_name: &str, data: Vec<u8>) -> Result<(), ActorError> { | ||
println!("on_reminder {} {:?}", reminder_name, from_utf8(&data)); | ||
Ok(()) | ||
} | ||
|
||
async fn on_timer(&self, timer_name: &str, data: Vec<u8>) -> Result<(), ActorError> { | ||
println!("on_timer {} {:?}", timer_name, from_utf8(&data)); | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); | ||
let mut dapr_server = dapr::server::DaprHttpServer::new().await; | ||
|
||
dapr_server | ||
.register_actor( | ||
ActorTypeRegistration::new::<MyActor>( | ||
"MyActor", | ||
Box::new(|_actor_type, actor_id, context| { | ||
Arc::new(MyActor { | ||
id: actor_id.to_string(), | ||
client: context, | ||
}) | ||
}), | ||
) | ||
.register_method("do_stuff", MyActor::do_stuff) | ||
.register_method("do_stuff2", MyActor::do_stuff), | ||
) | ||
.await; | ||
|
||
dapr_server.start(None).await?; | ||
|
||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
target |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
[package] | ||
name = "dapr-macros" | ||
version = "0.14.0" | ||
edition = "2021" | ||
|
||
[lib] | ||
proc-macro = true | ||
|
||
[dependencies] | ||
async-trait = "0.1" | ||
log = "0.4" | ||
axum = "0.6.19" | ||
syn = {version="2.0.29",features=["full"]} | ||
quote = "1.0.8" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
use std::iter; | ||
|
||
use proc_macro::TokenStream; | ||
use quote::quote; | ||
|
||
#[proc_macro_attribute] | ||
pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream { | ||
let actor_struct_name = match syn::parse::<syn::ItemStruct>(item.clone()) { | ||
Ok(actor_struct) => actor_struct.ident.clone(), | ||
Err(_) => match syn::parse::<syn::ItemType>(item.clone()) { | ||
Ok(ty) => ty.ident.clone(), | ||
Err(e) => panic!("Error parsing actor struct: {}", e), | ||
}, | ||
}; | ||
|
||
let mut result = TokenStream::from(quote!( | ||
#[async_trait::async_trait] | ||
impl axum::extract::FromRequestParts<dapr::server::actor::runtime::ActorState> for &#actor_struct_name { | ||
type Rejection = dapr::server::actor::ActorRejection; | ||
|
||
async fn from_request_parts( | ||
parts: &mut axum::http::request::Parts, | ||
state: &dapr::server::actor::runtime::ActorState, | ||
) -> Result<Self, Self::Rejection> { | ||
let path = match axum::extract::Path::<dapr::server::actor::ActorPath>::from_request_parts(parts, state).await { | ||
Ok(path) => path, | ||
Err(e) => { | ||
log::error!("Error getting path: {}", e); | ||
return Err(dapr::server::actor::ActorRejection::Path(e)); | ||
} | ||
}; | ||
let actor_type = state.actor_type.clone(); | ||
let actor_id = path.actor_id.clone(); | ||
log::info!( | ||
"Request for actor_type: {}, actor_id: {}", | ||
actor_type, | ||
actor_id | ||
); | ||
let actor = match state | ||
.runtime | ||
.get_or_create_actor(&actor_type, &actor_id) | ||
.await | ||
{ | ||
Ok(actor) => actor, | ||
Err(e) => { | ||
log::error!("Error getting actor: {}", e); | ||
return Err(dapr::server::actor::ActorRejection::ActorError(e.to_string())); | ||
} | ||
}; | ||
let actor = actor.as_ref(); | ||
let well_known_actor = | ||
unsafe { &*(actor as *const dyn dapr::server::actor::Actor as *const #actor_struct_name) }; | ||
Ok(well_known_actor) | ||
} | ||
} | ||
)); | ||
|
||
result.extend(iter::once(item)); | ||
|
||
result | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it, this will only work with HTTP (hyper underneath), and not gRPC (tonic underneath)?
Will the actors work with for Dapr in gRPC mode?
My assumption here is that it only works in HTTP mode (because of axum, with hyper underneath).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do see Tonic being used in the client example, so I assume this is a non-issue. Will leave the comment there anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The dapr side car currently does not support invoking actors via grpc as far as I know, so any actor host would have to support http.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yaron2 Is it supposed to be upgraded soon ? or has it be ?