Skip to content

fix: fix get db without DbIdList key #14059

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Display;
use std::sync::Arc;

Expand Down Expand Up @@ -485,6 +487,28 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

if_then.push(txn_op_put(&db_id_key, serialize_struct(&db_meta)?)); // (db_id) -> db_meta
}

// add DbIdListKey if not exists
let dbid_idlist = DbIdListKey {
tenant: tenant_dbname.tenant.clone(),
db_name: tenant_dbname.db_name.clone(),
};
let (db_id_list_seq, db_id_list_opt): (_, Option<DbIdList>) =
get_pb_value(self, &dbid_idlist).await?;

if db_id_list_seq == 0 || db_id_list_opt.is_none() {
warn!(
"drop db:{:?}, db_id:{:?} has no DbIdListKey",
tenant_dbname, db_id
);

let mut db_id_list = DbIdList::new();
db_id_list.append(db_id);

condition.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
// _fd_db_id_list/<tenant>/<db_name> -> db_id_list
if_then.push(txn_op_put(&dbid_idlist, serialize_struct(&db_id_list)?));
};
}

let txn_req = TxnRequest {
Expand Down Expand Up @@ -795,7 +819,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// List tables by tenant, db_id, table_name.
let dbid_tbname_idlist = DbIdListKey {
tenant: req.tenant,
tenant: req.tenant.clone(),
// Using a empty db to to list all
db_name: "".to_string(),
};
Expand Down Expand Up @@ -868,6 +892,77 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

// `list_database` can list db which has no `DbIdListKey`
if include_drop_db {
// if `include_drop_db` is true, return all db info which not exist in db_info_list
let db_id_set: HashSet<u64> = db_info_list
.iter()
.map(|db_info| db_info.ident.db_id)
.collect();

let all_dbs = self.list_databases(req).await?;
for db_info in all_dbs {
if !db_id_set.contains(&db_info.ident.db_id) {
warn!(
"get db history db:{:?}, db_id:{:?} has no DbIdListKey",
db_info.name_ident, db_info.ident.db_id
);
db_info_list.push(db_info);
}
}
} else {
// if `include_drop_db` is false, filter out db which drop_on time out of retention time
let db_id_set: HashSet<u64> = db_info_list
.iter()
.map(|db_info| db_info.ident.db_id)
.collect();

let all_dbs = self.list_databases(req).await?;
let mut add_dbinfo_map = HashMap::new();
let mut db_id_list = Vec::new();
for db_info in all_dbs {
if !db_id_set.contains(&db_info.ident.db_id) {
warn!(
"get db history db:{:?}, db_id:{:?} has no DbIdListKey",
db_info.name_ident, db_info.ident.db_id
);
db_id_list.push(DatabaseId {
db_id: db_info.ident.db_id,
});
add_dbinfo_map.insert(db_info.ident.db_id, db_info);
}
}
let inner_keys: Vec<String> = db_id_list
.iter()
.map(|db_id| db_id.to_string_key())
.collect();
let mut db_id_list_iter = db_id_list.into_iter();
for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
let db_meta_seq_meta_vec: Vec<(u64, Option<DatabaseMeta>)> =
mget_pb_values(self, c).await?;

for (db_meta_seq, db_meta) in db_meta_seq_meta_vec {
let db_id = db_id_list_iter.next().unwrap().db_id;
if db_meta_seq == 0 || db_meta.is_none() {
error!("get_database_history cannot find {:?} db_meta", db_id);
continue;
}
let db_meta = db_meta.unwrap();
// if include drop db, then no need to fill out of retention time db
if is_drop_time_out_of_retention_time(&db_meta.drop_on, &now) {
continue;
}
if let Some(db_info) = add_dbinfo_map.get(&db_id) {
warn!(
"get db history db:{:?}, db_id:{:?} has no DbIdListKey",
db_info.name_ident, db_info.ident.db_id
);
db_info_list.push(db_info.clone());
}
}
}
}

return Ok(db_info_list);
}

Expand Down Expand Up @@ -2524,6 +2619,31 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

