Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added distributed lock API #159

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/validate-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "client", "configuration", "crypto", "distributed-lock", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ path = "examples/configuration/main.rs"
name = "crypto"
path = "examples/crypto/main.rs"

[[example]]
name = "distributed-lock"
path = "examples/distributed-lock/main.rs"

[[example]]
name = "invoke-grpc-client"
path = "examples/invoke/grpc/client.rs"
Expand Down
33 changes: 33 additions & 0 deletions examples/distributed-lock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Distributed Lock

This is a simple example that demonstrates Dapr's Distributed Lock capabilities.

> **Note:** Make sure to use latest version of proto bindings.

## Running

To run this example:

1. Run the multi-app run template:

<!-- STEP
name: Run multi-app
output_match_mode: substring
match_order: sequential
expected_stdout_lines:
- '== APP - distributed-lock-example == Successfully acquired lock on: resource'
- '== APP - distributed-lock-example == Unsuccessfully acquired lock on: resource'
- '== APP - distributed-lock-example == Successfully released lock on: resource'
- '== APP - distributed-lock-example == Successfully acquired lock on: resource'
background: true
sleep: 30
timeout_seconds: 90
-->

```bash
dapr run -f .
```

<!-- END_STEP -->

2. Stop with `ctrl + c`
12 changes: 12 additions & 0 deletions examples/distributed-lock/components/local-storage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: lockstore
spec:
type: lock.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
10 changes: 10 additions & 0 deletions examples/distributed-lock/dapr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 1
common:
daprdLogDestination: console
apps:
- appID: distributed-lock-example
appDirPath: ./
daprGRPCPort: 35002
logLevel: debug
command: [ "cargo", "run", "--example", "distributed-lock" ]
resourcesPath: ./components
67 changes: 67 additions & 0 deletions examples/distributed-lock/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
sleep(std::time::Duration::new(2, 0)).await;
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;

let result = client
.lock(dapr::client::TryLockRequest {
store_name: "lockstore".to_string(),
resource_id: "resource".to_string(),
lock_owner: "some-random-id".to_string(),
expiry_in_seconds: 60,
})
.await
.unwrap();

assert!(result.success);

println!("Successfully acquired lock on: resource");

let result = client
.lock(dapr::client::TryLockRequest {
store_name: "lockstore".to_string(),
resource_id: "resource".to_string(),
lock_owner: "some-random-id".to_string(),
expiry_in_seconds: 60,
})
.await
.unwrap();

assert!(!result.success);

println!("Unsuccessfully acquired lock on: resource");

let result = client
.unlock(dapr::client::UnlockRequest {
store_name: "lockstore".to_string(),
resource_id: "resource".to_string(),
lock_owner: "some-random-id".to_string(),
})
.await
.unwrap();

assert_eq!(0, result.status);

println!("Successfully released lock on: resource");

let result = client
.lock(dapr::client::TryLockRequest {
store_name: "lockstore".to_string(),
resource_id: "resource".to_string(),
lock_owner: "some-random-id".to_string(),
expiry_in_seconds: 60,
})
.await
.unwrap();

assert!(result.success);

println!("Successfully acquired lock on: resource");

Ok(())
}
40 changes: 40 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,24 @@ impl<T: DaprInterface> Client<T> {
.collect();
self.0.decrypt(requested_items).await
}

/// Distributed lock request call
///
/// # Arguments
///
/// * `request` - Request to be made, TryLockRequest
pub async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error> {
self.0.lock(request).await
}

/// Distributed lock request call
///
/// # Arguments
///
/// * `request` - Request to be made, TryLockRequest
pub async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error> {
self.0.unlock(request).await
}
Comment on lines +493 to +509
Copy link
Member

@mikeee mikeee Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be beneficial to have examples here in the doc comments of building and using the api call e.g.

    /// # Example
    ///
    /// ```
    /// let result = client
    ///     .try_lock(dapr::client::TryLockRequest {
    ///         store_name: "lockstore".to_string(),
    ///         resource_id: "resource".to_string(),
    ///         lock_owner: "owner".to_string(),
    ///         expiry_in_seconds: 60,
    ///     })
    ///     .await
    ///     .unwrap();
    /// ```

}

#[async_trait]
Expand Down Expand Up @@ -539,6 +557,10 @@ pub trait DaprInterface: Sized {
-> Result<Vec<StreamPayload>, Status>;

async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;

async fn try_lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error>;

async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error>;
}

#[async_trait]
Expand Down Expand Up @@ -709,6 +731,13 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
}
Ok(data)
}
async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error> {
Ok(self.try_lock_alpha1(request).await?.into_inner())
}

async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error> {
Ok(self.unlock_alpha1(request).await?.into_inner())
}
}

/// A request from invoking a service
Expand Down Expand Up @@ -806,6 +835,17 @@ pub type EncryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::EncryptR
/// Decryption request options
pub type DecryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::DecryptRequestOptions;

/// Lock response
pub type TryLockResponse = dapr_v1::TryLockResponse;

/// Lock request
pub type TryLockRequest = dapr_v1::TryLockRequest;

/// Unlock request
pub type UnlockRequest = dapr_v1::UnlockRequest;

/// Unlock response
pub type UnlockResponse = dapr_v1::UnlockResponse;
type StreamPayload = crate::dapr::dapr::proto::common::v1::StreamPayload;
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
where
Expand Down
Loading