Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions rust/lance-index/src/vector/distributed/index_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub async fn init_writer_for_pq(
FileWriterOptions::default(),
)?;
let mut pm_init = pm.clone();
pm_init.transposed = true;
let cb = pm_init.codebook.as_ref().ok_or_else(|| Error::Index {
message: "PQ codebook missing".to_string(),
location: snafu::location!(),
Expand Down Expand Up @@ -777,16 +778,20 @@ pub async fn merge_partial_vector_auxiliary_files(
if existing_pm.num_sub_vectors != pm.num_sub_vectors
|| existing_pm.nbits != pm.nbits
|| existing_pm.dimension != pm.dimension
|| existing_pm.transposed != pm.transposed
{
return Err(Error::Index {
message: format!(
"Distributed PQ merge: structural mismatch across shards; first(dim={}, m={}, nbits={}), current(dim={}, m={}, nbits={})",
"Distributed PQ merge: structural mismatch across shards; first(\
dim={}, m={}, nbits={}, transposed={}), current(dim={}, m={}, nbits={}, transposed={})",
existing_pm.dimension,
existing_pm.num_sub_vectors,
existing_pm.nbits,
existing_pm.transposed,
pm.dimension,
pm.num_sub_vectors,
pm.nbits
pm.nbits,
pm.transposed
),
location: location!(),
});
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ please provide PQBuildParams.codebook for distributed indexing"
)?
.with_ivf(ivf_model)
.with_quantizer(global_pq)
// For distributed shards, keep PQ codes in their original layout
// and transpose only after all shards are merged.
.with_transpose(false)
.with_fragment_filter(fragment_filter)
.build()
.await?;
Expand Down Expand Up @@ -615,6 +618,9 @@ please provide PQBuildParams.codebook for distributed indexing"
)?
.with_ivf(ivf_model)
.with_quantizer(global_pq)
// For distributed shards, keep PQ codes in their original layout
// and transpose only after all shards are merged.
.with_transpose(false)
.with_fragment_filter(fragment_filter)
.build()
.await?;
Expand Down
97 changes: 95 additions & 2 deletions rust/lance/src/index/vector/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ use super::{
utils::{self, get_vector_type},
};

/// Return a copy of `batch` with its rows ordered by ascending ROW_ID.
///
/// A batch that has no ROW_ID column, or that holds at most one row, is
/// cloned and returned as-is. The ordering is stable: rows that share a
/// ROW_ID stay in the same relative order they had in the input.
fn stable_sort_batch_by_row_id(batch: &RecordBatch) -> Result<RecordBatch> {
    // Nothing to order by: hand back the batch untouched.
    let Some(column) = batch.column_by_name(ROW_ID) else {
        return Ok(batch.clone());
    };

    let ids = column.as_primitive::<UInt64Type>();
    let num_rows = ids.len();
    if num_rows <= 1 {
        return Ok(batch.clone());
    }

    // Build the permutation that sorts the rows. `sort_by_key` is a stable
    // sort, so ties on ROW_ID keep their input order.
    let mut permutation: Vec<usize> = (0..num_rows).collect();
    permutation.sort_by_key(|&row| ids.value(row));

    let take_indices =
        UInt32Array::from_iter_values(permutation.into_iter().map(|row| row as u32));
    Ok(batch.take(&take_indices)?)
}

// the number of partitions to evaluate for reassigning
const REASSIGN_RANGE: usize = 64;

Expand Down Expand Up @@ -128,6 +148,8 @@ pub struct IvfIndexBuilder<S: IvfSubIndex, Q: Quantization> {
optimize_options: Option<OptimizeOptions>,
// number of indices merged
merged_num: usize,
// whether to transpose codes when building storage
transpose_codes: bool,
}

type BuildStream<S, Q> =
Expand Down Expand Up @@ -169,6 +191,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
fragment_filter: None,
optimize_options: None,
merged_num: 0,
transpose_codes: true,
})
}

Expand Down Expand Up @@ -235,6 +258,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
fragment_filter: None,
optimize_options: None,
merged_num: 0,
transpose_codes: true,
})
}

Expand Down Expand Up @@ -334,6 +358,13 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
self
}

/// Set whether codes are transposed when the builder writes its storage.
///
/// The constructors initialize this flag to `true`. Passing `false` is
/// mainly useful for intermediate PQ/RQ storage produced while building
/// distributed indices. Returns the builder so calls can be chained.
pub fn with_transpose(&mut self, transpose: bool) -> &mut Self {
    let builder = self;
    builder.transpose_codes = transpose;
    builder
}

#[instrument(name = "load_or_build_ivf", level = "debug", skip_all)]
async fn load_or_build_ivf(&self) -> Result<IvfModel> {
match &self.ivf {
Expand Down Expand Up @@ -935,6 +966,15 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
}
_ => {}
}

// Normalize each batch for this partition to be stably sorted by ROW_ID.
for batch in part_batches.iter_mut() {
if batch.num_rows() == 0 {
continue;
}
*batch = stable_sort_batch_by_row_id(batch)?;
}

