diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs
index f6ea14b107e..3e4451ef81a 100644
--- a/rust/lance/src/index/vector/ivf/v2.rs
+++ b/rust/lance/src/index/vector/ivf/v2.rs
@@ -670,6 +670,7 @@ mod tests {
 
     const NUM_ROWS: usize = 512;
     const DIM: usize = 32;
+    const PARTITION_SPLIT_APPEND_ROWS: usize = 50_000;
 
     async fn generate_test_dataset(
         test_uri: &str,
@@ -729,6 +730,32 @@ mod tests {
         vectors
     }
 
+    async fn append_identical_vectors(dataset: &mut Dataset, num_rows: usize, vector: &[f32]) {
+        assert_eq!(
+            vector.len(),
+            DIM,
+            "vector length ({}) must match DIM ({})",
+            vector.len(),
+            DIM
+        );
+        let start_id = dataset.count_all_rows().await.unwrap() as u64;
+        let ids: ArrayRef = Arc::new(UInt64Array::from_iter_values(
+            start_id..start_id + num_rows as u64,
+        ));
+        let mut values = Vec::with_capacity(num_rows * DIM);
+        for _ in 0..num_rows {
+            values.extend_from_slice(vector);
+        }
+        let vectors: ArrayRef = Arc::new(
+            FixedSizeListArray::try_new_from_values(Float32Array::from(values), DIM as i32)
+                .unwrap(),
+        );
+        let schema = Arc::new(Schema::from(dataset.schema()));
+        let batch = RecordBatch::try_new(schema.clone(), vec![ids, vectors]).unwrap();
+        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);
+        dataset.append(batches, None).await.unwrap();
+    }
+
     fn generate_batch(
         num_rows: usize,
         start_id: Option,
@@ -843,9 +870,9 @@ mod tests {
         test_uri: &str,
         params: VectorIndexParams,
         description: &str,
+        append_override: Option<Vec<f32>>,
     ) {
         const INDEX_NAME: &str = "vector_idx";
-        const APPEND_ROWS: usize = 50_000;
 
         dataset
             .create_index(
@@ -867,8 +894,13 @@ mod tests {
             initial_ctx.stats_json()
         );
 
-        // Append tightly clustered vectors so data flows into the same partition.
-        append_dataset::(&mut dataset, APPEND_ROWS, 0.0..0.05).await;
+        // Append additional data to trigger a split.
+        if let Some(vector) = append_override {
+            append_identical_vectors(&mut dataset, PARTITION_SPLIT_APPEND_ROWS, &vector).await;
+        } else {
+            append_dataset::(&mut dataset, PARTITION_SPLIT_APPEND_ROWS, 0.0..0.05)
+                .await;
+        }
 
         dataset
             .optimize_indices(&OptimizeOptions::new())
@@ -2168,15 +2200,73 @@ mod tests {
         let test_dir = TempStrDir::default();
         let test_uri = test_dir.as_str();
 
-        // Create initial dataset with just enough rows for 2 partitions
-        // Using a small initial dataset to avoid long test times
-        let (dataset, _) = generate_test_dataset::(test_uri, 0.0..1.0).await;
+        // Build deterministic two-cluster data so centroids are predictable.
+        const ROWS_PER_CLUSTER: usize = 2_048;
+        let cluster_vectors = vec![
+            {
+                let mut v = vec![0.0; DIM];
+                v[0] = 1.0;
+                v
+            },
+            {
+                let mut v = vec![0.0; DIM];
+                v[1] = 1.0;
+                v
+            },
+        ];
+        let total_rows = ROWS_PER_CLUSTER * cluster_vectors.len();
+        let mut ids = Vec::with_capacity(total_rows);
+        let mut vector_values = Vec::with_capacity(total_rows * DIM);
+        let mut current_id = 0u64;
+        for cluster_vector in &cluster_vectors {
+            for _ in 0..ROWS_PER_CLUSTER {
+                ids.push(current_id);
+                current_id += 1;
+                vector_values.extend_from_slice(cluster_vector);
+            }
+        }
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::UInt64, false),
+            Field::new(
+                "vector",
+                DataType::FixedSizeList(
+                    Arc::new(Field::new("item", DataType::Float32, true)),
+                    DIM as i32,
+                ),
+                false,
+            ),
+        ]));
+        let ids_array: ArrayRef = Arc::new(UInt64Array::from(ids));
+        let vectors_array: ArrayRef = Arc::new(
+            FixedSizeListArray::try_new_from_values(Float32Array::from(vector_values), DIM as i32)
+                .unwrap(),
+        );
+        let batch = RecordBatch::try_new(schema.clone(), vec![ids_array, vectors_array]).unwrap();
+        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let dataset = Dataset::write(
+            batches,
+            test_uri,
+            Some(WriteParams {
+                mode: crate::dataset::WriteMode::Overwrite,
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
 
         // Create an IVF-PQ index with 2 partitions
         // For IvfPq, target_partition_size = 8192
         // Split triggers when partition_size > 4 * 8192 = 32,768
         let params = VectorIndexParams::ivf_pq(2, 8, DIM / 8, DistanceType::L2, 50);
-        verify_partition_split_after_append(dataset, test_uri, params, "scalar vector data").await;
+        verify_partition_split_after_append(
+            dataset,
+            test_uri,
+            params,
+            "scalar vector data",
+            Some(cluster_vectors[0].clone()),
+        )
+        .await;
     }
 
     #[tokio::test]
@@ -2424,7 +2514,8 @@ mod tests {
         // For IvfPq, target_partition_size = 8192
         // Split triggers when partition_size > 4 * 8192 = 32,768
         let params = VectorIndexParams::ivf_pq(2, 8, DIM / 8, DistanceType::Cosine, 50);
-        verify_partition_split_after_append(dataset, test_uri, params, "multivector data").await;
+        verify_partition_split_after_append(dataset, test_uri, params, "multivector data", None)
+            .await;
     }
 
     #[tokio::test]