Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 99 additions & 8 deletions rust/lance/src/index/vector/ivf/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ mod tests {

const NUM_ROWS: usize = 512;
const DIM: usize = 32;
const PARTITION_SPLIT_APPEND_ROWS: usize = 50_000;

async fn generate_test_dataset<T: ArrowPrimitiveType>(
test_uri: &str,
Expand Down Expand Up @@ -729,6 +730,32 @@ mod tests {
vectors
}

async fn append_identical_vectors(dataset: &mut Dataset, num_rows: usize, vector: &[f32]) {
assert_eq!(
vector.len(),
DIM,
"vector length ({}) must match DIM ({})",
vector.len(),
DIM
);
let start_id = dataset.count_all_rows().await.unwrap() as u64;
let ids: ArrayRef = Arc::new(UInt64Array::from_iter_values(
start_id..start_id + num_rows as u64,
));
let mut values = Vec::with_capacity(num_rows * DIM);
for _ in 0..num_rows {
values.extend_from_slice(vector);
}
let vectors: ArrayRef = Arc::new(
FixedSizeListArray::try_new_from_values(Float32Array::from(values), DIM as i32)
.unwrap(),
);
let schema = Arc::new(Schema::from(dataset.schema()));
let batch = RecordBatch::try_new(schema.clone(), vec![ids, vectors]).unwrap();
let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);
dataset.append(batches, None).await.unwrap();
}

fn generate_batch<T: ArrowPrimitiveType>(
num_rows: usize,
start_id: Option<u64>,
Expand Down Expand Up @@ -843,9 +870,9 @@ mod tests {
test_uri: &str,
params: VectorIndexParams,
description: &str,
append_override: Option<Vec<f32>>,
) {
const INDEX_NAME: &str = "vector_idx";
const APPEND_ROWS: usize = 50_000;

dataset
.create_index(
Expand All @@ -867,8 +894,13 @@ mod tests {
initial_ctx.stats_json()
);

// Append tightly clustered vectors so data flows into the same partition.
append_dataset::<Float32Type>(&mut dataset, APPEND_ROWS, 0.0..0.05).await;
// Append additional data to trigger a split.
if let Some(vector) = append_override {
append_identical_vectors(&mut dataset, PARTITION_SPLIT_APPEND_ROWS, &vector).await;
} else {
append_dataset::<Float32Type>(&mut dataset, PARTITION_SPLIT_APPEND_ROWS, 0.0..0.05)
.await;
}

dataset
.optimize_indices(&OptimizeOptions::new())
Expand Down Expand Up @@ -2168,15 +2200,73 @@ mod tests {
let test_dir = TempStrDir::default();
let test_uri = test_dir.as_str();

// Create initial dataset with just enough rows for 2 partitions
// Using a small initial dataset to avoid long test times
let (dataset, _) = generate_test_dataset::<Float32Type>(test_uri, 0.0..1.0).await;
// Build deterministic two-cluster data so centroids are predictable.
const ROWS_PER_CLUSTER: usize = 2_048;
let cluster_vectors = vec![
{
let mut v = vec![0.0; DIM];
v[0] = 1.0;
v
},
{
let mut v = vec![0.0; DIM];
v[1] = 1.0;
v
},
];
let total_rows = ROWS_PER_CLUSTER * cluster_vectors.len();
let mut ids = Vec::with_capacity(total_rows);
let mut vector_values = Vec::with_capacity(total_rows * DIM);
let mut current_id = 0u64;
for cluster_vector in &cluster_vectors {
for _ in 0..ROWS_PER_CLUSTER {
ids.push(current_id);
current_id += 1;
vector_values.extend_from_slice(cluster_vector);
}
}

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt64, false),
Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
DIM as i32,
),
false,
),
]));
let ids_array: ArrayRef = Arc::new(UInt64Array::from(ids));
let vectors_array: ArrayRef = Arc::new(
FixedSizeListArray::try_new_from_values(Float32Array::from(vector_values), DIM as i32)
.unwrap(),
);
let batch = RecordBatch::try_new(schema.clone(), vec![ids_array, vectors_array]).unwrap();
let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
let dataset = Dataset::write(
batches,
test_uri,
Some(WriteParams {
mode: crate::dataset::WriteMode::Overwrite,
..Default::default()
}),
)
.await
.unwrap();

// Create an IVF-PQ index with 2 partitions
// For IvfPq, target_partition_size = 8192
// Split triggers when partition_size > 4 * 8192 = 32,768
let params = VectorIndexParams::ivf_pq(2, 8, DIM / 8, DistanceType::L2, 50);
verify_partition_split_after_append(dataset, test_uri, params, "scalar vector data").await;
verify_partition_split_after_append(
dataset,
test_uri,
params,
"scalar vector data",
Some(cluster_vectors[0].clone()),
)
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -2424,7 +2514,8 @@ mod tests {
// For IvfPq, target_partition_size = 8192
// Split triggers when partition_size > 4 * 8192 = 32,768
let params = VectorIndexParams::ivf_pq(2, 8, DIM / 8, DistanceType::Cosine, 50);
verify_partition_split_after_append(dataset, test_uri, params, "multivector data").await;
verify_partition_split_after_append(dataset, test_uri, params, "multivector data", None)
.await;
}

#[tokio::test]
Expand Down
Loading