Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize vacuum drop table, execute in parallel #15478

Merged
merged 6 commits into the base branch from the feature branch
May 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: optimize vacuum drop table, execute in parallel
lichuang committed May 11, 2024

Verified

This commit was signed with the committer’s verified signature.
commit e9b5307dfb2e1274b37a05404a5bb1a5d39572c6
3 changes: 2 additions & 1 deletion src/query/ee/src/storages/fuse/operations/handler.rs
Original file line number Diff line number Diff line change
@@ -46,10 +46,11 @@ impl VacuumHandler for RealVacuumHandler {

/// Trait-method entry point: forwards to the free function
/// `do_vacuum_drop_tables`, now passing `threads_nums` so the vacuum
/// work can be executed in parallel.
///
/// * `threads_nums` - number of worker threads to spread the tables over.
/// * `tables` - dropped tables whose storage should be vacuumed.
/// * `dry_run_limit` - when `Some(n)`, do not delete anything; instead
///   return up to roughly `n` files that *would* be removed.
async fn do_vacuum_drop_tables(
    &self,
    threads_nums: usize,
    tables: Vec<Arc<dyn Table>>,
    dry_run_limit: Option<usize>,
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
    do_vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
}

async fn do_vacuum_temporary_files(
171 changes: 97 additions & 74 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
Original file line number Diff line number Diff line change
@@ -15,120 +15,143 @@
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::runtime::execute_futures_in_parallel;
use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
use futures_util::TryStreamExt;
use log::info;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;

#[async_backtrace::framed]
pub async fn do_vacuum_drop_table(
table_info: &TableInfo,
operator: &Operator,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?);
let mut list_files = vec![];
for table in tables {
let (table_info, operator) =
if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
(fuse_table.get_table_info(), fuse_table.get_operator())
} else {
info!(
"ignore table {}, which is not of FUSE engine. Table engine {}",
table.get_table_info().name,
table.engine()
);
continue;
};

info!(
"vacuum drop table {:?} dir {:?}, is_external_table:{:?}",
table_info.name,
dir,
table_info.meta.storage_params.is_some()
);
let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?);

let start = Instant::now();
info!(
"vacuum drop table {:?} dir {:?}, is_external_table:{:?}",
table_info.name,
dir,
table_info.meta.storage_params.is_some()
);

let ret = match dry_run_limit {
None => {
operator.remove_all(&dir).await?;
Ok(None)
}
Some(dry_run_limit) => {
let mut ds = operator
.lister_with(&dir)
.recursive(true)
.metakey(Metakey::Mode)
.metakey(Metakey::ContentLength)
.await?;
let mut list_files = Vec::new();
while let Some(de) = ds.try_next().await? {
let meta = de.metadata();
if EntryMode::FILE == meta.mode() {
list_files.push((
table_info.name.clone(),
de.name().to_string(),
meta.content_length(),
));
if list_files.len() >= dry_run_limit {
break;
let start = Instant::now();

match dry_run_limit {
None => {
operator.remove_all(&dir).await?;
}
Some(dry_run_limit) => {
let mut ds = operator
.lister_with(&dir)
.recursive(true)
.metakey(Metakey::Mode)
.metakey(Metakey::ContentLength)
.await?;

while let Some(de) = ds.try_next().await? {
let meta = de.metadata();
if EntryMode::FILE == meta.mode() {
list_files.push((
table_info.name.clone(),
de.name().to_string(),
meta.content_length(),
));
if list_files.len() >= dry_run_limit {
break;
}
}
}
}
};

Ok(Some(list_files))
}
};

info!(
"vacuum drop table {:?} dir {:?}, cost:{} sec",
table_info.name,
dir,
start.elapsed().as_secs()
);
ret
info!(
"vacuum drop table {:?} dir {:?}, cost:{} sec",
table_info.name,
dir,
start.elapsed().as_secs()
);
}
Ok(if dry_run_limit.is_some() {
Some(list_files)
} else {
None
})
}

#[async_backtrace::framed]
pub async fn do_vacuum_drop_tables(
threads_nums: usize,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
let start = Instant::now();
let tables_len = tables.len();
info!("do_vacuum_drop_tables {} tables", tables_len);
let mut list_files = Vec::new();
let mut left_limit = dry_run_limit;
for table in tables {
// only operate fuse table
let ret = if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
let table_info = table.get_table_info();
let operator = fuse_table.get_operator_ref();
do_vacuum_drop_table(table_info, operator, left_limit).await?

let batch_size = (tables_len / threads_nums).min(50).max(1);

let result = if batch_size >= tables.len() {
do_vacuum_drop_table(tables, dry_run_limit).await?
} else {
let mut chunks = tables.chunks(batch_size);
let thread_limit = if let Some(dry_run_limit) = dry_run_limit {
Some((dry_run_limit / threads_nums).min(dry_run_limit).max(1))
} else {
info!(
"ignore table {}, which is not of FUSE engine. Table engine {}",
table.get_table_info().name,
table.engine()
);
continue;
None
};
if let Some(ret) = ret {
list_files.extend(ret);
if list_files.len() >= dry_run_limit.unwrap() {
info!(
"do_vacuum_drop_tables {} tables, cost:{} sec",
tables_len,
start.elapsed().as_secs()
);
return Ok(Some(list_files));
} else {
left_limit = Some(dry_run_limit.unwrap() - list_files.len());
let tasks = std::iter::from_fn(move || {
chunks
.next()
.map(|tables| do_vacuum_drop_table(tables.to_vec(), thread_limit))
});

let result = execute_futures_in_parallel(
tasks,
threads_nums,
threads_nums * 2,
"batch-vacuum-drop-tables-worker".to_owned(),
)
.await?;
if dry_run_limit.is_some() {
let mut ret_files = vec![];
for file in result {
if let Ok(Some(files)) = file {
ret_files.extend(files);
}
}
Some(ret_files)
} else {
None
}
}
};

info!(
"do_vacuum_drop_tables {} tables, cost:{} sec",
tables_len,
start.elapsed().as_secs()
);

Ok(if dry_run_limit.is_some() {
Some(list_files)
Ok(if let Some(result) = result {
Some(result)
} else {
None
})
6 changes: 4 additions & 2 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
@@ -69,10 +69,12 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
let tbl = fixture.default_table_name();
let qry = format!("drop table {}.{}", db, tbl);
fixture.execute_command(&qry).await?;
let ctx = fixture.new_query_ctx().await?;
let threads_nums = ctx.get_settings().get_max_threads()? as usize;

// verify dry run never delete files
{
do_vacuum_drop_tables(vec![table.clone()], Some(100)).await?;
do_vacuum_drop_tables(threads_nums, vec![table.clone()], Some(100)).await?;
check_data_dir(
&fixture,
"test_fuse_do_vacuum_drop_table: verify generate files",
@@ -88,7 +90,7 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
}

{
do_vacuum_drop_tables(vec![table], None).await?;
do_vacuum_drop_tables(threads_nums, vec![table], None).await?;

// after vacuum drop tables, verify the files number
check_data_dir(