diff --git a/.github/workflows/validate-examples.yml b/.github/workflows/validate-examples.yml index 1e278c6a..d69a1016 100644 --- a/.github/workflows/validate-examples.yml +++ b/.github/workflows/validate-examples.yml @@ -144,7 +144,7 @@ jobs: fail-fast: false matrix: examples: - [ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ] + [ "actors", "client", "configuration", "crypto", "distributed-lock", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ] steps: - name: Check out code uses: actions/checkout@v4 diff --git a/Cargo.toml b/Cargo.toml index c12534b9..0c762377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,10 @@ path = "examples/configuration/main.rs" name = "crypto" path = "examples/crypto/main.rs" +[[example]] +name = "distributed-lock" +path = "examples/distributed-lock/main.rs" + [[example]] name = "invoke-grpc-client" path = "examples/invoke/grpc/client.rs" diff --git a/examples/distributed-lock/README.md b/examples/distributed-lock/README.md new file mode 100644 index 00000000..29663a94 --- /dev/null +++ b/examples/distributed-lock/README.md @@ -0,0 +1,33 @@ +# Distributed Lock + +This is a simple example that demonstrates Dapr's Distributed Lock capabilities. + +> **Note:** Make sure to use latest version of proto bindings. + +## Running + +To run this example: + +1. Run the multi-app run template: + + + +```bash +dapr run -f . +``` + + + +2. Stop with `ctrl + c` diff --git a/examples/distributed-lock/components/local-storage.yml b/examples/distributed-lock/components/local-storage.yml new file mode 100644 index 00000000..d7c69764 --- /dev/null +++ b/examples/distributed-lock/components/local-storage.yml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: lockstore +spec: + type: lock.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" \ No newline at end of file diff --git a/examples/distributed-lock/dapr.yaml b/examples/distributed-lock/dapr.yaml new file mode 100644 index 00000000..02991e02 --- /dev/null +++ b/examples/distributed-lock/dapr.yaml @@ -0,0 +1,10 @@ +version: 1 +common: + daprdLogDestination: console +apps: + - appID: distributed-lock-example + appDirPath: ./ + daprGRPCPort: 35002 + logLevel: debug + command: [ "cargo", "run", "--example", "distributed-lock" ] + resourcesPath: ./components \ No newline at end of file diff --git a/examples/distributed-lock/main.rs b/examples/distributed-lock/main.rs new file mode 100644 index 00000000..056e2302 --- /dev/null +++ b/examples/distributed-lock/main.rs @@ -0,0 +1,67 @@ +use tokio::time::sleep; + +#[tokio::main] +async fn main() -> Result<(), Box> { + sleep(std::time::Duration::new(2, 0)).await; + let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?; + let addr = format!("https://127.0.0.1:{}", port); + + let mut client = dapr::Client::::connect(addr).await?; + + let result = client + .lock(dapr::client::TryLockRequest { + store_name: "lockstore".to_string(), + resource_id: "resource".to_string(), + lock_owner: "some-random-id".to_string(), + expiry_in_seconds: 60, + }) + .await + .unwrap(); + + assert!(result.success); + + println!("Successfully acquired lock on: resource"); + + let result = client + .lock(dapr::client::TryLockRequest { + store_name: "lockstore".to_string(), + resource_id: "resource".to_string(), + lock_owner: "some-random-id".to_string(), + expiry_in_seconds: 60, + }) + .await + .unwrap(); + + assert!(!result.success); + + println!("Unsuccessfully acquired lock on: resource"); + + let result = client + .unlock(dapr::client::UnlockRequest { + store_name: "lockstore".to_string(), + resource_id: "resource".to_string(), + lock_owner: "some-random-id".to_string(), + }) + .await + .unwrap(); + + assert_eq!(0, result.status); + + println!("Successfully released lock on: resource"); + + let result = client + .lock(dapr::client::TryLockRequest { + store_name: "lockstore".to_string(), + resource_id: "resource".to_string(), + lock_owner: "some-random-id".to_string(), + expiry_in_seconds: 60, + }) + .await + .unwrap(); + + assert!(result.success); + + println!("Successfully acquired lock on: resource"); + + Ok(()) +} diff --git a/src/client.rs b/src/client.rs index b58d08f9..6727ee91 100644 --- a/src/client.rs +++ b/src/client.rs @@ -489,6 +489,24 @@ impl Client { .collect(); self.0.decrypt(requested_items).await } + + /// Distributed lock request call + /// + /// # Arguments + /// + /// * `request` - Request to be made, TryLockRequest + pub async fn lock(&mut self, request: TryLockRequest) -> Result { + self.0.lock(request).await + } + + /// Distributed lock request call + /// + /// # Arguments + /// + /// * `request` - Request to be made, TryLockRequest + pub async fn unlock(&mut self, request: UnlockRequest) -> Result { + self.0.unlock(request).await + } } #[async_trait] @@ -539,6 +557,10 @@ pub trait DaprInterface: Sized { -> Result, Status>; async fn decrypt(&mut self, payload: Vec) -> Result, Status>; + + async fn try_lock(&mut self, request: TryLockRequest) -> Result; + + async fn unlock(&mut self, request: UnlockRequest) -> Result; } #[async_trait] @@ -709,6 +731,13 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient { } Ok(data) } + async fn lock(&mut self, request: TryLockRequest) -> Result { + Ok(self.try_lock_alpha1(request).await?.into_inner()) + } + + async fn unlock(&mut self, request: UnlockRequest) -> Result { + Ok(self.unlock_alpha1(request).await?.into_inner()) + } } /// A request from invoking a service @@ -806,6 +835,17 @@ pub type EncryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::EncryptR /// Decryption request options pub type DecryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::DecryptRequestOptions; +/// Lock response +pub type TryLockResponse = dapr_v1::TryLockResponse; + +/// Lock request +pub type TryLockRequest = dapr_v1::TryLockRequest; + +/// Unlock request +pub type UnlockRequest = dapr_v1::UnlockRequest; + +/// Unlock response +pub type UnlockResponse = dapr_v1::UnlockResponse; type StreamPayload = crate::dapr::dapr::proto::common::v1::StreamPayload; impl From<(K, Vec)> for common_v1::StateItem where