Skip to content

feat: add table lock manager to fix table lock cannot be released #13434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 50 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
54ba261
Add table lock manager
zhyass Oct 25, 2023
c50d730
make lint
zhyass Oct 26, 2023
677c925
rename to table lock holder
zhyass Oct 26, 2023
0ac750f
Add valid check
zhyass Oct 26, 2023
b581da4
fix
zhyass Oct 26, 2023
50959d8
add metrics
zhyass Oct 26, 2023
1ec38fc
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 26, 2023
f1c8878
add catalog
zhyass Oct 26, 2023
e459e8a
add test
zhyass Oct 26, 2023
2d647bd
Merge branch 'main' into lock_manager
zhyass Oct 27, 2023
10bffdf
fix review comment
zhyass Oct 27, 2023
9349628
remove unused
zhyass Oct 27, 2023
fce0847
fix table lock release
zhyass Oct 27, 2023
131fe29
add check lock
zhyass Oct 27, 2023
83de4e7
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 27, 2023
ecc1e84
remove unused codes
zhyass Oct 27, 2023
d8884a1
update
zhyass Oct 27, 2023
c5c308d
fix
zhyass Oct 28, 2023
60801ab
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 28, 2023
2d6aa6b
remove unused code
zhyass Oct 28, 2023
a5bbc11
make lint
zhyass Oct 28, 2023
d313a87
Add table lock meta
zhyass Oct 30, 2023
aaa01ea
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 30, 2023
e80176c
add setting acquire_lock_timeout_secs
zhyass Oct 30, 2023
56d822a
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 30, 2023
3b8edd5
rename metrics
zhyass Oct 30, 2023
31f3c37
fix
zhyass Oct 30, 2023
5103a6c
add user in table lock meta
zhyass Oct 30, 2023
4e28b79
fix comment
zhyass Oct 31, 2023
314871e
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 31, 2023
54dd426
fix test
zhyass Oct 31, 2023
458c7f9
rename
zhyass Oct 31, 2023
d9742a0
update
zhyass Oct 31, 2023
b9d12da
remove lockrequest
zhyass Oct 31, 2023
2010608
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 31, 2023
b95cc3f
rename
zhyass Oct 31, 2023
c5c926b
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Oct 31, 2023
5dd3047
fix lint
zhyass Oct 31, 2023
9de83a7
fix review comment
zhyass Nov 1, 2023
eea2a3a
Merge remote-tracking branch 'upstream/main' into lock_manager
zhyass Nov 1, 2023
15ef732
fix test
zhyass Nov 1, 2023
f40da10
Merge branch 'main' into lock_manager
zhyass Nov 1, 2023
1b2d7ea
Update src/meta/proto-conv/tests/it/proto_conv.rs
zhyass Nov 1, 2023
6ebb346
fix review comment
zhyass Nov 1, 2023
f08af2d
add retry for lock holder
zhyass Nov 2, 2023
c89889f
Merge branch 'main' into lock_manager
zhyass Nov 2, 2023
1173a2c
update
zhyass Nov 2, 2023
04fec9d
Merge branch 'main' into lock_manager
zhyass Nov 2, 2023
a9186dd
Merge branch 'main' into lock_manager
zhyass Nov 2, 2023
01ea4d4
Merge branch 'main' into lock_manager
zhyass Nov 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use common_meta_app::schema::UpsertTableOptionReply;
use common_meta_app::schema::UpsertTableOptionReq;
use common_meta_app::schema::VirtualColumnMeta;
use common_meta_types::MetaId;
use common_pipeline_core::TableLockReq;
use dyn_clone::DynClone;

use crate::database::Database;
Expand Down Expand Up @@ -259,22 +260,16 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
req: TruncateTableReq,
) -> Result<TruncateTableReply>;

async fn list_table_lock_revs(&self, table_id: u64) -> Result<Vec<u64>>;
async fn list_table_lock_revs(&self, req: Box<dyn TableLockReq>) -> Result<Vec<u64>>;

async fn create_table_lock_rev(
&self,
expire_secs: u64,
table_info: &TableInfo,
req: Box<dyn TableLockReq>,
) -> Result<CreateTableLockRevReply>;

async fn extend_table_lock_rev(
&self,
expire_secs: u64,
table_info: &TableInfo,
revision: u64,
) -> Result<()>;
async fn extend_table_lock_rev(&self, req: Box<dyn TableLockReq>) -> Result<()>;

async fn delete_table_lock_rev(&self, table_info: &TableInfo, revision: u64) -> Result<()>;
async fn delete_table_lock_rev(&self, req: Box<dyn TableLockReq>) -> Result<()>;

/// Table function

Expand Down
6 changes: 3 additions & 3 deletions src/query/ee-features/table-lock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ common-catalog = { path = "../../catalog" }
common-exception = { path = "../../../common/exception" }
common-license = { path = "../../../common/license" }
common-meta-app = { path = "../../../meta/app" }
common-meta-kvapi = { path = "../../../meta/kvapi" }
common-pipeline-core = { path = "../../pipeline/core" }

async-backtrace = { workspace = true }
async-trait = "0.1.57"
futures = "0.3.24"
log = { workspace = true }
rand = "0.8.5"
chrono = { workspace = true }

[build-dependencies]

