Skip to content

Commit

Permalink
Add table lock manager
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 25, 2023
1 parent dffe113 commit 9951426
Show file tree
Hide file tree
Showing 28 changed files with 542 additions and 285 deletions.
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use common_meta_app::schema::UpsertTableOptionReply;
use common_meta_app::schema::UpsertTableOptionReq;
use common_meta_app::schema::VirtualColumnMeta;
use common_meta_types::MetaId;
use common_pipeline_core::table_lock::TableLockReq;
use dyn_clone::DynClone;

use crate::database::Database;
Expand Down Expand Up @@ -259,22 +260,16 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
req: TruncateTableReq,
) -> Result<TruncateTableReply>;

async fn list_table_lock_revs(&self, table_id: u64) -> Result<Vec<u64>>;
async fn list_table_lock_revs(&self, req: Box<dyn TableLockReq>) -> Result<Vec<u64>>;

async fn create_table_lock_rev(
&self,
expire_secs: u64,
table_info: &TableInfo,
req: Box<dyn TableLockReq>,
) -> Result<CreateTableLockRevReply>;

async fn extend_table_lock_rev(
&self,
expire_secs: u64,
table_info: &TableInfo,
revision: u64,
) -> Result<()>;
async fn extend_table_lock_rev(&self, req: Box<dyn TableLockReq>) -> Result<()>;

async fn delete_table_lock_rev(&self, table_info: &TableInfo, revision: u64) -> Result<()>;
async fn delete_table_lock_rev(&self, req: Box<dyn TableLockReq>) -> Result<()>;

/// Table function
Expand Down
5 changes: 5 additions & 0 deletions src/query/ee-features/table-lock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ common-catalog = { path = "../../catalog" }
common-exception = { path = "../../../common/exception" }
common-license = { path = "../../../common/license" }
common-meta-app = { path = "../../../meta/app" }
common-meta-kvapi = { path = "../../../meta/kvapi" }
common-pipeline-core = { path = "../../pipeline/core" }

async-backtrace = { workspace = true }
async-channel = "1.7.1"
async-trait = "0.1.57"
chrono = { workspace = true }
futures = "0.3.24"
log = { workspace = true }
parking_lot = "0.12.1"
rand = "0.8.5"

[build-dependencies]
Expand Down
5 changes: 4 additions & 1 deletion src/query/ee-features/table-lock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod table_level_lock;
mod table_lock_handler;
mod table_lock_heartbeat;
mod table_lock_manager;

pub use table_level_lock::*;
pub use table_lock_handler::TableLockHandler;
pub use table_lock_handler::TableLockHandlerWrapper;
pub use table_lock_heartbeat::TableLockHeartbeat;
pub use table_lock_manager::TableLockManager;
189 changes: 189 additions & 0 deletions src/query/ee-features/table-lock/src/table_level_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use chrono::Utc;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::DeleteTableLockRevReq;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::TableLockKey;
use common_meta_kvapi::kvapi::Key;
use common_pipeline_core::table_lock::TableLockReq;
use common_pipeline_core::TableLock;

use crate::TableLockManager;

pub struct TableLevelLock {
lock_mgr: Arc<TableLockManager>,
table_id: u64,
}

impl TableLevelLock {
pub fn create(lock_mgr: Arc<TableLockManager>, table_id: u64) -> Arc<dyn TableLock> {
Arc::new(TableLevelLock { lock_mgr, table_id })
}
}

#[async_trait::async_trait]
impl TableLock for TableLevelLock {
fn lock_id(&self) -> String {
self.table_id.to_string()
}

fn watch_key(&self, revision: u64) -> String {
// Get the previous revision, watch the delete event.
let lock_key = TableLockKey {
table_id: self.table_id,
revision,
};
lock_key.to_string_key()
}

fn create_table_lock_req(&self, expire_secs: u64) -> Box<dyn TableLockReq> {
Box::new(CreateTableLockReq {
table_id: self.table_id,
expire_secs,
})
}

fn extend_table_lock_req(&self, expire_secs: u64, revision: u64) -> Box<dyn TableLockReq> {
Box::new(ExtendTableLockReq {
table_id: self.table_id,
expire_secs,
revision,
})
}

fn delete_table_lock_req(&self, revision: u64) -> Box<dyn TableLockReq> {
Box::new(DeleteTableLockReq {
table_id: self.table_id,
revision,
})
}

fn list_table_lock_req(&self) -> Box<dyn TableLockReq> {
Box::new(ListTableLockReq {
table_id: self.table_id,
})
}
}