// add TableIdListKey if not exist
{
// get table id list from _fd_table_id_list/db_id/table_name
let dbid_tbname_idlist = TableIdListKey {
db_id,
table_name: dbid_tbname.table_name.clone(),
};
let (tb_id_list_seq, _tb_id_list_opt): (_, Option<TableIdList>) =
get_pb_value(self, &dbid_tbname_idlist).await?;
if tb_id_list_seq == 0 {
let mut tb_id_list = TableIdList::new();
tb_id_list.append(table_id);

warn!(
"drop table:{:?}, table_id:{:?} has no TableIdList",
dbid_tbname, table_id
);

condition.push(txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list_seq));
if_then.push(txn_op_put(
&dbid_tbname_idlist,
serialize_struct(&tb_id_list)?,
));
}
}
let txn_req = TxnRequest {
condition,
if_then,
Expand Down
117 changes: 117 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ use databend_common_meta_types::UpsertKV;
use log::debug;
use log::info;

use crate::deserialize_struct;
use crate::is_all_db_data_removed;
use crate::kv_app_error::KVAppError;
use crate::serialize_struct;
Expand Down Expand Up @@ -286,6 +287,10 @@ impl SchemaApiTestSuite {
suite
.table_drop_without_db_id_to_name(&b.build().await)
.await?;
suite.list_db_without_db_id_list(&b.build().await).await?;
suite
.drop_table_without_table_id_list(&b.build().await)
.await?;
suite.table_rename(&b.build().await).await?;
suite.table_update_meta(&b.build().await).await?;
suite.table_update_mask_policy(&b.build().await).await?;
Expand Down Expand Up @@ -1910,6 +1915,118 @@ impl SchemaApiTestSuite {
Ok(())
}

#[minitrace::trace]
async fn list_db_without_db_id_list<MT>(&self, mt: &MT) -> anyhow::Result<()>
where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError> {
// test drop a db without db_id_list
{
let tenant = "tenant1";
let db = "db1";
let mut util = Util::new(mt, tenant, db, "tb2", "JSON");
util.create_db().await?;

// remove db id list
let dbid_idlist = DbIdListKey {
tenant: tenant.to_string(),
db_name: db.to_string(),
};
util.mt
.as_kv_api()
.upsert_kv(UpsertKV::delete(dbid_idlist.to_string_key()))
.await?;

// drop db
util.drop_db().await?;

// after drop db, check if db id list has been added
let value = util
.mt
.as_kv_api()
.get_kv(&dbid_idlist.to_string_key())
.await?;

assert!(value.is_some());
let seqv = value.unwrap();
let db_id_list: DbIdList = deserialize_struct(&seqv.data)?;
assert_eq!(db_id_list.id_list[0], util.db_id);
}
// test get_database_history can return db without db_id_list
{
let tenant = "tenant2";
let db = "db2";
let mut util = Util::new(mt, tenant, db, "tb2", "JSON");
util.create_db().await?;

// remove db id list
let dbid_idlist = DbIdListKey {
tenant: tenant.to_string(),
db_name: db.to_string(),
};
util.mt
.as_kv_api()
.upsert_kv(UpsertKV::delete(dbid_idlist.to_string_key()))
.await?;

let res = mt
.get_database_history(ListDatabaseReq {
tenant: tenant.to_string(),
filter: None,
})
.await?;

// check if get_database_history return db_id
let mut found = false;
for db_info in res {
if db_info.ident.db_id == util.db_id {
found = true;
break;
}
}

assert!(found);
}
Ok(())
}

#[minitrace::trace]
async fn drop_table_without_table_id_list<MT>(&self, mt: &MT) -> anyhow::Result<()>
where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError> {
// test drop a table without table_id_list
let tenant = "tenant1";
let db = "db1";
let table = "tb1";
let mut util = Util::new(mt, tenant, db, table, "JSON");
util.create_db().await?;
let (tid, _table_meta) = util.create_table().await?;

// remove db id list
let table_id_idlist = TableIdListKey {
db_id: util.db_id,
table_name: table.to_string(),
};
util.mt
.as_kv_api()
.upsert_kv(UpsertKV::delete(table_id_idlist.to_string_key()))
.await?;

// drop table
util.drop_table_by_id().await?;

// after drop table, check if table id list has been added
let value = util
.mt
.as_kv_api()
.get_kv(&table_id_idlist.to_string_key())
.await?;

assert!(value.is_some());
let seqv = value.unwrap();
let id_list: TableIdList = deserialize_struct(&seqv.data)?;
assert_eq!(id_list.id_list[0], tid);

Ok(())
}

#[minitrace::trace]
async fn table_rename<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
let tenant = "tenant1";
Expand Down