Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: abort vacuum if there is an IO exception #14052

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,33 @@ use std::time::Instant;

use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_storages_fuse::FuseTable;
use futures_util::TryStreamExt;
use log::info;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;

#[async_backtrace::framed]
async fn do_vacuum_drop_table(
table: Arc<dyn Table>,
pub async fn do_vacuum_drop_table(
table_info: &TableInfo,
operator: &Operator,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<(String, String)>>> {
// only operate fuse table
if table.engine() != "FUSE" {
info!(
"ignore table {} not of FUSE engine, table engine {}",
table.get_table_info().name,
table.engine()
);
return Ok(None);
}
let table_info = table.get_table_info();
// storage_params is_some means it is an external table, ignore
if table_info.meta.storage_params.is_some() {
info!("ignore external table {}", table.get_table_info().name);
info!("ignore external table {}", table_info.name);
return Ok(None);
}
let fuse_table = FuseTable::try_from_table(table.as_ref())?;

let operator = fuse_table.get_operator_ref();

let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?);
info!("vacuum drop table {:?} dir {:?}", table.name(), dir);
info!("vacuum drop table {:?} dir {:?}", table_info.name, dir);
let start = Instant::now();

let ret = match dry_run_limit {
None => {
let _ = operator.remove_all(&dir).await;

operator.remove_all(&dir).await?;
Ok(None)
}
Some(dry_run_limit) => {
Expand All @@ -67,7 +56,7 @@ async fn do_vacuum_drop_table(
while let Some(de) = ds.try_next().await? {
let meta = de.metadata();
if EntryMode::FILE == meta.mode() {
list_files.push((fuse_table.name().to_string(), de.name().to_string()));
list_files.push((table_info.name.clone(), de.name().to_string()));
if list_files.len() >= dry_run_limit {
break;
}
Expand All @@ -80,7 +69,7 @@ async fn do_vacuum_drop_table(

info!(
"vacuum drop table {:?} dir {:?}, cost:{} sec",
table.name(),
table_info.name,
dir,
start.elapsed().as_secs()
);
Expand All @@ -98,7 +87,20 @@ pub async fn do_vacuum_drop_tables(
let mut list_files = Vec::new();
let mut left_limit = dry_run_limit;
for table in tables {
let ret = do_vacuum_drop_table(table, left_limit).await?;
// only operate fuse table
let ret = if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
let table_info = table.get_table_info();
let operator = fuse_table.get_operator_ref();
do_vacuum_drop_table(table_info, operator, left_limit).await?
} else {
info!(
"ignore table {}, which is not of FUSE engine. Table engine {}",
table.get_table_info().name,
table.engine()
);
continue;
};

if let Some(ret) = ret {
list_files.extend(ret);
if list_files.len() >= dry_run_limit.unwrap() {
Expand Down
143 changes: 142 additions & 1 deletion src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;

use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::storage::StorageParams;
use databend_enterprise_query::storages::fuse::do_vacuum_drop_tables;
use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::do_vacuum_drop_table;
use databend_query::test_kits::*;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use opendal::raw::Accessor;
use opendal::raw::AccessorInfo;
use opendal::raw::OpStat;
use opendal::raw::RpStat;
use opendal::EntryMode;
use opendal::Metadata;
use opendal::OperatorBuilder;

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table() -> Result<()> {
async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
let fixture = TestFixture::setup().await?;

fixture
Expand Down Expand Up @@ -90,3 +104,130 @@ async fn test_fuse_do_vacuum_drop_table() -> Result<()> {

Ok(())
}

mod test_accessor {
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use opendal::raw::OpDelete;
use opendal::raw::OpList;
use opendal::raw::RpDelete;
use opendal::raw::RpList;

use super::*;

// Accessor that throws an error when deleting dir or files.
#[derive(Debug)]
pub(crate) struct AccessorFaultyDeletion {
hit_delete: AtomicBool,
}

impl AccessorFaultyDeletion {
pub(crate) fn new() -> Self {
AccessorFaultyDeletion {
hit_delete: AtomicBool::new(false),
}
}

pub(crate) fn hit_delete_operation(&self) -> bool {
self.hit_delete.load(Ordering::Acquire)
}
}
#[async_trait::async_trait]
impl Accessor for AccessorFaultyDeletion {
type Reader = ();
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
type Pager = ();
type BlockingPager = ();

fn info(&self) -> AccessorInfo {
let mut info = AccessorInfo::default();
let cap = info.full_capability_mut();
cap.stat = true;
cap.batch = true;
cap.delete = true;
cap.list = true;
cap.list_with_delimiter_slash = true;
info
}

async fn stat(&self, _path: &str, _args: OpStat) -> opendal::Result<RpStat> {
let stat = RpStat::new(Metadata::new(EntryMode::DIR));
Ok(stat)
}

async fn delete(&self, _path: &str, _args: OpDelete) -> opendal::Result<RpDelete> {
self.hit_delete.store(true, Ordering::Release);
Err(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"does not matter (delete)",
))
}

async fn list(&self, _path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Pager)> {
Ok((RpList::default(), ()))
}
}
}

/// `do_vacuum_drop_table` must return `Err` when deleting the table data fails.
#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
    use std::sync::Arc;

    use test_accessor::AccessorFaultyDeletion;

    // Table info carrying only the database id option.
    let mut table_info = TableInfo::default();
    let options = &mut table_info.meta.options;
    options.insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned());

    // Operator backed by a mocked accessor that fails on `remove_all`.
    //
    // Note: in a real deployment `Accessor::batch` would be called (instead of
    // `Accessor::delete`), but all this test needs is for `Operator::remove_all`
    // to fail.
    let faulty_accessor = Arc::new(AccessorFaultyDeletion::new());
    let operator = OperatorBuilder::new(Arc::clone(&faulty_accessor)).finish();

    let outcome = do_vacuum_drop_table(&table_info, &operator, None).await;
    assert!(outcome.is_err());

    // The failure must come from the deletion path: `delete` was called.
    assert!(faulty_accessor.hit_delete_operation());

    Ok(())
}

/// `do_vacuum_drop_table` must return `Ok(None)` for a table whose
/// `storage_params` is set, without calling `delete` on the accessor.
#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
    use std::sync::Arc;

    use test_accessor::AccessorFaultyDeletion;

    // `storage_params` being `Some` marks the table as external.
    let table_info = TableInfo {
        meta: TableMeta {
            storage_params: Some(StorageParams::default()),
            ..Default::default()
        },
        ..Default::default()
    };

    // The accessor does not matter here: `do_vacuum_drop_table` should return
    // `Ok(None)` before the accessor is ever used.
    let accessor = Arc::new(AccessorFaultyDeletion::new());
    let operator = OperatorBuilder::new(Arc::clone(&accessor)).finish();

    let outcome = do_vacuum_drop_table(&table_info, &operator, None).await;

    // Expect success with nothing reported.
    assert!(matches!(outcome, Ok(None)));

    // Expect that `delete` was never called on the accessor.
    assert!(!accessor.hit_delete_operation());

    Ok(())
}
Loading