Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Dec 18, 2023
1 parent 21e762e commit 6b1b015
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 21 deletions.
43 changes: 23 additions & 20 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,28 @@ use std::time::Instant;

use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_storages_fuse::FuseTable;
use futures_util::TryStreamExt;
use log::info;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;

#[async_backtrace::framed]
async fn do_vacuum_drop_table(
table: Arc<dyn Table>,
pub async fn do_vacuum_drop_table(
table_info: &TableInfo,
operator: &Operator,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<(String, String)>>> {
// only operate fuse table
if table.engine() != "FUSE" {
info!(
"ignore table {} not of FUSE engine, table engine {}",
table.get_table_info().name,
table.engine()
);
return Ok(None);
}
let table_info = table.get_table_info();
// storage_params is_some means it is an external table, ignore
if table_info.meta.storage_params.is_some() {
info!("ignore external table {}", table.get_table_info().name);
info!("ignore external table {}", table_info.name);
return Ok(None);
}
let fuse_table = FuseTable::try_from_table(table.as_ref())?;

let operator = fuse_table.get_operator_ref();

let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?);
info!("vacuum drop table {:?} dir {:?}", table.name(), dir);
info!("vacuum drop table {:?} dir {:?}", table_info.name, dir);
let start = Instant::now();

let ret = match dry_run_limit {
Expand All @@ -66,7 +56,7 @@ async fn do_vacuum_drop_table(
while let Some(de) = ds.try_next().await? {
let meta = de.metadata();
if EntryMode::FILE == meta.mode() {
list_files.push((fuse_table.name().to_string(), de.name().to_string()));
list_files.push((table_info.name.clone(), de.name().to_string()));
if list_files.len() >= dry_run_limit {
break;
}
Expand All @@ -79,7 +69,7 @@ async fn do_vacuum_drop_table(

info!(
"vacuum drop table {:?} dir {:?}, cost:{} sec",
table.name(),
table_info.name,
dir,
start.elapsed().as_secs()
);
Expand All @@ -97,7 +87,20 @@ pub async fn do_vacuum_drop_tables(
let mut list_files = Vec::new();
let mut left_limit = dry_run_limit;
for table in tables {
let ret = do_vacuum_drop_table(table, left_limit).await?;
// only operate fuse table
let ret = if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
let table_info = table.get_table_info();
let operator = fuse_table.get_operator_ref();
do_vacuum_drop_table(table_info, operator, left_limit).await?
} else {
info!(
"ignore table {}, which is not of FUSE engine. Table engine {}",
table.get_table_info().name,
table.engine()
);
continue;
};

if let Some(ret) = ret {
list_files.extend(ret);
if list_files.len() >= dry_run_limit.unwrap() {
Expand Down
134 changes: 133 additions & 1 deletion src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;

use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::storage::StorageParams;
use databend_enterprise_query::storages::fuse::do_vacuum_drop_tables;
use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::do_vacuum_drop_table;
use databend_query::test_kits::*;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use opendal::raw::Accessor;
use opendal::raw::AccessorInfo;
use opendal::raw::OpStat;
use opendal::raw::RpStat;
use opendal::EntryMode;
use opendal::Metadata;
use opendal::OperatorBuilder;

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table() -> Result<()> {
async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
let fixture = TestFixture::setup().await?;

fixture
Expand Down Expand Up @@ -90,3 +104,121 @@ async fn test_fuse_do_vacuum_drop_table() -> Result<()> {

Ok(())
}

mod test_accessor {
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use opendal::raw::OpDelete;
use opendal::raw::OpList;
use opendal::raw::RpDelete;
use opendal::raw::RpList;

use super::*;

// Accessor that throws an error when deleting dir or files.
#[derive(Debug)]
pub(crate) struct AccessorFaultyDeletion {
hit_delete: AtomicBool,
}

impl AccessorFaultyDeletion {
pub(crate) fn new() -> Self {
AccessorFaultyDeletion {
hit_delete: AtomicBool::new(false),
}
}

pub(crate) fn hit_delete_operation(&self) -> bool {
self.hit_delete.load(Ordering::Acquire)
}
}
#[async_trait::async_trait]
impl Accessor for AccessorFaultyDeletion {
type Reader = ();
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
type Pager = ();
type BlockingPager = ();

fn info(&self) -> AccessorInfo {
let mut info = AccessorInfo::default();
let cap = info.full_capability_mut();
cap.stat = true;
cap.batch = true;
cap.delete = true;
cap.list = true;
cap.list_with_delimiter_slash = true;
info
}

async fn stat(&self, _path: &str, _args: OpStat) -> opendal::Result<RpStat> {
let stat = RpStat::new(Metadata::new(EntryMode::DIR));
Ok(stat)
}

async fn delete(&self, _path: &str, _args: OpDelete) -> opendal::Result<RpDelete> {
self.hit_delete.store(true, Ordering::Release);
Err(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"does not matter (delete)",
))
}

async fn list(&self, _path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Pager)> {
Ok((RpList::default(), ()))
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
// do_vacuum_drop_table should return Err if file deletion failed

let mut table_info = TableInfo::default();
table_info
.meta
.options
.insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned());

// mocked accessor that will fail to delete dir/files
use test_accessor::AccessorFaultyDeletion;
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();

let result = do_vacuum_drop_table(&table_info, &operator, None).await;
assert!(result.is_err());
// verify that accessor.delete() was called
assert!(faulty_accessor.hit_delete_operation());

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
// do_vacuum_drop_table should return Ok(None) if external storage detected

let meta = TableMeta {
storage_params: Some(StorageParams::default()),
..Default::default()
};

let table_info = TableInfo {
meta,
..Default::default()
};

// Accessor passed in does NOT matter in this case, `do_vacuum_drop_table` should
// return Ok(None) before accessor is used.
use test_accessor::AccessorFaultyDeletion;
let accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
let operator = OperatorBuilder::new(accessor.clone()).finish();

let result = do_vacuum_drop_table(&table_info, &operator, None).await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
// verify that accessor.delete() was NOT called
assert!(!accessor.hit_delete_operation());

Ok(())
}

0 comments on commit 6b1b015

Please sign in to comment.