Expand Down
14 changes: 9 additions & 5 deletions src/query/ee-features/table-lock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod table_lock_handler;
mod table_lock_heartbeat;
mod table_level_lock;
mod table_lock_manager;

pub use table_lock_handler::TableLockHandler;
pub use table_lock_handler::TableLockHandlerWrapper;
pub use table_lock_heartbeat::TableLockHeartbeat;
pub use table_level_lock::CreateTableLockReq;
pub use table_level_lock::DeleteTableLockReq;
pub use table_level_lock::ExtendTableLockReq;
pub use table_level_lock::ListTableLockReq;
pub use table_level_lock::TableLevelLock;
pub use table_lock_manager::TableLockManager;
pub use table_lock_manager::TableLockManagerWrapper;
104 changes: 104 additions & 0 deletions src/query/ee-features/table-lock/src/table_level_lock/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_meta_app::schema::TableLockKey;
use common_meta_kvapi::kvapi::Key;
use common_pipeline_core::TableLock;
use common_pipeline_core::TableLockLevel;
use common_pipeline_core::TableLockReq;

use crate::table_level_lock::req::*;
use crate::TableLockManagerWrapper;

pub struct TableLevelLock {
lock_mgr: Arc<TableLockManagerWrapper>,
table_id: u64,
revision: u64,
}

impl TableLevelLock {
pub fn create(lock_mgr: Arc<TableLockManagerWrapper>, table_id: u64) -> Self {
TableLevelLock {
lock_mgr,
table_id,
revision: 0,
}
}
}

#[async_trait::async_trait]
impl TableLock for TableLevelLock {
fn level(&self) -> TableLockLevel {
TableLockLevel::Table
}

fn table_id(&self) -> u64 {
self.table_id
}

fn set_revision(&mut self, revision: u64) {
self.revision = revision;
}

fn revision(&self) -> u64 {
self.revision
}

fn watch_delete_key(&self, revision: u64) -> String {
let lock_key = TableLockKey {
table_id: self.table_id,
revision,
};
lock_key.to_string_key()
}

fn create_table_lock_req(&self, expire_secs: u64) -> Box<dyn TableLockReq> {
Box::new(CreateTableLockReq {
table_id: self.table_id,
expire_secs,
})
}

fn extend_table_lock_req(&self, expire_secs: u64) -> Box<dyn TableLockReq> {
Box::new(ExtendTableLockReq {
table_id: self.table_id,
expire_secs,
revision: self.revision,
})
}

fn delete_table_lock_req(&self) -> Box<dyn TableLockReq> {
Box::new(DeleteTableLockReq {
table_id: self.table_id,
revision: self.revision,
})
}

fn list_table_lock_req(&self) -> Box<dyn TableLockReq> {
Box::new(ListTableLockReq {
table_id: self.table_id,
})
}
}

impl Drop for TableLevelLock {
fn drop(&mut self) {
let revision = self.revision();
if revision > 0 {
self.lock_mgr.unlock(revision);
}
}
}
22 changes: 22 additions & 0 deletions src/query/ee-features/table-lock/src/table_level_lock/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod lock;
mod req;

pub use lock::TableLevelLock;
pub use req::CreateTableLockReq;
pub use req::DeleteTableLockReq;
pub use req::ExtendTableLockReq;
pub use req::ListTableLockReq;
122 changes: 122 additions & 0 deletions src/query/ee-features/table-lock/src/table_level_lock/req.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use chrono::Utc;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::DeleteTableLockRevReq;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_pipeline_core::TableLockReq;

#[derive(Clone)]
pub struct ListTableLockReq {
pub table_id: u64,
}

impl TableLockReq for ListTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&ListTableLockReq> for ListTableLockRevReq {
fn from(value: &ListTableLockReq) -> Self {
ListTableLockRevReq {
table_id: value.table_id,
}
}
}

#[derive(Clone)]
pub struct CreateTableLockReq {
pub table_id: u64,
pub expire_secs: u64,
}

impl TableLockReq for CreateTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&CreateTableLockReq> for CreateTableLockRevReq {
fn from(value: &CreateTableLockReq) -> Self {
CreateTableLockRevReq {
table_id: value.table_id,
expire_at: Utc::now().timestamp() as u64 + value.expire_secs,
}
}
}

#[derive(Clone)]
pub struct ExtendTableLockReq {
pub table_id: u64,
pub expire_secs: u64,
pub revision: u64,
}

impl TableLockReq for ExtendTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&ExtendTableLockReq> for ExtendTableLockRevReq {
fn from(value: &ExtendTableLockReq) -> Self {
ExtendTableLockRevReq {
table_id: value.table_id,
expire_at: Utc::now().timestamp() as u64 + value.expire_secs,
revision: value.revision,
}
}
}

#[derive(Clone)]
pub struct DeleteTableLockReq {
pub table_id: u64,
pub revision: u64,
}

impl TableLockReq for DeleteTableLockReq {
fn as_any(&self) -> &dyn Any {
self
}

fn clone_self(&self) -> Box<dyn TableLockReq> {
Box::new(self.clone())
}
}

impl From<&DeleteTableLockReq> for DeleteTableLockRevReq {
fn from(value: &DeleteTableLockReq) -> Self {
DeleteTableLockRevReq {
table_id: value.table_id,
revision: value.revision,
}
}
}
Loading