Skip to content

Commit

Permalink
Fix build
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Oct 23, 2024
1 parent adf8333 commit e6b1931
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
5 changes: 0 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
"serialize-serde",
] }
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "57795da" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "672e423" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn do_vacuum_temporary_files(

let operator = DataOperator::instance().operator();

let temporary_dir = format!("{}/", temporary_dir);
let temporary_dir = format!("{}/", temporary_dir.trim_end_matches('/'));

let mut ds = operator
.lister_with(&temporary_dir)
Expand All @@ -66,6 +66,10 @@ pub async fn do_vacuum_temporary_files(
let mut batch_size = 0;

while let Some(de) = ds.try_next().await? {
if de.path() == temporary_dir {
continue;
}

let meta = de.metadata();

match meta.mode() {
Expand Down Expand Up @@ -159,7 +163,7 @@ async fn vacuum_finished_query(
removed_temp_files: &mut usize,
total_cleaned_size: &mut usize,
batch_size: &mut usize,
de: &Entry,
parent: &Entry,
limit: usize,
timestamp: i64,
life_mills: i64,
Expand All @@ -168,7 +172,7 @@ async fn vacuum_finished_query(

let mut all_files_removed = true;
let mut ds = operator
.lister_with(de.path())
.lister_with(parent.path())
.metakey(Metakey::Mode | Metakey::LastModified)
.await?;

Expand All @@ -180,6 +184,10 @@ async fn vacuum_finished_query(
let mut remove_temp_files_path = Vec::with_capacity(1001);

while let Some(de) = ds.try_next().await? {
if de.path() == parent.path() {
continue;
}

let meta = de.metadata();
if meta.is_file() {
if de.name() == "finished" {
Expand Down Expand Up @@ -218,7 +226,7 @@ async fn vacuum_finished_query(
info!(
"vacuum removed {} temp files in {:?}(elapsed: {} seconds), batch size: {} bytes",
cur_removed,
de.path(),
parent.path(),
instant.elapsed().as_secs(),
*batch_size
);
Expand All @@ -238,8 +246,10 @@ async fn vacuum_finished_query(
}

if all_files_removed {
operator.delete(&format!("{}finished", de.path())).await?;
operator.delete(de.path()).await?;
operator
.delete(&format!("{}finished", parent.path()))
.await?;
operator.delete(parent.path()).await?;
}

Ok(())
Expand Down
33 changes: 26 additions & 7 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_do_vacuum_temporary_files() -> Result<()> {
let _fixture = TestFixture::setup().await?;
Expand All @@ -121,15 +122,14 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
operator.write("test_dir/test2", vec![1, 2]).await?;
operator.write("test_dir/test3", vec![1, 2]).await?;

assert_eq!(
3,
operator.list_with("test_dir/").recursive(true).await?.len()
);
let size = operator.list_with("test_dir/").recursive(true).await?.len();
assert!((3..=4).contains(&size));

tokio::time::sleep(Duration::from_secs(2)).await;
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 1).await?;

assert_eq!(2, operator.list("test_dir/").await?.len());
let size = operator.list("test_dir/").await?.len();
assert!((2..=3).contains(&size));

operator.write("test_dir/test4/test4", vec![1, 2]).await?;
operator.write("test_dir/test5/test5", vec![1, 2]).await?;
Expand All @@ -138,11 +138,16 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
.await?;

do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 2).await?;
assert_eq!(operator.list("test_dir/").await?.len(), 2);
let size = operator.list("test_dir/").await?.len();
assert!((2..=3).contains(&size));

tokio::time::sleep(Duration::from_secs(3)).await;
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(3)), 1000).await?;
assert!(operator.list_with("test_dir/").await?.is_empty());

dbg!(operator.list_with("test_dir/").await?);

let size = operator.list("test_dir/").await?.len();
assert!((0..=1).contains(&size));

Ok(())
}
Expand All @@ -155,8 +160,10 @@ mod test_accessor {
use opendal::raw::oio;
use opendal::raw::oio::Entry;
use opendal::raw::MaybeSend;
use opendal::raw::OpBatch;
use opendal::raw::OpDelete;
use opendal::raw::OpList;
use opendal::raw::RpBatch;
use opendal::raw::RpDelete;
use opendal::raw::RpList;

Expand Down Expand Up @@ -266,6 +273,18 @@ mod test_accessor {
}
}

async fn batch(&self, _args: OpBatch) -> opendal::Result<RpBatch> {
self.hit_delete.store(true, Ordering::Release);
if self.inject_delete_faulty {
Err(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"does not matter (delete)",
))
} else {
Ok(RpBatch::new(vec![]))
}
}

async fn list(&self, path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Lister)> {
if self.inject_delete_faulty {
// While injecting faulty for delete operation, return an empty list;
Expand Down

0 comments on commit e6b1931

Please sign in to comment.