From 21e762ec1879fa1dda9222c38f2acbcc57b06d42 Mon Sep 17 00:00:00 2001
From: dantengsky <dantengsky@gmail.com>
Date: Mon, 18 Dec 2023 12:55:05 +0800
Subject: [PATCH 1/2] fix: abort vacuum if there are io execption

---
 .../ee/src/storages/fuse/operations/vacuum_drop_tables.rs      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
index efb2df4daa834..96e7d655910b3 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
@@ -53,8 +53,7 @@ async fn do_vacuum_drop_table(
 
     let ret = match dry_run_limit {
         None => {
-            let _ = operator.remove_all(&dir).await;
-
+            operator.remove_all(&dir).await?;
             Ok(None)
         }
         Some(dry_run_limit) => {

From 54fba74db4c0bbab0a62203dd5a248d9aebc2382 Mon Sep 17 00:00:00 2001
From: dantengsky <dantengsky@gmail.com>
Date: Mon, 18 Dec 2023 15:31:08 +0800
Subject: [PATCH 2/2] add unit test

---
 .../fuse/operations/vacuum_drop_tables.rs     |  43 +++---
 .../it/storages/fuse/operations/vacuum.rs     | 143 +++++++++++++++++-
 2 files changed, 165 insertions(+), 21 deletions(-)

diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
index 96e7d655910b3..fc8c924e8eaf4 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
@@ -17,38 +17,28 @@ use std::time::Instant;
 
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
+use databend_common_meta_app::schema::TableInfo;
 use databend_common_storages_fuse::FuseTable;
 use futures_util::TryStreamExt;
 use log::info;
 use opendal::EntryMode;
 use opendal::Metakey;
+use opendal::Operator;
 
 #[async_backtrace::framed]
-async fn do_vacuum_drop_table(
-    table: Arc<dyn Table>,
+pub async fn do_vacuum_drop_table(
+    table_info: &TableInfo,
+    operator: &Operator,
     dry_run_limit: Option<usize>,
 ) -> Result<Option<Vec<(String, String)>>> {
-    // only operate fuse table
-    if table.engine() != "FUSE" {
-        info!(
-            "ignore table {} not of FUSE engine, table engine {}",
-            table.get_table_info().name,
-            table.engine()
-        );
-        return Ok(None);
-    }
-    let table_info = table.get_table_info();
     // storage_params is_some means it is an external table, ignore
     if table_info.meta.storage_params.is_some() {
-        info!("ignore external table {}", table.get_table_info().name);
+        info!("ignore external table {}", table_info.name);
         return Ok(None);
     }
-    let fuse_table = FuseTable::try_from_table(table.as_ref())?;
-
-    let operator = fuse_table.get_operator_ref();
 
     let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?);
-    info!("vacuum drop table {:?} dir {:?}", table.name(), dir);
+    info!("vacuum drop table {:?} dir {:?}", table_info.name, dir);
     let start = Instant::now();
 
     let ret = match dry_run_limit {
@@ -66,7 +56,7 @@ async fn do_vacuum_drop_table(
             while let Some(de) = ds.try_next().await? {
                 let meta = de.metadata();
                 if EntryMode::FILE == meta.mode() {
-                    list_files.push((fuse_table.name().to_string(), de.name().to_string()));
+                    list_files.push((table_info.name.clone(), de.name().to_string()));
                     if list_files.len() >= dry_run_limit {
                         break;
                     }
@@ -79,7 +69,7 @@ async fn do_vacuum_drop_table(
 
     info!(
         "vacuum drop table {:?} dir {:?}, cost:{} sec",
-        table.name(),
+        table_info.name,
         dir,
         start.elapsed().as_secs()
     );
@@ -97,7 +87,20 @@ pub async fn do_vacuum_drop_tables(
     let mut list_files = Vec::new();
     let mut left_limit = dry_run_limit;
     for table in tables {
-        let ret = do_vacuum_drop_table(table, left_limit).await?;
+        // only operate fuse table
+        let ret = if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
+            let table_info = table.get_table_info();
+            let operator = fuse_table.get_operator_ref();
+            do_vacuum_drop_table(table_info, operator, left_limit).await?
+        } else {
+            info!(
+                "ignore table {}, which is not of FUSE engine. Table engine {}",
+                table.get_table_info().name,
+                table.engine()
+            );
+            continue;
+        };
+
         if let Some(ret) = ret {
             list_files.extend(ret);
             if list_files.len() >= dry_run_limit.unwrap() {
diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
index bc2eab3b9d314..0089fcf61b4e7 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
@@ -12,13 +12,27 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt::Debug;
+
 use databend_common_base::base::tokio;
 use databend_common_exception::Result;
+use databend_common_meta_app::schema::TableInfo;
+use databend_common_meta_app::schema::TableMeta;
+use databend_common_meta_app::storage::StorageParams;
 use databend_enterprise_query::storages::fuse::do_vacuum_drop_tables;
+use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::do_vacuum_drop_table;
 use databend_query::test_kits::*;
+use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
+use opendal::raw::Accessor;
+use opendal::raw::AccessorInfo;
+use opendal::raw::OpStat;
+use opendal::raw::RpStat;
+use opendal::EntryMode;
+use opendal::Metadata;
+use opendal::OperatorBuilder;
 
 #[tokio::test(flavor = "multi_thread")]
-async fn test_fuse_do_vacuum_drop_table() -> Result<()> {
+async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
     let fixture = TestFixture::setup().await?;
 
     fixture
@@ -90,3 +104,130 @@ async fn test_fuse_do_vacuum_drop_table() -> Result<()> {
 
     Ok(())
 }
+
+mod test_accessor {
+    use std::sync::atomic::AtomicBool;
+    use std::sync::atomic::Ordering;
+
+    use opendal::raw::OpDelete;
+    use opendal::raw::OpList;
+    use opendal::raw::RpDelete;
+    use opendal::raw::RpList;
+
+    use super::*;
+
+    // Accessor that throws an error when deleting dir or files.
+    #[derive(Debug)]
+    pub(crate) struct AccessorFaultyDeletion {
+        hit_delete: AtomicBool,
+    }
+
+    impl AccessorFaultyDeletion {
+        pub(crate) fn new() -> Self {
+            AccessorFaultyDeletion {
+                hit_delete: AtomicBool::new(false),
+            }
+        }
+
+        pub(crate) fn hit_delete_operation(&self) -> bool {
+            self.hit_delete.load(Ordering::Acquire)
+        }
+    }
+    #[async_trait::async_trait]
+    impl Accessor for AccessorFaultyDeletion {
+        type Reader = ();
+        type BlockingReader = ();
+        type Writer = ();
+        type BlockingWriter = ();
+        type Pager = ();
+        type BlockingPager = ();
+
+        fn info(&self) -> AccessorInfo {
+            let mut info = AccessorInfo::default();
+            let cap = info.full_capability_mut();
+            cap.stat = true;
+            cap.batch = true;
+            cap.delete = true;
+            cap.list = true;
+            cap.list_with_delimiter_slash = true;
+            info
+        }
+
+        async fn stat(&self, _path: &str, _args: OpStat) -> opendal::Result<RpStat> {
+            let stat = RpStat::new(Metadata::new(EntryMode::DIR));
+            Ok(stat)
+        }
+
+        async fn delete(&self, _path: &str, _args: OpDelete) -> opendal::Result<RpDelete> {
+            self.hit_delete.store(true, Ordering::Release);
+            Err(opendal::Error::new(
+                opendal::ErrorKind::Unexpected,
+                "does not matter (delete)",
+            ))
+        }
+
+        async fn list(&self, _path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Pager)> {
+            Ok((RpList::default(), ()))
+        }
+    }
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
+    // do_vacuum_drop_table should return Err if file deletion failed
+
+    let mut table_info = TableInfo::default();
+    table_info
+        .meta
+        .options
+        .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned());
+
+    use test_accessor::AccessorFaultyDeletion;
+    // Operator with mocked accessor that will fail on `remove_all`
+    //
+    // Note that:
+    // In real case, `Accessor::batch` will be called (instead of Accessor::delete)
+    // but all that we need here is let Operator::remove_all failed
+    let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
+    let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();
+
+    let result = do_vacuum_drop_table(&table_info, &operator, None).await;
+    assert!(result.is_err());
+
+    // verify that accessor.delete() was called
+    assert!(faulty_accessor.hit_delete_operation());
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
+    // do_vacuum_drop_table should return Ok(None) if external storage detected
+
+    let meta = TableMeta {
+        storage_params: Some(StorageParams::default()),
+        ..Default::default()
+    };
+
+    let table_info = TableInfo {
+        meta,
+        ..Default::default()
+    };
+
+    // Accessor passed in does NOT matter in this case, `do_vacuum_drop_table` should
+    // return Ok(None) before accessor is used.
+    use test_accessor::AccessorFaultyDeletion;
+    let accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
+    let operator = OperatorBuilder::new(accessor.clone()).finish();
+
+    let result = do_vacuum_drop_table(&table_info, &operator, None).await;
+
+    // verify that Ok(None) is returned
+    assert!(result.is_ok());
+    assert!(result.unwrap().is_none());
+
+    // verify that accessor.delete() was NOT called
+    assert!(!accessor.hit_delete_operation());
+
+    Ok(())
+}