Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/src/guide/distributed_indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ At that point the caller has two execution choices:
After the physical segments are built, publish them with
`commit_existing_index_segments(...)`.

Within a single commit, built segments must have disjoint fragment coverage.

## Internal Segmented Finalize Model

Internally, Lance models distributed vector segment build as:
Expand Down Expand Up @@ -155,5 +157,10 @@ Lance is responsible for:
- merging segment storage into physical segment artifacts
- committing physical segments into the manifest

If a staging root or built segment directory is never committed, it remains an
unreferenced index directory under `_indices/`. These artifacts are cleaned up
by `cleanup_old_versions(...)` using the same age-based rules as other
unreferenced index files.

This split keeps distributed scheduling outside the storage engine while still
letting Lance own the on-disk index format.
198 changes: 198 additions & 0 deletions rust/lance/src/dataset/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ mod tests {
use crate::blob::{BlobArrayBuilder, blob_field};
use crate::index::DatasetIndexExt;
use crate::{
dataset::transaction::{Operation, Transaction},
dataset::{ReadParams, WriteMode, WriteParams, builder::DatasetBuilder},
index::vector::VectorIndexParams,
};
Expand All @@ -1202,6 +1203,7 @@ mod tests {
use lance_table::io::commit::RenameCommitHandler;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector, some_batch};
use mock_instant::thread_local::MockClock;
use uuid::Uuid;

#[derive(Debug)]
struct MockObjectStore {
Expand Down Expand Up @@ -1530,6 +1532,59 @@ mod tests {
}
}

async fn write_dummy_index_artifact(dataset: &Dataset, uuid: Uuid) -> Result<()> {
let index_dir = dataset.indices_dir().child(uuid.to_string());
dataset
.object_store()
.put(&index_dir.child("index.idx"), b"idx")
.await?;
dataset
.object_store()
.put(&index_dir.child("auxiliary.idx"), b"aux")
.await?;
Ok(())
}

async fn write_dummy_staging_partial(
dataset: &Dataset,
staging_uuid: Uuid,
shard_uuid: Uuid,
) -> Result<()> {
let shard_dir = dataset
.indices_dir()
.child(staging_uuid.to_string())
.child(format!("partial_{}", shard_uuid));
dataset
.object_store()
.put(&shard_dir.child("index.idx"), b"idx")
.await?;
dataset
.object_store()
.put(&shard_dir.child("auxiliary.idx"), b"aux")
.await?;
Ok(())
}

fn dummy_index_metadata(
dataset: &Dataset,
field_id: i32,
uuid: Uuid,
fragment_bitmap: impl IntoIterator<Item = u32>,
) -> IndexMetadata {
IndexMetadata {
uuid,
name: "some_index".to_string(),
fields: vec![field_id],
dataset_version: dataset.version().version,
fragment_bitmap: Some(fragment_bitmap.into_iter().collect()),
index_details: None,
index_version: IndexType::Vector.version(),
created_at: None,
base_id: None,
files: None,
}
}

fn blob_v2_batch(blob_len: usize) -> Box<dyn RecordBatchReader + Send> {
let mut blobs = BlobArrayBuilder::new(1);
blobs.push_bytes(vec![0u8; blob_len]).unwrap();
Expand Down Expand Up @@ -2182,6 +2237,149 @@ mod tests {
assert_eq!(before_count, after_count);
}

#[tokio::test]
async fn cleanup_old_replaced_segment_keeps_still_referenced_segments() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();

let mut dataset = fixture.open().await.unwrap();
let field_id = dataset.schema().field("indexable").unwrap().id;

let seg_a = Uuid::new_v4();
let seg_b = Uuid::new_v4();
write_dummy_index_artifact(&dataset, seg_a).await.unwrap();
write_dummy_index_artifact(&dataset, seg_b).await.unwrap();

let index_a = dummy_index_metadata(&dataset, field_id, seg_a, [0_u32]);
let index_b = dummy_index_metadata(&dataset, field_id, seg_b, [1_u32]);
let initial_tx = Transaction::new(
dataset.manifest.version,
Operation::CreateIndex {
new_indices: vec![index_a.clone(), index_b.clone()],
removed_indices: vec![],
},
None,
);
dataset
.apply_commit(initial_tx, &Default::default(), &Default::default())
.await
.unwrap();

MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());

let seg_c = Uuid::new_v4();
write_dummy_index_artifact(&dataset, seg_c).await.unwrap();
let index_c = dummy_index_metadata(&dataset, field_id, seg_c, [2_u32]);
let replace_tx = Transaction::new(
dataset.manifest.version,
Operation::CreateIndex {
new_indices: vec![index_c.clone()],
removed_indices: vec![index_a.clone()],
},
None,
);
dataset
.apply_commit(replace_tx, &Default::default(), &Default::default())
.await
.unwrap();

let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();

assert_eq!(removed.index_files_removed, 2);
assert!(
!dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(seg_a.to_string())
.child("index.idx")
)
.await
.unwrap()
);
assert!(
dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(seg_b.to_string())
.child("index.idx")
)
.await
.unwrap()
);
assert!(
dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(seg_c.to_string())
.child("index.idx")
)
.await
.unwrap()
);
}

