diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs
index 75cf4a60996..0483c943bc0 100644
--- a/rust/lance/src/dataset/cleanup.rs
+++ b/rust/lance/src/dataset/cleanup.rs
@@ -58,7 +58,7 @@ use std::{
     future,
     sync::{Mutex, MutexGuard},
 };
-use tracing::{info, instrument, Span};
+use tracing::{debug, info, instrument, Span};
 
 use super::refs::TagContents;
 use crate::{utils::temporal::utc_now, Dataset};
@@ -390,6 +390,60 @@ impl<'a> CleanupTask<'a> {
                     Ok(None)
                 }
             }
+            Some("blob") => {
+                // Blob v2 sidecar files are keyed by the data file stem:
+                // data/{data_file_key}/{blob_id:08x}.blob
+                //
+                // These files are not referenced directly by the manifest. Instead, treat them
+                // as referenced if their parent data file is referenced.
+                //
+                // Be conservative: only handle the exact 3-part layout rooted at `data`. A bare
+                // string-prefix test would also accept sibling directories (e.g. `data_old/`),
+                // whose blobs would then be judged against an unrelated `data/{key}.lance`.
+                let mut parts = relative_path.parts();
+                let layout = (parts.next(), parts.next(), parts.next(), parts.next());
+                let data_file_key = match layout {
+                    (Some(dir), Some(key), Some(_), None) if dir.as_ref() == "data" => Some(key),
+                    _ => None,
+                };
+                let Some(data_file_key) = data_file_key else {
+                    debug!(
+                        path = relative_path.as_ref(),
+                        "Will not garbage collect blob file because it does not follow convention"
+                    );
+                    return Ok(None);
+                };
+
+                let derived_parent = format!("data/{}.lance", data_file_key.as_ref());
+                let Ok(parent_data_path) = Path::parse(&derived_parent) else {
+                    debug!(
+                        path = relative_path.as_ref(),
+                        derived_parent = derived_parent.as_str(),
+                        "Will not garbage collect blob file because derived parent data file path is invalid"
+                    );
+                    return Ok(None);
+                };
+
+                if inspection
+                    .referenced_files
+                    .data_paths
+                    .contains(&parent_data_path)
+                {
+                    Ok(None)
+                } else if !maybe_in_progress {
+                    info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string());
+                    Ok(Some(path))
+                } else if inspection
+                    .verified_files
+                    .data_paths
+                    .contains(&parent_data_path)
+                {
+                    info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string());
+                    Ok(Some(path))
+                } else {
+                    Ok(None)
+                }
+            }
             Some("manifest") => {
                 // We already scanned the manifest files
                 Ok(None)
@@ -637,7 +691,8 @@ fn tagged_old_versions_cleanup_error(
 mod tests {
     use std::{collections::HashMap, sync::Arc};
 
-    use arrow_array::RecordBatchReader;
+    use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use datafusion::common::assert_contains;
     use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy};
     use lance_index::{DatasetIndexExt, IndexType};
@@ -651,6 +706,7 @@ mod tests {
     use snafu::location;
 
     use super::*;
+    use crate::blob::{blob_field, BlobArrayBuilder};
     use crate::{
         dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams},
         index::vector::VectorIndexParams,
@@ -931,6 +987,22 @@ mod tests {
             Ok(file_count)
         }
 
+        /// Counts every file under the dataset path whose extension is `blob`.
+        async fn count_blob_files(&self) -> Result<usize> {
+            let registry = Arc::new(ObjectStoreRegistry::default());
+            let (os, path) =
+                ObjectStore::from_uri_and_params(registry, &self.dataset_path, &self.os_params())
+                    .await?;
+            let mut file_stream = os.read_dir_all(&path, None);
+            let mut blob_count = 0usize;
+            while let Some(path) = file_stream.try_next().await? {
+                if path.location.extension() == Some("blob") {
+                    blob_count += 1;
+                }
+            }
+            Ok(blob_count)
+        }
+
         async fn count_rows(&self) -> Result<usize> {
             let db = self.open().await?;
             let count = db.count_rows(None).await?;
@@ -938,6 +1010,29 @@ mod tests {
         }
     }
 
+    /// Builds a one-row reader with an Int32 `id` column and a `blob` column holding a
+    /// single value of `blob_len` zero bytes.
+    fn blob_v2_batch(blob_len: usize) -> Box<dyn RecordBatchReader + Send> {
+        let mut blobs = BlobArrayBuilder::new(1);
+        blobs.push_bytes(vec![0u8; blob_len]).unwrap();
+
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            blob_field("blob", true),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1])), blobs.finish().unwrap()],
+        )
+        .unwrap();
+
+        Box::new(RecordBatchIterator::new(
+            vec![Ok(batch)].into_iter(),
+            schema,
+        ))
+    }
+
     #[tokio::test]
     async fn cleanup_unreferenced_data_files() {
         // We should clean up data files that are only referenced
@@ -978,6 +1073,96 @@ mod tests {
         assert_gt!(after_count.num_tx_files, 0);
     }
 
+    #[tokio::test]
+    async fn cleanup_blob_v2_sidecar_files() {
+        // Sidecar .blob files written by an overwritten version should be gone
+        // once that version is old enough to be cleaned up.
+        let fixture = MockDatasetFixture::try_new().unwrap();
+
+        // First version: write a packed blob (sidecar .blob file).
+        Dataset::write(
+            blob_v2_batch(100 * 1024),
+            &fixture.dataset_path,
+            Some(WriteParams {
+                store_params: Some(fixture.os_params()),
+                commit_handler: Some(Arc::new(RenameCommitHandler)),
+                mode: WriteMode::Create,
+                data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+        assert_gt!(fixture.count_blob_files().await.unwrap(), 0);
+
+        // Second version: overwrite with an inline blob (no sidecar).
+        Dataset::write(
+            blob_v2_batch(1024),
+            &fixture.dataset_path,
+            Some(WriteParams {
+                store_params: Some(fixture.os_params()),
+                commit_handler: Some(Arc::new(RenameCommitHandler)),
+                mode: WriteMode::Overwrite,
+                data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        // Advance time so the unverified threshold doesn't interfere.
+        MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
+
+        fixture
+            .run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
+            .await
+            .unwrap();
+
+        assert_eq!(fixture.count_blob_files().await.unwrap(), 0);
+    }
+
+    #[tokio::test]
+    async fn cleanup_recent_blob_v2_sidecar_files_when_verified() {
+        let fixture = MockDatasetFixture::try_new().unwrap();
+
+        Dataset::write(
+            blob_v2_batch(100 * 1024),
+            &fixture.dataset_path,
+            Some(WriteParams {
+                store_params: Some(fixture.os_params()),
+                commit_handler: Some(Arc::new(RenameCommitHandler)),
+                mode: WriteMode::Create,
+                data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        Dataset::write(
+            blob_v2_batch(1024),
+            &fixture.dataset_path,
+            Some(WriteParams {
+                store_params: Some(fixture.os_params()),
+                commit_handler: Some(Arc::new(RenameCommitHandler)),
+                mode: WriteMode::Overwrite,
+                data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        // Old version is verified (referenced by an old manifest) even though the files are
+        // recent; cleanup should remove them without waiting 7 days.
+        fixture
+            .run_cleanup(utc_now() + TimeDelta::seconds(1))
+            .await
+            .unwrap();
+
+        assert_eq!(fixture.count_blob_files().await.unwrap(), 0);
+    }
+
     #[tokio::test]
     async fn do_not_cleanup_newer_data() {
         // Even though an old manifest is removed the data files should