batches.extend(part_batches);
}

Expand All @@ -958,6 +998,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
.map(|s| s.parse::<f64>().unwrap_or(0.0))
.unwrap_or(0.0);
let batch = batch.drop_column(PART_ID_COLUMN)?;
let batch = stable_sort_batch_by_row_id(&batch)?;
batches.push(batch);
}
}
Expand All @@ -981,6 +1022,8 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
));
};

let is_pq = Q::quantization_type() == QuantizationType::Product;

// prepare the final writers
let storage_path = self.index_dir.child(INDEX_AUXILIARY_FILE_NAME);
let index_path = self.index_dir.child(INDEX_FILE_NAME);
Expand Down Expand Up @@ -1024,7 +1067,51 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
storage_ivf.add_partition(0);
} else {
let batches = storage.to_batches()?.collect::<Vec<_>>();
let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?;
let mut batch =
arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?;

if is_pq && batch.column_by_name(PQ_CODE_COLUMN).is_some() {
// The PQ storage keeps codes in a transposed layout (bytes grouped
// across all rows). Convert them back to per-row layout so that a
// stable ROW_ID sort moves PQ_CODE_COLUMN together with ROW_ID.
let codes_fsl = batch
.column_by_name(PQ_CODE_COLUMN)
.unwrap()
.as_fixed_size_list();
let num_rows = batch.num_rows();
let bytes_per_code = codes_fsl.value_length() as usize;
let codes = codes_fsl.values().as_primitive::<datatypes::UInt8Type>();
let original_codes = transpose(codes, bytes_per_code, num_rows);
let original_fsl = Arc::new(FixedSizeListArray::try_new_from_values(
original_codes,
bytes_per_code as i32,
)?);
batch = batch.replace_column_by_name(PQ_CODE_COLUMN, original_fsl)?;
}

// Enforce a stable ROW_ID ordering for all auxiliary batches so that the
// PQ code column moves together with ROW_ID.
batch = stable_sort_batch_by_row_id(&batch)?;

// For PQ storages, optionally convert codes back to transposed layout
// in the unified auxiliary file. This keeps final PQ storage column-major
// when `transpose_codes` is enabled.
if is_pq && self.transpose_codes && batch.column_by_name(PQ_CODE_COLUMN).is_some() {
let codes_fsl = batch
.column_by_name(PQ_CODE_COLUMN)
.unwrap()
.as_fixed_size_list();
let num_rows = batch.num_rows();
let bytes_per_code = codes_fsl.value_length() as usize;
let codes = codes_fsl.values().as_primitive::<datatypes::UInt8Type>();
let transposed_codes = transpose(codes, num_rows, bytes_per_code);
let transposed_fsl = Arc::new(FixedSizeListArray::try_new_from_values(
transposed_codes,
bytes_per_code as i32,
)?);
batch = batch.replace_column_by_name(PQ_CODE_COLUMN, transposed_fsl)?;
}

storage_writer.write_batch(&batch).await?;
storage_ivf.add_partition(batch.num_rows() as u32);
}
Expand Down Expand Up @@ -1066,12 +1153,18 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
.add_global_buffer(storage_ivf_pb.encode_to_vec().into())
.await?;
storage_writer.add_schema_metadata(IVF_METADATA_KEY, ivf_buffer_pos.to_string());
let quant_type = Q::quantization_type();
let transposed = match quant_type {
QuantizationType::Product => self.transpose_codes,
QuantizationType::Rabit => true,
_ => false,
};
// For now, each partition's metadata is just the quantizer,
// it's all the same for now, so we just take the first one
let mut metadata = quantizer.metadata(Some(QuantizationMetadata {
codebook_position: Some(0),
codebook: None,
transposed: true,
transposed,
}));
if let Some(extra_metadata) = metadata.extra_metadata()? {
let idx = storage_writer.add_global_buffer(extra_metadata).await?;
Expand Down
20 changes: 13 additions & 7 deletions rust/lance/src/index/vector/ivf/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,26 @@ pub(super) async fn write_pq_partitions(
location: location!(),
})?;
if let Some(pq_code) = pq_index.code.as_ref() {
let original_pq_codes = transpose(
pq_code,
pq_index.pq.num_sub_vectors,
pq_code.len() / pq_index.pq.code_dim(),
);
let row_ids = pq_index.row_ids.as_ref().unwrap();
let num_vectors = row_ids.len();
if num_vectors == 0 || pq_code.is_empty() {
continue;
}
if pq_code.len() % num_vectors != 0 {
continue;
}
let num_bytes_per_code = pq_code.len() / num_vectors;
let original_pq_codes = transpose(pq_code, num_bytes_per_code, num_vectors);
let fsl = Arc::new(
FixedSizeListArray::try_new_from_values(
original_pq_codes,
pq_index.pq.code_dim() as i32,
num_bytes_per_code as i32,
)
.unwrap(),
);

pq_array.push(fsl);
row_id_array.push(pq_index.row_ids.as_ref().unwrap().clone());
row_id_array.push(row_ids.clone());
}
}
}
Expand Down
Loading
Loading