#[tokio::test]
async fn cleanup_old_uncommitted_index_artifacts() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();

let dataset = fixture.open().await.unwrap();
let staging_uuid = Uuid::new_v4();
let shard_uuid = Uuid::new_v4();
let built_segment_uuid = Uuid::new_v4();

write_dummy_staging_partial(&dataset, staging_uuid, shard_uuid)
.await
.unwrap();
write_dummy_index_artifact(&dataset, built_segment_uuid)
.await
.unwrap();

MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());

let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();

assert_eq!(removed.old_versions, 0);
assert_eq!(removed.index_files_removed, 4);
assert!(
!dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(staging_uuid.to_string())
.child(format!("partial_{}", shard_uuid))
.child("index.idx"),
)
.await
.unwrap()
);
assert!(
!dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(built_segment_uuid.to_string())
.child("index.idx"),
)
.await
.unwrap()
);
}

#[tokio::test]
async fn cleanup_failed_commit_data_file() {
// We should clean up data files that are written but the commit failed
Expand Down
117 changes: 117 additions & 0 deletions rust/lance/src/dataset/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,120 @@ impl LanceIndexStoreExt for LanceIndexStore {
Ok(store.with_file_sizes(index.file_size_map()))
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::dataset::WriteParams;
use crate::index::vector::VectorIndexParams;
use lance_datagen::{BatchCount, RowCount, array};
use lance_index::{DatasetIndexExt, IndexType};
use lance_linalg::distance::MetricType;
use uuid::Uuid;

#[tokio::test]
async fn test_remapper_only_touches_segments_with_affected_fragments() {
let test_dir = tempfile::tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();

let reader = lance_datagen::gen_batch()
.col("id", array::step::<arrow_array::types::Int32Type>())
.col(
"vector",
array::rand_vec::<arrow_array::types::Float32Type>(16.into()),
)
.into_reader_rows(RowCount::from(40), BatchCount::from(2));

let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
max_rows_per_file: 20,
max_rows_per_group: 20,
..Default::default()
}),
)
.await
.unwrap();

let fragments = dataset.get_fragments();
assert!(
fragments.len() >= 2,
"expected at least two fragments for this test"
);
let target_fragments = fragments.iter().take(2).collect::<Vec<_>>();

let params = VectorIndexParams::ivf_flat(2, MetricType::L2);
let first_segment_uuid = Uuid::new_v4();
let second_segment_uuid = Uuid::new_v4();
let built_index = dataset
.create_index_builder(&["vector"], IndexType::Vector, &params)
.name("vector_idx".to_string())
.index_uuid(first_segment_uuid.to_string())
.execute_uncommitted()
.await
.unwrap();
let first_segment_dir = dataset.indices_dir().child(first_segment_uuid.to_string());
let second_segment_dir = dataset.indices_dir().child(second_segment_uuid.to_string());
for file_name in ["index.idx", "auxiliary.idx"] {
dataset
.object_store()
.copy(
&first_segment_dir.child(file_name),
&second_segment_dir.child(file_name),
)
.await
.unwrap();
}

let segments = vec![
lance_index::IndexSegment::new(
first_segment_uuid,
[target_fragments[0].id() as u32],
built_index.index_details.clone().unwrap(),
built_index.index_version,
),
lance_index::IndexSegment::new(
second_segment_uuid,
[target_fragments[1].id() as u32],
built_index.index_details.clone().unwrap(),
built_index.index_version,
),
];

dataset
.commit_existing_index_segments("vector_idx", "vector", segments)
.await
.unwrap();
let committed = dataset.load_indices_by_name("vector_idx").await.unwrap();
let committed_ids = committed
.iter()
.map(|segment| segment.uuid)
.collect::<Vec<_>>();
let unaffected_segment_id = committed
.iter()
.find(|segment| {
segment
.fragment_bitmap
.as_ref()
.is_some_and(|bitmap| bitmap.contains(target_fragments[1].id() as u32))
})
.map(|segment| segment.uuid)
.expect("expected one committed segment to cover the unaffected fragment");

let remapper = DatasetIndexRemapperOptions::default()
.create_remapper(&dataset)
.unwrap();
let remapped = remapper
.remap_indices(HashMap::new(), &[target_fragments[0].id() as u64])
.await
.unwrap();

assert_eq!(remapped.len(), 1);
assert!(committed_ids.contains(&remapped[0].old_id));
assert_ne!(remapped[0].old_id, unaffected_segment_id);
assert_ne!(remapped[0].new_id, unaffected_segment_id);
}
}
Loading
Loading