docs/src/guide/distributed_indexing.md (+7 −0)

@@ -111,6 +111,8 @@ At that point the caller has two execution choices:
After the physical segments are built, publish them with
`commit_existing_index_segments(...)`.

Within a single commit, built segments must have disjoint fragment coverage.
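
For illustration, here is a minimal sketch of publishing two built segments with
disjoint coverage. The uuids, `index_details`, and `index_version` below are
placeholders; the full flow is exercised in the test added to
`rust/lance/src/dataset/index.rs` in this PR.

```rust
use lance::index::IndexSegment;

// Sketch only: each segment claims a distinct fragment id, so the two
// segments have disjoint fragment coverage within this single commit.
let segments = vec![
    IndexSegment::new(seg_a, [0_u32], index_details.clone(), index_version),
    IndexSegment::new(seg_b, [1_u32], index_details.clone(), index_version),
];
dataset
    .commit_existing_index_segments("vector_idx", "vector", segments)
    .await?;
```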

## Internal Segmented Finalize Model

Internally, Lance models a distributed vector segment build as:
@@ -155,5 +157,10 @@ Lance is responsible for:
- merging segment storage into physical segment artifacts
- committing physical segments into the manifest

If a staging root or built segment directory is never committed, it remains an
unreferenced index directory under `_indices/`. These artifacts are cleaned up
by `cleanup_old_versions(...)` using the same age-based rules as other
unreferenced index files.
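
As a hedged sketch (assuming a `cleanup_old_versions(older_than, ...)` surface;
the exact signature may differ between versions, and the `index_files_removed`
stat matches what the tests below assert):

```rust
use chrono::Duration;

// Sketch only: anything unreferenced and older than the threshold is
// removed, including never-committed staging roots and built segments.
let stats = dataset
    .cleanup_old_versions(Duration::days(7), None, None)
    .await?;
println!("index files removed: {}", stats.index_files_removed);
```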

This split keeps distributed scheduling outside the storage engine while still
letting Lance own the on-disk index format.
rust/lance/src/dataset/cleanup.rs (+198 −0)

@@ -1182,6 +1182,7 @@ mod tests {
use crate::blob::{BlobArrayBuilder, blob_field};
use crate::index::DatasetIndexExt;
use crate::{
dataset::transaction::{Operation, Transaction},
dataset::{ReadParams, WriteMode, WriteParams, builder::DatasetBuilder},
index::vector::VectorIndexParams,
};
@@ -1202,6 +1203,7 @@
use lance_table::io::commit::RenameCommitHandler;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector, some_batch};
use mock_instant::thread_local::MockClock;
use uuid::Uuid;

#[derive(Debug)]
struct MockObjectStore {
@@ -1530,6 +1532,59 @@ mod tests {
}
}

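// Writes a throwaway index payload (index.idx + auxiliary.idx) under _indices/<uuid>/.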
async fn write_dummy_index_artifact(dataset: &Dataset, uuid: Uuid) -> Result<()> {
let index_dir = dataset.indices_dir().child(uuid.to_string());
dataset
.object_store()
.put(&index_dir.child("index.idx"), b"idx")
.await?;
dataset
.object_store()
.put(&index_dir.child("auxiliary.idx"), b"aux")
.await?;
Ok(())
}

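// Writes a per-shard partial payload under _indices/<staging_uuid>/partial_<shard_uuid>/.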
async fn write_dummy_staging_partial(
dataset: &Dataset,
staging_uuid: Uuid,
shard_uuid: Uuid,
) -> Result<()> {
let shard_dir = dataset
.indices_dir()
.child(staging_uuid.to_string())
.child(format!("partial_{}", shard_uuid));
dataset
.object_store()
.put(&shard_dir.child("index.idx"), b"idx")
.await?;
dataset
.object_store()
.put(&shard_dir.child("auxiliary.idx"), b"aux")
.await?;
Ok(())
}

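// Builds minimal IndexMetadata pointing at the given uuid and fragment coverage.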
fn dummy_index_metadata(
dataset: &Dataset,
field_id: i32,
uuid: Uuid,
fragment_bitmap: impl IntoIterator<Item = u32>,
) -> IndexMetadata {
IndexMetadata {
uuid,
name: "some_index".to_string(),
fields: vec![field_id],
dataset_version: dataset.version().version,
fragment_bitmap: Some(fragment_bitmap.into_iter().collect()),
index_details: None,
index_version: IndexType::Vector.version(),
created_at: None,
base_id: None,
files: None,
}
}

fn blob_v2_batch(blob_len: usize) -> Box<dyn RecordBatchReader + Send> {
let mut blobs = BlobArrayBuilder::new(1);
blobs.push_bytes(vec![0u8; blob_len]).unwrap();
@@ -2182,6 +2237,149 @@ mod tests {
assert_eq!(before_count, after_count);
}

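// Replacing seg_a with seg_c must leave seg_b (still referenced) and seg_c
// intact; only seg_a becomes unreferenced and ages out.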
#[tokio::test]
async fn cleanup_old_replaced_segment_keeps_still_referenced_segments() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();

let mut dataset = fixture.open().await.unwrap();
let field_id = dataset.schema().field("indexable").unwrap().id;

let seg_a = Uuid::new_v4();
let seg_b = Uuid::new_v4();
write_dummy_index_artifact(&dataset, seg_a).await.unwrap();
write_dummy_index_artifact(&dataset, seg_b).await.unwrap();

let index_a = dummy_index_metadata(&dataset, field_id, seg_a, [0_u32]);
let index_b = dummy_index_metadata(&dataset, field_id, seg_b, [1_u32]);
let initial_tx = Transaction::new(
dataset.manifest.version,
Operation::CreateIndex {
new_indices: vec![index_a.clone(), index_b.clone()],
removed_indices: vec![],
},
None,
);
dataset
.apply_commit(initial_tx, &Default::default(), &Default::default())
.await
.unwrap();

MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());

let seg_c = Uuid::new_v4();
write_dummy_index_artifact(&dataset, seg_c).await.unwrap();
let index_c = dummy_index_metadata(&dataset, field_id, seg_c, [2_u32]);
let replace_tx = Transaction::new(
dataset.manifest.version,
Operation::CreateIndex {
new_indices: vec![index_c.clone()],
removed_indices: vec![index_a.clone()],
},
None,
);
dataset
.apply_commit(replace_tx, &Default::default(), &Default::default())
.await
.unwrap();

let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();

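// Only seg_a is both unreferenced and older than the cutoff: its two files go.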
assert_eq!(removed.index_files_removed, 2);
assert!(
!dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(seg_a.to_string())
.child("index.idx")
)
.await
.unwrap()
);
assert!(
dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(seg_b.to_string())
.child("index.idx")
)
.await
.unwrap()
);
assert!(
dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(seg_c.to_string())
.child("index.idx")
)
.await
.unwrap()
);
}

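// Never-committed staging partials and built segments age out like any other
// unreferenced index files.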
#[tokio::test]
async fn cleanup_old_uncommitted_index_artifacts() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();

let dataset = fixture.open().await.unwrap();
let staging_uuid = Uuid::new_v4();
let shard_uuid = Uuid::new_v4();
let built_segment_uuid = Uuid::new_v4();

write_dummy_staging_partial(&dataset, staging_uuid, shard_uuid)
.await
.unwrap();
write_dummy_index_artifact(&dataset, built_segment_uuid)
.await
.unwrap();

MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());

let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();

assert_eq!(removed.old_versions, 0);
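// Two files under the staging partial plus two under the built segment.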
assert_eq!(removed.index_files_removed, 4);
assert!(
!dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(staging_uuid.to_string())
.child(format!("partial_{}", shard_uuid))
.child("index.idx"),
)
.await
.unwrap()
);
assert!(
!dataset
.object_store()
.exists(
&dataset
.indices_dir()
.child(built_segment_uuid.to_string())
.child("index.idx"),
)
.await
.unwrap()
);
}

#[tokio::test]
async fn cleanup_failed_commit_data_file() {
// We should clean up data files that are written but the commit failed
rust/lance/src/dataset/index.rs (+118 −0)

@@ -172,3 +172,121 @@ impl LanceIndexStoreExt for LanceIndexStore {
Ok(store.with_file_sizes(index.file_size_map()))
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::dataset::WriteParams;
use crate::index::vector::VectorIndexParams;
use crate::index::{DatasetIndexExt, IndexSegment};
use lance_datagen::{BatchCount, RowCount, array};
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
use uuid::Uuid;

#[tokio::test]
async fn test_remapper_only_touches_segments_with_affected_fragments() {
let test_dir = tempfile::tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();

let reader = lance_datagen::gen_batch()
.col("id", array::step::<arrow_array::types::Int32Type>())
.col(
"vector",
array::rand_vec::<arrow_array::types::Float32Type>(16.into()),
)
.into_reader_rows(RowCount::from(40), BatchCount::from(2));

let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
max_rows_per_file: 20,
max_rows_per_group: 20,
..Default::default()
}),
)
.await
.unwrap();

let fragments = dataset.get_fragments();
assert!(
fragments.len() >= 2,
"expected at least two fragments for this test"
);
let target_fragments = fragments.iter().take(2).collect::<Vec<_>>();

let params = VectorIndexParams::ivf_flat(2, MetricType::L2);
let first_segment_uuid = Uuid::new_v4();
let second_segment_uuid = Uuid::new_v4();
let built_index = dataset
.create_index_builder(&["vector"], IndexType::Vector, &params)
.name("vector_idx".to_string())
.index_uuid(first_segment_uuid.to_string())
.execute_uncommitted()
.await
.unwrap();
let first_segment_dir = dataset.indices_dir().child(first_segment_uuid.to_string());
let second_segment_dir = dataset.indices_dir().child(second_segment_uuid.to_string());
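// Fake a second built segment by copying the first segment's files.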
for file_name in ["index.idx", "auxiliary.idx"] {
dataset
.object_store()
.copy(
&first_segment_dir.child(file_name),
&second_segment_dir.child(file_name),
)
.await
.unwrap();
}

let segments = vec![
IndexSegment::new(
first_segment_uuid,
[target_fragments[0].id() as u32],
built_index.index_details.clone().unwrap(),
built_index.index_version,
),
IndexSegment::new(
second_segment_uuid,
[target_fragments[1].id() as u32],
built_index.index_details.clone().unwrap(),
built_index.index_version,
),
];

dataset
.commit_existing_index_segments("vector_idx", "vector", segments)
.await
.unwrap();
let committed = dataset.load_indices_by_name("vector_idx").await.unwrap();
let committed_ids = committed
.iter()
.map(|segment| segment.uuid)
.collect::<Vec<_>>();
let unaffected_segment_id = committed
.iter()
.find(|segment| {
segment
.fragment_bitmap
.as_ref()
.is_some_and(|bitmap| bitmap.contains(target_fragments[1].id() as u32))
})
.map(|segment| segment.uuid)
.expect("expected one committed segment to cover the unaffected fragment");

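// Remap rows only in the first fragment; the segment covering the second
// fragment must be left untouched.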
let remapper = DatasetIndexRemapperOptions::default()
.create_remapper(&dataset)
.unwrap();
let remapped = remapper
.remap_indices(HashMap::new(), &[target_fragments[0].id() as u64])
.await
.unwrap();

assert_eq!(remapped.len(), 1);
assert!(committed_ids.contains(&remapped[0].old_id));
assert_ne!(remapped[0].old_id, unaffected_segment_id);
assert_ne!(remapped[0].new_id, unaffected_segment_id);
}
}