Skip to content

Commit

Permalink
Add the lock operations (datenlord#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jicheng Shi committed Mar 14, 2021
1 parent 16cc8c6 commit 5f92235
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 8 deletions.
25 changes: 19 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use std::net::SocketAddr;

use grpcio::{Channel, ChannelBuilder, EnvBuilder, LbPolicy};

use crate::protos::rpc_grpc::{AuthClient, KvClient, LeaseClient, WatchClient};
use crate::protos::{
lock_grpc::LockClient,
rpc_grpc::{AuthClient, KvClient, LeaseClient, WatchClient},
};
use crate::watch::EtcdWatchResponse;
use crate::{Auth, KeyRange, Kv, Lease, Result, Watch};
use crate::{Auth, KeyRange, Kv, Lease, Lock, Result, Watch};

/// Config for establishing etcd client.
pub struct ClientConfig {
Expand Down Expand Up @@ -41,11 +44,13 @@ pub struct Inner {
watch_client: Watch,
/// Lease client for lease operations.
lease_client: Lease,
/// Lock client for lock operations.
lock_client: Lock,
}

impl Client {
/// Get grpc channel.
fn get_channel(cfg: &ClientConfig) -> Result<Channel> {
fn get_channel(cfg: &ClientConfig) -> Channel {
// let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
// for e in cfg.endpoints.iter() {
// let c = Channel::from_shared(e.to_owned())?;
Expand Down Expand Up @@ -97,7 +102,7 @@ impl Client {
let ch = ChannelBuilder::new(env)
.load_balancing_policy(LbPolicy::RoundRobin)
.connect(end_points.as_str());
Ok(ch)
ch
}

/// Connects to etcd generate auth token.
Expand Down Expand Up @@ -128,7 +133,7 @@ impl Client {
/// Will returns `Err` if failed to contact with given endpoints or authentication failed.
#[inline]
pub async fn connect(cfg: ClientConfig) -> Result<Self> {
let channel = Self::get_channel(&cfg)?;
let channel = Self::get_channel(&cfg);
let etcd_watch_client = WatchClient::new(channel.clone());

Ok(Self {
Expand All @@ -142,7 +147,8 @@ impl Client {
cfg.cache_enable,
),
watch_client: Watch::new(etcd_watch_client),
lease_client: Lease::new(LeaseClient::new(channel)),
lease_client: Lease::new(LeaseClient::new(channel.clone())),
lock_client: Lock::new(LockClient::new(channel)),
}),
})
}
Expand All @@ -161,6 +167,13 @@ impl Client {
self.inner.kv_client.clone()
}

/// Get a lock client.
#[inline]
#[must_use]
pub fn lock(&self) -> Lock {
self.inner.lock_client.clone()
}

/// Gets a watch client.
#[inline]
#[must_use]
Expand Down
64 changes: 62 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub use lease::{
EtcdLeaseGrantRequest, EtcdLeaseGrantResponse, EtcdLeaseKeepAliveRequest,
EtcdLeaseKeepAliveResponse, EtcdLeaseRevokeRequest, EtcdLeaseRevokeResponse, Lease,
};
pub use lock::Lock;
pub use lock::{EtcdLockRequest, EtcdLockResponse, EtcdUnlockRequest, EtcdUnlockResponse};
pub use response_header::ResponseHeader;
pub use utilities::OverflowArithmetic;
pub use watch::{EtcdWatchRequest, EtcdWatchResponse, Event, EventType, Watch};
Expand All @@ -113,6 +115,8 @@ mod kv;
mod lazy;
/// Lease mod for lease operations.
mod lease;
/// Lock mod for lock operations.
mod lock;
/// Etcd client request and response protos
mod protos;
/// Etcd API response header
Expand All @@ -129,10 +133,13 @@ mod tests {
use anyhow::Context;
use async_compat::Compat;
use std::collections::HashMap;
use std::time::Duration;
use std::time::SystemTime;
use utilities::Cast;

const DEFAULT_ETCD_ENDPOINT1_FOR_TEST: &str = "127.0.0.1:2379";
const DEFAULT_ETCD_ENDPOINT2_FOR_TEST: &str = "127.0.0.1:2380";
// Should not connect 2380 port, which will cause lock operation error.
//const DEFAULT_ETCD_ENDPOINT2_FOR_TEST: &str = "127.0.0.1:2380";

#[test]
fn test_all() -> anyhow::Result<()> {
Expand All @@ -141,11 +148,64 @@ mod tests {
test_transaction()
.await
.context("test etcd transaction operations")?;
test_lock().await.context("test etcd lock operations")?;
Ok::<(), anyhow::Error>(())
}))?;
Ok(())
}

async fn test_lock() -> anyhow::Result<()> {
// 1. Lock on "ABC"
let client = build_etcd_client().await?;
let lease_id = client
.lease()
.grant(EtcdLeaseGrantRequest::new(Duration::from_secs(10)))
.await?
.id();
let lease_id_2 = client
.lease()
.grant(EtcdLeaseGrantRequest::new(Duration::from_secs(10)))
.await?
.id();
let key_bytes = client
.lock()
.lock(EtcdLockRequest::new(b"ABC".to_vec(), lease_id))
.await?
.take_key();

// 2. Wait until the first lock released automatically
let time1 = SystemTime::now();
let key_bytes2 = client
.lock()
.lock(EtcdLockRequest::new(b"ABC".to_vec(), lease_id_2))
.await?
.take_key();
let time2 = SystemTime::now();

// wait a least 5 seconds (the first lock has a 10s lease)
assert!(time2.duration_since(time1)?.as_secs() > 5);

let key_slice = key_bytes.as_slice();
assert_eq!(
key_slice
.get(..3)
.unwrap_or_else(|| panic!("key slice get first 3 bytes failed")),
b"ABC".to_vec()
);

// 3. Release all locks
client
.lock()
.unlock(EtcdUnlockRequest::new(key_bytes))
.await?;

client
.lock()
.unlock(EtcdUnlockRequest::new(key_bytes2))
.await?;
Ok(())
}

async fn test_transaction() -> anyhow::Result<()> {
let client = build_etcd_client().await?;
test_compose(&client).await?;
Expand Down Expand Up @@ -357,7 +417,7 @@ mod tests {
let client = Client::connect(ClientConfig {
endpoints: vec![
DEFAULT_ETCD_ENDPOINT1_FOR_TEST.to_owned(),
DEFAULT_ETCD_ENDPOINT2_FOR_TEST.to_owned(),
//DEFAULT_ETCD_ENDPOINT2_FOR_TEST.to_owned(),
],
auth: None,
cache_size: 64,
Expand Down
47 changes: 47 additions & 0 deletions src/lock/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/// The mod of lock release operations
mod release;
/// The mod of lock require operations
mod require;

use crate::protos::lock_grpc::LockClient;
use crate::Result as Res;
pub use release::{EtcdUnlockRequest, EtcdUnlockResponse};
pub use require::{EtcdLockRequest, EtcdLockResponse};

/// Lock client.
#[derive(Clone)]
pub struct Lock {
/// Etcd Lock client.
client: LockClient,
}

impl Lock {
/// Creates a new `LockClient`.
///
/// This method should only be called within etcd client.
pub(crate) const fn new(client: LockClient) -> Self {
Self { client }
}

/// Performs a lock operation.
///
/// # Errors
///
/// Will return `Err` if RPC call is failed.
#[inline]
pub async fn lock(&mut self, req: EtcdLockRequest) -> Res<EtcdLockResponse> {
let resp = self.client.lock_async(&req.into())?.await?;
Ok(From::from(resp))
}

/// Performs a unlock operation.
///
/// # Errors
///
/// Will return `Err` if RPC call is failed.
#[inline]
pub async fn unlock(&mut self, req: EtcdUnlockRequest) -> Res<EtcdUnlockResponse> {
let resp = self.client.unlock_async(&req.into())?.await?;
Ok(From::from(resp))
}
}
64 changes: 64 additions & 0 deletions src/lock/release.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::protos::lock::{UnlockRequest, UnlockResponse};
use crate::ResponseHeader;

/// Request for requiring a lock
pub struct EtcdUnlockRequest {
/// Etcd lock request
proto: UnlockRequest,
}

impl EtcdUnlockRequest {
/// Creates a new `EtcdUnlockRequest` for requiring a lock
#[inline]
pub fn new<T>(key: T) -> Self
where
T: Into<Vec<u8>>,
{
let lock_request = UnlockRequest {
key: key.into(),
..UnlockRequest::default()
};

Self {
proto: lock_request,
}
}

/// Get the name from `UnlockRequest`
#[inline]
pub fn get_key(&self) -> Vec<u8> {
self.proto.get_key().to_vec()
}
}

impl Into<UnlockRequest> for EtcdUnlockRequest {
#[inline]
fn into(self) -> UnlockRequest {
self.proto
}
}

/// Response for requring a lock.
#[derive(Debug)]
pub struct EtcdUnlockResponse {
/// Etcd lock response
proto: UnlockResponse,
}

impl EtcdUnlockResponse {
/// Takes the header out of response, leaving a `None` in its place.
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
match self.proto.header.take() {
Some(header) => Some(From::from(header)),
None => None,
}
}
}

impl From<UnlockResponse> for EtcdUnlockResponse {
#[inline]
fn from(resp: UnlockResponse) -> Self {
Self { proto: resp }
}
}
78 changes: 78 additions & 0 deletions src/lock/require.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::protos::lock::{LockRequest, LockResponse};
use crate::ResponseHeader;
use utilities::Cast;

/// Request for requiring a lock
pub struct EtcdLockRequest {
/// Etcd lock request
proto: LockRequest,
}

impl EtcdLockRequest {
/// Creates a new `EtcdLockRequest` for requiring a lock
#[inline]
pub fn new<T>(name: T, lease: u64) -> Self
where
T: Into<Vec<u8>>,
{
let lock_request = LockRequest {
name: name.into(),
lease: lease.cast(),
..LockRequest::default()
};

Self {
proto: lock_request,
}
}

/// Get the name from `LockRequest`
#[inline]
pub fn get_name(&self) -> Vec<u8> {
self.proto.get_name().to_vec()
}

/// Get the name from `LockRequest`
#[inline]
pub fn get_lease(&self) -> u64 {
self.proto.get_lease().cast()
}
}

impl Into<LockRequest> for EtcdLockRequest {
#[inline]
fn into(self) -> LockRequest {
self.proto
}
}

/// Response for requring a lock.
#[derive(Debug)]
pub struct EtcdLockResponse {
/// Etcd lock response
proto: LockResponse,
}

impl EtcdLockResponse {
/// Takes the header out of response, leaving a `None` in its place.
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
match self.proto.header.take() {
Some(header) => Some(From::from(header)),
None => None,
}
}

/// Take the key out of response, leaving a empty Vec in its place.
#[inline]
pub fn take_key(&mut self) -> Vec<u8> {
self.proto.take_key()
}
}

impl From<LockResponse> for EtcdLockResponse {
#[inline]
fn from(resp: LockResponse) -> Self {
Self { proto: resp }
}
}

0 comments on commit 5f92235

Please sign in to comment.