Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example for Input and output Bindings #57

Merged
merged 9 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/validate-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ path = "examples/pubsub/publisher.rs"
name = "subscriber"
path = "examples/pubsub/subscriber.rs"

[[example]]
name = "output-bindings"
path = "examples/bindings/output.rs"

[[example]]
name = "input-bindings"
path = "examples/bindings/input.rs"

[[example]]
name = "query_state_q1"
path = "examples/query_state/query1.rs"
Expand Down
70 changes: 70 additions & 0 deletions examples/bindings/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Input and Output Bindings Example

This is a simple example that demonstrates Dapr's binding capabilities. To implement input bindings in your rust application, you need to implement `AppCallback` server for subscribing to events. Specifically, the following two methods need to be implemented for input bindings to work:

1. `list_input_bindings` - Dapr runtime calls this method to get list of bindings the application is subscribed to.
2. `on_binding_event` - Defines how the application handles the input binding event.

> **Note:** Make sure to use latest version of proto bindings.

In order to have both examples working with the same binding configuration ServiceBus was used here. If you don't have it available you can change to a binding that works for both Input and Output from [this list](https://docs.dapr.io/reference/components-reference/supported-bindings/)


## Running

To run this example:

1. Run a kafka container

<!-- STEP
name: Run kafka instance
background: true
sleep: 60
timeout_seconds: 120
expected_return_code:
expected_stderr_lines:
-->

```bash
docker run -p 9092:9092 apache/kafka:3.7.1
```

<!-- END_STEP -->

2. Run the multi-app run template (`dapr.yaml`)

<!-- STEP
name: Run Multi-app Run
output_match_mode: substring
match_order: sequential
expected_stdout_lines:
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 0 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 1 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 2 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 3 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 4 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 5 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 6 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 7 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 8 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 9 => hello from rust!'
background: true
sleep: 30
timeout_seconds: 90
-->

```bash
dapr run -f .
```

<!-- END_STEP -->
23 changes: 23 additions & 0 deletions examples/bindings/components/bindings.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: binding-example
spec:
type: bindings.kafka
metadata:
- name: direction
value: "input, output"
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authType
value: "none"

16 changes: 16 additions & 0 deletions examples/bindings/dapr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: 1
common:
resourcesPath: ./components/
daprdLogDestination: console
apps:
- appID: rust-input-b
appDirPath: ./
appProtocol: grpc
appPort: 50051
logLevel: debug
command: ["cargo", "run", "--example", "input-bindings"]
- appID: rust-output-b
appDirPath: ./
appProtocol: grpc
logLevel: debug
command: ["cargo", "run", "--example", "output-bindings"]
87 changes: 87 additions & 0 deletions examples/bindings/input.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use tonic::{transport::Server, Request, Response, Status};

use dapr::dapr::dapr::proto::common::v1::{InvokeRequest, InvokeResponse};
use dapr::dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer};
use dapr::dapr::dapr::proto::runtime::v1::{
BindingEventRequest, BindingEventResponse, ListInputBindingsResponse,
ListTopicSubscriptionsResponse, TopicEventRequest, TopicEventResponse,
};

#[derive(Default)]
pub struct AppCallbackService {}

#[tonic::async_trait]
impl AppCallback for AppCallbackService {
/// Invokes service method with InvokeRequest.
async fn on_invoke(
&self,
_request: Request<InvokeRequest>,
) -> Result<Response<InvokeResponse>, Status> {
Ok(Response::new(InvokeResponse::default()))
}

/// Lists all topics subscribed by this app.
async fn list_topic_subscriptions(
&self,
_request: Request<()>,
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
Ok(Response::new(ListTopicSubscriptionsResponse::default()))
}

/// Subscribes events from Pubsub.
async fn on_topic_event(
&self,
_request: Request<TopicEventRequest>,
) -> Result<Response<TopicEventResponse>, Status> {
Ok(Response::new(TopicEventResponse::default()))
}

/// Lists all input bindings subscribed by this app.
/// NOTE: Dapr runtime will call this method to get
/// the list of bindings the app wants to subscribe to.
/// In this example, the app is subscribing to a local pubsub binding named "binding-example"

async fn list_input_bindings(
&self,
_request: Request<()>,
) -> Result<Response<ListInputBindingsResponse>, Status> {
let list_bindings = ListInputBindingsResponse {
bindings: vec![String::from("binding-example")],
};

Ok(Response::new(list_bindings))
}

/// Listens events from the input bindings.
async fn on_binding_event(
&self,
request: Request<BindingEventRequest>,
) -> Result<Response<BindingEventResponse>, Status> {
let r = request.into_inner();
let name = &r.name;
let data = &r.data;

let message = String::from_utf8_lossy(&data);
println!("Binding Name: {}", &name);
println!("Message: {}", &message);

Ok(Response::new(BindingEventResponse::default()))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::]:50051".parse().unwrap();

let callback_service = AppCallbackService::default();

println!("AppCallback server listening on: {}", addr);

// Create a gRPC server with the callback_service.
Server::builder()
.add_service(AppCallbackServer::new(callback_service))
.serve(addr)
.await?;

Ok(())
}
35 changes: 35 additions & 0 deletions examples/bindings/output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{collections::HashMap, thread, time::Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
thread::sleep(Duration::from_secs(2));

// Get the Dapr port and create a connection
let addr = "https://127.0.0.1".to_string();

// Create the client
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;

// name of the component
let binding_name = "binding-example";

for count in 0..10 {
// message metadata
let mut metadata = HashMap::<String, String>::new();
metadata.insert("count".to_string(), count.to_string());

// message
let message = format!("{} => hello from rust!", &count).into_bytes();

client
.invoke_binding(binding_name, message, "create", Some(metadata))
.await?;

// sleep for 500ms to simulate delay b/w two events
tokio::time::sleep(Duration::from_millis(500)).await;
}

Ok(())
}
8 changes: 8 additions & 0 deletions src/appcallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ impl TopicSubscription {
}
}

impl ListInputBindingsResponse {
pub fn binding(binding_name: String) -> Self {
Self {
bindings: vec![binding_name],
}
}
}

pub struct AppCallbackService {
handlers: Vec<Handler>,
}
Expand Down
10 changes: 9 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,23 @@ impl<T: DaprInterface> Client<T> {
&mut self,
name: S,
data: Vec<u8>,
operation: S,
metadata: Option<HashMap<String, String>>,
) -> Result<InvokeBindingResponse, Error>
where
S: Into<String>,
{
let mut mdata = HashMap::<String, String>::new();
if let Some(m) = metadata {
mdata = m;
}

self.0
.invoke_binding(InvokeBindingRequest {
name: name.into(),
data,
..Default::default()
operation: operation.into(),
metadata: mdata,
})
.await
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/actor/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ActorTypeRegistration {
pub fn register_method<T>(
mut self,
method_name: &str,
handler: impl Handler<T, ActorState> + Send + Sync,
handler: impl Handler<T, ActorState> + Sync,
) -> Self
where
T: 'static,
Expand Down
Loading