impl Drop for TableLevelLock {
fn drop(&mut self) {
self.lock_mgr.unlock(self.lock_id());
}
}

#[derive(Clone)]
pub struct ListTableLockReq {
pub table_id: u64,
}

impl TableLockReq for ListTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&ListTableLockReq> for ListTableLockRevReq {
fn from(value: &ListTableLockReq) -> Self {
ListTableLockRevReq {
table_id: value.table_id,
}
}
}

#[derive(Clone)]
pub struct CreateTableLockReq {
pub table_id: u64,
pub expire_secs: u64,
}

impl TableLockReq for CreateTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&CreateTableLockReq> for CreateTableLockRevReq {
fn from(value: &CreateTableLockReq) -> Self {
CreateTableLockRevReq {
table_id: value.table_id,
expire_at: Utc::now().timestamp() as u64 + value.expire_secs,
}
}
}

#[derive(Clone)]
pub struct ExtendTableLockReq {
pub table_id: u64,
pub expire_secs: u64,
pub revision: u64,
}

impl TableLockReq for ExtendTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&ExtendTableLockReq> for ExtendTableLockRevReq {
fn from(value: &ExtendTableLockReq) -> Self {
ExtendTableLockRevReq {
table_id: value.table_id,
expire_at: Utc::now().timestamp() as u64 + value.expire_secs,
revision: value.revision,
}
}
}

#[derive(Clone)]
pub struct DeleteTableLockReq {
pub table_id: u64,
pub revision: u64,
}

impl TableLockReq for DeleteTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&DeleteTableLockReq> for DeleteTableLockRevReq {
fn from(value: &DeleteTableLockReq) -> Self {
DeleteTableLockRevReq {
table_id: value.table_id,
revision: value.revision,
}
}
}
52 changes: 3 additions & 49 deletions src/query/ee-features/table-lock/src/table_lock_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@

use std::sync::Arc;

use common_base::base::GlobalInstance;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_license::license::Feature;
use common_license::license_manager::get_license_manager;
use common_meta_app::schema::TableInfo;
use log::info;
use common_pipeline_core::TableLock;

use crate::TableLockHeartbeat;

Expand All @@ -29,7 +25,7 @@ pub trait TableLockHandler: Sync + Send {
async fn try_lock(
&self,
ctx: Arc<dyn TableContext>,
table_info: TableInfo,
lock: Arc<dyn TableLock>,
) -> Result<TableLockHeartbeat>;
}

Expand All @@ -41,50 +37,8 @@ impl TableLockHandler for DummyTableLock {
async fn try_lock(
&self,
_ctx: Arc<dyn TableContext>,
_table_info: TableInfo,
_lock: Arc<dyn TableLock>,
) -> Result<TableLockHeartbeat> {
Ok(TableLockHeartbeat::default())
}
}

pub struct TableLockHandlerWrapper {
handler: Box<dyn TableLockHandler>,
}

impl TableLockHandlerWrapper {
pub fn new(handler: Box<dyn TableLockHandler>) -> Self {
Self { handler }
}

#[async_backtrace::framed]
pub async fn try_lock(
&self,
ctx: Arc<dyn TableContext>,
table_info: TableInfo,
) -> Result<TableLockHeartbeat> {
self.handler.try_lock(ctx, table_info).await
}

pub fn instance(ctx: Arc<dyn TableContext>) -> Arc<TableLockHandlerWrapper> {
let enabled_table_lock = ctx.get_settings().get_enable_table_lock().unwrap_or(false);

info!("Table lock enabled : [{}]", enabled_table_lock);

let dummy = Arc::new(TableLockHandlerWrapper::new(Box::new(DummyTableLock {})));

if !enabled_table_lock {
// dummy lock does nothing
return dummy;
}

let enterprise_enabled = get_license_manager()
.manager
.check_enterprise_enabled(ctx.get_license_key(), Feature::TableLock)
.is_ok();
if enterprise_enabled {
GlobalInstance::get()
} else {
dummy
}
}
}
Loading

0 comments on commit 9951426

Please sign in to comment.