Skip to content

Commit

Permalink
refactor: SchemaApi::get_table_meta_history() (#16586)
Browse files Browse the repository at this point in the history
It does not need to accept a `database-name` argument just for building
an error.

It does not need to return application level error: if a table is not
found, just return an empty list.

Rename SchemaApi::get_tables_history to list_retainable_tables;
Rename SchemaApi::get_table_meta_history to get_retainable_tables;

refactor: rename list_tables_history->list_gc_ready_tables;get_table_meta_history->get_gc_ready_tables;
drmingdrmer authored Oct 10, 2024
1 parent 8c50071 commit 9e5ffba
Showing 4 changed files with 41 additions and 58 deletions.
13 changes: 8 additions & 5 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
@@ -213,13 +213,16 @@ pub trait SchemaApi: Send + Sync {
/// Get a [`TableNIV`] by `database_id, table_name`.
async fn get_table_in_db(&self, req: &DBIdTableName) -> Result<Option<TableNIV>, MetaError>;

async fn get_table_meta_history(
/// Retrieves the tables under the given `database-id, table-name`
/// that is dropped after retention boundary time, i.e., the table that can be undropped.
async fn get_retainable_tables(
&self,
database_name: &str,
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError>;
history_ident: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError>;

async fn get_tables_history(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError>;
/// Get history of all tables in the specified database,
/// that are dropped after retention boundary time, i.e., the tables that can be undropped.
async fn list_retainable_tables(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError>;

/// List all tables in the database.
///
47 changes: 15 additions & 32 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
@@ -1547,36 +1547,20 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn get_table_meta_history(
async fn get_retainable_tables(
&self,
database_name: &str,
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
let table_name = &table_id_history.table_name;

let (_seq, table_id_list) = self.get_pb_seq_and_value(table_id_history).await?;

let table_id_list = table_id_list.ok_or_else(|| {
AppError::from(UnknownTable::new(
table_name,
format!("get_table_history: {}.{}", database_name, table_name),
))
})?;

let now = Utc::now();

let metas = get_table_meta_history(self, &now, table_id_list).await?;
history_ident: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError> {
let Some(seq_table_id_list) = self.get_pb(history_ident).await? else {
return Ok(vec![]);
};

debug!(
name :% =(&table_id_history);
"get_table_meta_history"
);
return Ok(metas);
get_retainable_table_metas(self, &Utc::now(), seq_table_id_list.data).await
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_tables_history(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError> {
async fn list_retainable_tables(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

// List tables by tenant, db_id.
@@ -1595,7 +1579,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
for (ident, history) in ident_histories {
debug!(name :% =(&ident); "get_tables_history");

let id_metas = get_table_meta_history(self, &now, history.data).await?;
let id_metas = get_retainable_table_metas(self, &now, history.data).await?;

let table_nivs = id_metas.into_iter().map(|(table_id, seq_meta)| {
let name = DBIdTableName::new(ident.database_id, ident.table_name.clone());
@@ -3050,27 +3034,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

async fn get_table_meta_history(
async fn get_retainable_table_metas(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
now: &DateTime<Utc>,
tb_id_list: TableIdList,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError> {
let mut tb_metas = vec![];

let inner_keys = tb_id_list.id_list.into_iter().map(TableId::new);
let table_ids = tb_id_list.id_list.into_iter().map(TableId::new);

let kvs = kv_api.get_pb_vec(inner_keys).await?;
let kvs = kv_api.get_pb_vec(table_ids).await?;

for (k, table_meta) in kvs {
let Some(table_meta) = table_meta else {
error!("get_table_history cannot find {:?} table_meta", k);
continue;
};

if !is_drop_time_retainable(table_meta.drop_on, *now) {
continue;
if is_drop_time_retainable(table_meta.drop_on, *now) {
tb_metas.push((k, table_meta));
}
tb_metas.push((k, table_meta));
}

Ok(tb_metas)
26 changes: 13 additions & 13 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
@@ -1835,7 +1835,7 @@ impl SchemaApiTestSuite {
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;

let got = mt
.get_table_meta_history(db_name, &TableIdHistoryIdent {
.get_retainable_tables(&TableIdHistoryIdent {
database_id: cur_db.database_id.db_id,
table_name: tbl_name.to_string(),
})
@@ -4003,7 +4003,7 @@ impl SchemaApiTestSuite {
assert!(table_id >= 1, "table id >= 1");

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

assert_eq!(res.len(), 1);
@@ -4021,7 +4021,7 @@ impl SchemaApiTestSuite {
upsert_test_data(mt.as_kv_api(), &tbid, data).await?;
// assert not return out of retention time data
let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

assert_eq!(res.len(), 0);
@@ -4659,7 +4659,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
@@ -4691,7 +4691,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
@@ -4709,7 +4709,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
@@ -4736,7 +4736,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
@@ -4759,7 +4759,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
@@ -4787,7 +4787,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
@@ -4804,7 +4804,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
@@ -4837,7 +4837,7 @@ impl SchemaApiTestSuite {
let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
let _res = mt.create_table(req.clone()).await?;
let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
assert!(old_db.meta.seq < cur_db.meta.seq);
@@ -4876,7 +4876,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
@@ -4905,7 +4905,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
13 changes: 5 additions & 8 deletions src/query/service/src/databases/default/default_database.rs
Original file line number Diff line number Diff line change
@@ -183,13 +183,10 @@ impl Database for DefaultDatabase {
let metas = self
.ctx
.meta
.get_table_meta_history(
self.db_info.name_ident.database_name(),
&TableIdHistoryIdent {
database_id: self.db_info.database_id.db_id,
table_name: table_name.to_string(),
},
)
.get_retainable_tables(&TableIdHistoryIdent {
database_id: self.db_info.database_id.db_id,
table_name: table_name.to_string(),
})
.await?;

let table_infos: Vec<Arc<TableInfo>> = metas
@@ -233,7 +230,7 @@ impl Database for DefaultDatabase {
let mut dropped = self
.ctx
.meta
.get_tables_history(ListTableReq::new(
.list_retainable_tables(ListTableReq::new(
self.get_tenant(),
self.db_info.database_id,
))

0 comments on commit 9e5ffba

Please sign in to comment.