Skip to content

Commit

Permalink
refactor: use TIdent to unify kvapi key def for MaskPolicyTableIdList…
Browse files Browse the repository at this point in the history
…Ident and DataMaskNameIdent

- Part of databendlabs#14719
  • Loading branch information
drmingdrmer committed Mar 29, 2024
1 parent ffa889f commit ed5ec2c
Show file tree
Hide file tree
Showing 17 changed files with 230 additions and 234 deletions.
47 changes: 25 additions & 22 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use databend_common_meta_app::app_error::DatamaskAlreadyExists;
use databend_common_meta_app::app_error::UnknownDatamask;
use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskId;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_app::data_mask::DatamaskNameIdent;
use databend_common_meta_app::data_mask::DropDatamaskReply;
use databend_common_meta_app::data_mask::DropDatamaskReq;
use databend_common_meta_app::data_mask::GetDatamaskReply;
use databend_common_meta_app::data_mask::GetDatamaskReq;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::data_mask::MaskpolicyTableIdListKey;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::TableId;
Expand Down Expand Up @@ -82,8 +82,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
CreateOption::Create => {
return Err(KVAppError::AppError(AppError::DatamaskAlreadyExists(
DatamaskAlreadyExists::new(
&name_key.name,
format!("create data mask: {}", req.name),
name_key.name(),
format!("create data mask: {}", req.name.display()),
),
)));
}
Expand All @@ -110,10 +110,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {

let id = fetch_id(self, IdGenerator::data_mask_id()).await?;
let id_key = DatamaskId { id };
let id_list_key = MaskpolicyTableIdListKey {
tenant: name_key.tenant.clone(),
name: name_key.name.clone(),
};
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_key.clone());

debug!(
id :? =(&id_key),
Expand Down Expand Up @@ -202,8 +199,12 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {

let name_key = &req.name;

let (_id_seq, _id, _data_mask_seq, policy) =
get_data_mask_or_err(self, name_key, format!("drop_data_mask: {}", name_key)).await?;
let (_id_seq, _id, _data_mask_seq, policy) = get_data_mask_or_err(
self,
name_key,
format!("drop_data_mask: {}", name_key.display()),
)
.await?;

Ok(GetDatamaskReply { policy })
}
Expand All @@ -212,7 +213,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
/// Returns (id_seq, id, data_mask_seq, data_mask)
async fn get_data_mask_or_err(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &DatamaskNameIdent,
name_key: &DataMaskNameIdent,
msg: impl Display,
) -> Result<(u64, u64, u64, DatamaskMeta), KVAppError> {
let (id_seq, id) = get_u64_value(kv_api, name_key).await?;
Expand All @@ -237,14 +238,14 @@ async fn get_data_mask_or_err(
/// Otherwise returns UnknownDatamask error
pub fn data_mask_has_to_exist(
seq: u64,
name_ident: &DatamaskNameIdent,
name_ident: &DataMaskNameIdent,
msg: impl Display,
) -> Result<(), KVAppError> {
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "data mask does not exist");

Err(KVAppError::AppError(AppError::UnknownDatamask(
UnknownDatamask::new(&name_ident.name, format!("{}: {}", msg, name_ident)),
UnknownDatamask::new(name_ident.name(), format!("{}: {}", msg, name_ident.name())),
)))
} else {
Ok(())
Expand All @@ -253,14 +254,12 @@ pub fn data_mask_has_to_exist(

async fn clear_table_column_mask_policy(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &DatamaskNameIdent,
name_ident: &DataMaskNameIdent,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
) -> Result<(), KVAppError> {
let id_list_key = MaskpolicyTableIdListKey {
tenant: name_ident.tenant.clone(),
name: name_ident.name.clone(),
};
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());

let (id_list_seq, id_list_opt): (_, Option<MaskpolicyTableIdList>) =
get_pb_value(kv_api, &id_list_key).await?;
if let Some(id_list) = id_list_opt {
Expand All @@ -277,7 +276,7 @@ async fn clear_table_column_mask_policy(
if let Some(column_mask_policy) = table_meta.column_mask_policy {
let new_column_mask_policy = column_mask_policy
.into_iter()
.filter(|(_, name)| name != &name_ident.name)
.filter(|(_, name)| name != name_ident.name())
.collect();

table_meta.column_mask_policy = Some(new_column_mask_policy);
Expand All @@ -294,15 +293,19 @@ async fn clear_table_column_mask_policy(

async fn construct_drop_mask_policy_operations(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &DatamaskNameIdent,
name_key: &DataMaskNameIdent,
drop_if_exists: bool,
if_delete: bool,
ctx: &str,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
) -> Result<(), KVAppError> {
let result =
get_data_mask_or_err(kv_api, name_key, format!("drop_data_mask: {}", name_key)).await;
let result = get_data_mask_or_err(
kv_api,
name_key,
format!("drop_data_mask: {}", name_key.display()),
)
.await;

let (id_seq, id, data_mask_seq, _) = match result {
Ok((id_seq, id, data_mask_seq, meta)) => (id_seq, id, data_mask_seq, meta),
Expand Down
12 changes: 7 additions & 5 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ use databend_common_meta_app::app_error::ViewAlreadyExists;
use databend_common_meta_app::app_error::VirtualColumnAlreadyExists;
use databend_common_meta_app::app_error::WrongShare;
use databend_common_meta_app::app_error::WrongShareObject;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::data_mask::MaskpolicyTableIdListKey;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::schema::CatalogId;
use databend_common_meta_app::schema::CatalogIdToName;
Expand Down Expand Up @@ -5526,11 +5526,13 @@ async fn update_mask_policy(
tenant: String,
table_id: u64,
) -> Result<(), KVAppError> {
let tenant = Tenant::new_or_err(&tenant, func_name!())?;

/// Fetch and update the table id list with `f`, and fill in the txn preconditions and operations.
async fn update_table_ids(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
txn_req: &mut TxnRequest,
key: MaskpolicyTableIdListKey,
key: MaskPolicyTableIdListIdent,
f: impl FnOnce(&mut BTreeSet<u64>),
) -> Result<(), KVAppError> {
let (id_list_seq, id_list_opt): (_, Option<MaskpolicyTableIdList>) =
Expand All @@ -5554,7 +5556,7 @@ async fn update_mask_policy(
update_table_ids(
kv_api,
txn_req,
MaskpolicyTableIdListKey::new(&tenant, new_mask_name),
MaskPolicyTableIdListIdent::new(tenant.clone(), new_mask_name),
|list: &mut BTreeSet<u64>| {
list.insert(table_id);
},
Expand All @@ -5565,7 +5567,7 @@ async fn update_mask_policy(
update_table_ids(
kv_api,
txn_req,
MaskpolicyTableIdListKey::new(&tenant, old),
MaskPolicyTableIdListIdent::new(tenant.clone(), old),
|list: &mut BTreeSet<u64>| {
list.remove(&table_id);
},
Expand All @@ -5577,7 +5579,7 @@ async fn update_mask_policy(
update_table_ids(
kv_api,
txn_req,
MaskpolicyTableIdListKey::new(&tenant, mask_name),
MaskPolicyTableIdListIdent::new(tenant.clone(), mask_name),
|list: &mut BTreeSet<u64>| {
list.remove(&table_id);
},
Expand Down
Loading

0 comments on commit ed5ec2c

Please sign in to comment.