Merged
196 changes: 194 additions & 2 deletions rust/lance/src/dataset/cleanup.rs
@@ -58,7 +58,7 @@ use std::{
future,
sync::{Mutex, MutexGuard},
};
-use tracing::{info, instrument, Span};
+use tracing::{debug, info, instrument, Span};

use super::refs::TagContents;
use crate::{utils::temporal::utc_now, Dataset};
@@ -390,6 +390,72 @@ impl<'a> CleanupTask<'a> {
Ok(None)
}
}
Some("blob") => {
// Blob v2 sidecar files are keyed by the data file stem:
// data/{data_file_key}/{blob_id:08x}.blob
//
// These files are not referenced directly by the manifest. Instead, treat them
// as referenced if their parent data file is referenced.
if !relative_path.as_ref().starts_with("data") {
debug!(
path = relative_path.as_ref(),
"Will not garbage collect blob file because it does not follow convention"
);
return Ok(None);
}

let mut parts = relative_path.parts();
let data_dir = parts.next();
let data_file_key = parts.next();
let blob_file = parts.next();
// Be conservative: only handle the expected 3-part layout.
if data_dir.is_none() || data_file_key.is_none() || blob_file.is_none() {
debug!(
path = relative_path.as_ref(),
"Will not garbage collect blob file because it does not follow convention"
);
return Ok(None);
}
if parts.next().is_some() {
debug!(
path = relative_path.as_ref(),
"Will not garbage collect blob file because it does not follow convention"
);
return Ok(None);
}

let data_file_key = data_file_key.expect("checked is_some");
let Ok(parent_data_path) =
Path::parse(format!("data/{}.lance", data_file_key.as_ref()))
else {
debug!(
path = relative_path.as_ref(),
derived_parent = format!("data/{}.lance", data_file_key.as_ref()),
"Will not garbage collect blob file because derived parent data file path is invalid"
);
return Ok(None);
};

if inspection
.referenced_files
.data_paths
.contains(&parent_data_path)
{
Ok(None)
} else if !maybe_in_progress {
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string());
Ok(Some(path))
} else if inspection
.verified_files
.data_paths
.contains(&parent_data_path)
{
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string());
Ok(Some(path))
} else {
Ok(None)
}
}
Some("manifest") => {
// We already scanned the manifest files
Ok(None)
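
Reviewer note: the path handling in the new `Some("blob")` arm can be restated as a small standalone sketch. The helper name `parent_data_path_for_blob` is invented for illustration and is not part of this change; it assumes the `object_store` `Path` API already used in this file, and it folds the outer `.blob` extension match into its guard.

use object_store::path::Path;

/// Maps `data/{data_file_key}/{blob_id:08x}.blob` back to `data/{data_file_key}.lance`,
/// returning `None` for anything that does not match the expected 3-part layout.
fn parent_data_path_for_blob(relative_path: &Path) -> Option<Path> {
    let parts: Vec<_> = relative_path.parts().collect();
    match parts.as_slice() {
        [data_dir, data_file_key, blob_file]
            if data_dir.as_ref() == "data" && blob_file.as_ref().ends_with(".blob") =>
        {
            // Derive the parent data file the sidecar belongs to.
            Path::parse(format!("data/{}.lance", data_file_key.as_ref())).ok()
        }
        // Anything else does not follow the convention; be conservative and skip it.
        _ => None,
    }
}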
@@ -637,7 +703,8 @@ fn tagged_old_versions_cleanup_error(
mod tests {
use std::{collections::HashMap, sync::Arc};

-use arrow_array::RecordBatchReader;
+use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion::common::assert_contains;
use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy};
use lance_index::{DatasetIndexExt, IndexType};
@@ -651,6 +718,7 @@ mod tests {
use snafu::location;

use super::*;
use crate::blob::{blob_field, BlobArrayBuilder};
use crate::{
dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams},
index::vector::VectorIndexParams,
@@ -931,13 +999,49 @@ mod tests {
Ok(file_count)
}

async fn count_blob_files(&self) -> Result<usize> {
let registry = Arc::new(ObjectStoreRegistry::default());
let (os, path) =
ObjectStore::from_uri_and_params(registry, &self.dataset_path, &self.os_params())
.await?;
let mut file_stream = os.read_dir_all(&path, None);
let mut blob_count = 0usize;
while let Some(path) = file_stream.try_next().await? {
if path.location.extension() == Some("blob") {
blob_count += 1;
}
}
Ok(blob_count)
}

async fn count_rows(&self) -> Result<usize> {
let db = self.open().await?;
let count = db.count_rows(None).await?;
Ok(count)
}
}

fn blob_v2_batch(blob_len: usize) -> Box<dyn RecordBatchReader + Send> {
let mut blobs = BlobArrayBuilder::new(1);
blobs.push_bytes(vec![0u8; blob_len]).unwrap();

let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false),
blob_field("blob", true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1])), blobs.finish().unwrap()],
)
.unwrap();

Box::new(RecordBatchIterator::new(
vec![Ok(batch)].into_iter(),
schema,
))
}

#[tokio::test]
async fn cleanup_unreferenced_data_files() {
// We should clean up data files that are only referenced
@@ -978,6 +1082,94 @@ mod tests {
assert_gt!(after_count.num_tx_files, 0);
}

#[tokio::test]
async fn cleanup_blob_v2_sidecar_files() {
let fixture = MockDatasetFixture::try_new().unwrap();

// First version: write a packed blob (sidecar .blob file).
Dataset::write(
blob_v2_batch(100 * 1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Create,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();
assert_gt!(fixture.count_blob_files().await.unwrap(), 0);

// Second version: overwrite with an inline blob (no sidecar).
Dataset::write(
blob_v2_batch(1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Overwrite,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();

// Advance time so the unverified threshold doesn't interfere.
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());

fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.unwrap();

assert_eq!(fixture.count_blob_files().await.unwrap(), 0);
}

#[tokio::test]
async fn cleanup_recent_blob_v2_sidecar_files_when_verified() {
let fixture = MockDatasetFixture::try_new().unwrap();

Dataset::write(
blob_v2_batch(100 * 1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Create,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();

Dataset::write(
blob_v2_batch(1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Overwrite,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();

// Old version is verified (referenced by an old manifest) even though the files are
// recent; cleanup should remove them without waiting 7 days.
fixture
.run_cleanup(utc_now() + TimeDelta::seconds(1))
.await
.unwrap();

assert_eq!(fixture.count_blob_files().await.unwrap(), 0);
}
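
Reviewer note: the two tests above exercise the three-way decision the new arm makes for a sidecar file. The sketch below restates that logic with invented names (`SidecarDecision`, `decide`) that do not appear in the code:

/// Restatement of the keep/delete logic for a blob sidecar; names are illustrative only.
enum SidecarDecision {
    Keep,
    Delete,
}

fn decide(
    parent_referenced: bool,
    maybe_in_progress: bool,
    parent_verified: bool,
) -> SidecarDecision {
    if parent_referenced {
        // A retained manifest still points at the parent data file.
        SidecarDecision::Keep
    } else if !maybe_in_progress || parent_verified {
        // Either old enough to be past the in-progress window, or confirmed to
        // belong to an old (verified) version: safe to delete immediately.
        SidecarDecision::Delete
    } else {
        // Recent and unverified: could belong to a concurrent write, so keep it.
        SidecarDecision::Keep
    }
}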

#[tokio::test]
async fn do_not_cleanup_newer_data() {
// Even though an old manifest is removed the data files should