diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs index efb2df4daa834..fc8c924e8eaf4 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs @@ -17,44 +17,33 @@ use std::time::Instant; use databend_common_catalog::table::Table; use databend_common_exception::Result; +use databend_common_meta_app::schema::TableInfo; use databend_common_storages_fuse::FuseTable; use futures_util::TryStreamExt; use log::info; use opendal::EntryMode; use opendal::Metakey; +use opendal::Operator; #[async_backtrace::framed] -async fn do_vacuum_drop_table( - table: Arc, +pub async fn do_vacuum_drop_table( + table_info: &TableInfo, + operator: &Operator, dry_run_limit: Option, ) -> Result>> { - // only operate fuse table - if table.engine() != "FUSE" { - info!( - "ignore table {} not of FUSE engine, table engine {}", - table.get_table_info().name, - table.engine() - ); - return Ok(None); - } - let table_info = table.get_table_info(); // storage_params is_some means it is an external table, ignore if table_info.meta.storage_params.is_some() { - info!("ignore external table {}", table.get_table_info().name); + info!("ignore external table {}", table_info.name); return Ok(None); } - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - - let operator = fuse_table.get_operator_ref(); let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?); - info!("vacuum drop table {:?} dir {:?}", table.name(), dir); + info!("vacuum drop table {:?} dir {:?}", table_info.name, dir); let start = Instant::now(); let ret = match dry_run_limit { None => { - let _ = operator.remove_all(&dir).await; - + operator.remove_all(&dir).await?; Ok(None) } Some(dry_run_limit) => { @@ -67,7 +56,7 @@ async fn do_vacuum_drop_table( while let Some(de) = ds.try_next().await? 
{ let meta = de.metadata(); if EntryMode::FILE == meta.mode() { - list_files.push((fuse_table.name().to_string(), de.name().to_string())); + list_files.push((table_info.name.clone(), de.name().to_string())); if list_files.len() >= dry_run_limit { break; } @@ -80,7 +69,7 @@ async fn do_vacuum_drop_table( info!( "vacuum drop table {:?} dir {:?}, cost:{} sec", - table.name(), + table_info.name, dir, start.elapsed().as_secs() ); @@ -98,7 +87,20 @@ pub async fn do_vacuum_drop_tables( let mut list_files = Vec::new(); let mut left_limit = dry_run_limit; for table in tables { - let ret = do_vacuum_drop_table(table, left_limit).await?; + // only operate fuse table + let ret = if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) { + let table_info = table.get_table_info(); + let operator = fuse_table.get_operator_ref(); + do_vacuum_drop_table(table_info, operator, left_limit).await? + } else { + info!( + "ignore table {}, which is not of FUSE engine. Table engine {}", + table.get_table_info().name, + table.engine() + ); + continue; + }; + if let Some(ret) = ret { list_files.extend(ret); if list_files.len() >= dry_run_limit.unwrap() { diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index bc2eab3b9d314..0089fcf61b4e7 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -12,13 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::fmt::Debug; + use databend_common_base::base::tokio; use databend_common_exception::Result; +use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::storage::StorageParams; use databend_enterprise_query::storages::fuse::do_vacuum_drop_tables; +use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::do_vacuum_drop_table; use databend_query::test_kits::*; +use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; +use opendal::raw::Accessor; +use opendal::raw::AccessorInfo; +use opendal::raw::OpStat; +use opendal::raw::RpStat; +use opendal::EntryMode; +use opendal::Metadata; +use opendal::OperatorBuilder; #[tokio::test(flavor = "multi_thread")] -async fn test_fuse_do_vacuum_drop_table() -> Result<()> { +async fn test_fuse_do_vacuum_drop_tables() -> Result<()> { let fixture = TestFixture::setup().await?; fixture @@ -90,3 +104,130 @@ async fn test_fuse_do_vacuum_drop_table() -> Result<()> { Ok(()) } + +mod test_accessor { + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering; + + use opendal::raw::OpDelete; + use opendal::raw::OpList; + use opendal::raw::RpDelete; + use opendal::raw::RpList; + + use super::*; + + // Accessor that throws an error when deleting dir or files. 
+ #[derive(Debug)] + pub(crate) struct AccessorFaultyDeletion { + hit_delete: AtomicBool, + } + + impl AccessorFaultyDeletion { + pub(crate) fn new() -> Self { + AccessorFaultyDeletion { + hit_delete: AtomicBool::new(false), + } + } + + pub(crate) fn hit_delete_operation(&self) -> bool { + self.hit_delete.load(Ordering::Acquire) + } + } + #[async_trait::async_trait] + impl Accessor for AccessorFaultyDeletion { + type Reader = (); + type BlockingReader = (); + type Writer = (); + type BlockingWriter = (); + type Pager = (); + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut info = AccessorInfo::default(); + let cap = info.full_capability_mut(); + cap.stat = true; + cap.batch = true; + cap.delete = true; + cap.list = true; + cap.list_with_delimiter_slash = true; + info + } + + async fn stat(&self, _path: &str, _args: OpStat) -> opendal::Result { + let stat = RpStat::new(Metadata::new(EntryMode::DIR)); + Ok(stat) + } + + async fn delete(&self, _path: &str, _args: OpDelete) -> opendal::Result { + self.hit_delete.store(true, Ordering::Release); + Err(opendal::Error::new( + opendal::ErrorKind::Unexpected, + "does not matter (delete)", + )) + } + + async fn list(&self, _path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Pager)> { + Ok((RpList::default(), ())) + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> { + // do_vacuum_drop_table should return Err if file deletion fails + + let mut table_info = TableInfo::default(); + table_info + .meta + .options + .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()); + + use test_accessor::AccessorFaultyDeletion; + // Operator with a mocked accessor that fails on `remove_all`. + // + // Note: + // In a real deployment, `Accessor::batch` would be called (instead of `Accessor::delete`), + // but all we need here is for `Operator::remove_all` to fail. + let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new()); + 
let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); + + let result = do_vacuum_drop_table(&table_info, &operator, None).await; + assert!(result.is_err()); + + // verify that accessor.delete() was called + assert!(faulty_accessor.hit_delete_operation()); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> { + // do_vacuum_drop_table should return Ok(None) if external storage is detected + + let meta = TableMeta { + storage_params: Some(StorageParams::default()), + ..Default::default() + }; + + let table_info = TableInfo { + meta, + ..Default::default() + }; + + // The accessor passed in does NOT matter in this case: `do_vacuum_drop_table` should + // return Ok(None) before the accessor is used. + use test_accessor::AccessorFaultyDeletion; + let accessor = std::sync::Arc::new(AccessorFaultyDeletion::new()); + let operator = OperatorBuilder::new(accessor.clone()).finish(); + + let result = do_vacuum_drop_table(&table_info, &operator, None).await; + + // verify that Ok(None) is returned + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + + // verify that accessor.delete() was NOT called + assert!(!accessor.hit_delete_operation()); + + Ok(()) +}