diff --git a/python/Cargo.toml b/python/Cargo.toml index f0c088007d7..b3b87436b67 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -15,12 +15,12 @@ arrow = { version = "49.0.0", features = ["pyarrow"] } arrow-array = "49.0" arrow-data = "49.0" arrow-schema = "49.0" -object_store = "0.8.0" +object_store = "0.9.0" async-trait = "0.1" chrono = "0.4.31" env_logger = "0.10" futures = "0.3" -half = { version = "2.1", default-features = false, features = ["num-traits"] } +half = { version = "2.3", default-features = false, features = ["num-traits", "std"] } lance = { path = "../rust/lance", features = ["tensorflow", "dynamodb"] } lance-arrow = { path = "../rust/lance-arrow" } lance-core = { path = "../rust/lance-core" } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 4b223c59315..88c82c2b8c0 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -866,7 +866,12 @@ def create_index( metric: str = "L2", replace: bool = False, num_partitions: Optional[int] = None, - ivf_centroids: Optional[Union[np.ndarray, pa.FixedSizeListArray]] = None, + ivf_centroids: Optional[ + Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray] + ] = None, + pq_codebook: Optional[ + Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray] + ] = None, num_sub_vectors: Optional[int] = None, accelerator: Optional[Union[str, "torch.Device"]] = None, index_cache_size: Optional[int] = None, @@ -894,9 +899,16 @@ def create_index( Replace the existing index if it exists. num_partitions : int, optional The number of partitions of IVF (Inverted File Index). - ivf_centroids : ``np.ndarray`` or ``pyarrow.FixedSizeListArray``. Optional. + ivf_centroids : ``np.ndarray``, ``pyarrow.FixedSizeListArray`` + or ``pyarrow.FixedShapeTensorArray``. Optional. A ``num_partitions x dimension`` array of K-mean centroids for IVF clustering. If not provided, a new Kmean model will be trained. 
+ pq_codebook : ``np.ndarray``, ``pyarrow.FixedSizeListArray`` + or ``pyarrow.FixedShapeTensorArray``. Optional. + A ``num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)`` + array of K-mean centroids for PQ codebook. + Note: nbits is always 8 for now. + If not provided, a new PQ model will be trained. num_sub_vectors : int, optional The number of sub-vectors for PQ (Product Quantization). accelerator : str or ``torch.Device``, optional @@ -1049,6 +1061,11 @@ def create_index( ) kwargs["precomputed_partitions_file"] = partitions_file + if (ivf_centroids is None) and (pq_codebook is not None): + raise ValueError( + "ivf_centroids must be specified when pq_codebook is provided" + ) + if ivf_centroids is not None: # User provided IVF centroids if _check_for_numpy(ivf_centroids) and isinstance( @@ -1075,6 +1092,35 @@ def create_index( [ivf_centroids], ["_ivf_centroids"] ) kwargs["ivf_centroids"] = ivf_centroids_batch + + if pq_codebook is not None: + # User provided PQ codebook + if _check_for_numpy(pq_codebook) and isinstance( + pq_codebook, np.ndarray + ): + if ( + len(pq_codebook.shape) != 3 + or pq_codebook.shape[0] != num_sub_vectors + or pq_codebook.shape[1] != 256 + ): + raise ValueError( + f"PQ codebook must be 3D array: (sub_vectors, 256, dim), " + f"got {pq_codebook.shape}" + ) + if pq_codebook.dtype not in [np.float16, np.float32, np.float64]: + raise TypeError( + "PQ codebook must be floating number, " + + f"got {pq_codebook.dtype}" + ) + values = pa.array(pq_codebook.reshape(-1)) + pq_codebook = pa.FixedSizeListArray.from_arrays( + values, num_sub_vectors * 256 + ) + pq_codebook_batch = pa.RecordBatch.from_arrays( + [pq_codebook], ["_pq_codebook"] + ) + kwargs["pq_codebook"] = pq_codebook_batch + if shuffle_partition_batches is not None: kwargs["shuffle_partition_batches"] = shuffle_partition_batches if shuffle_partition_concurrency is not None: diff --git a/python/python/lance/torch/kmeans.py b/python/python/lance/torch/kmeans.py index
20aa7420cfe..70d5eb55d73 100644 --- a/python/python/lance/torch/kmeans.py +++ b/python/python/lance/torch/kmeans.py @@ -249,6 +249,16 @@ def _fit_once( del dists del chunk + # this happens when there are too many NaNs or the data is just the same + # vectors repeated over and over. Performance may be bad but we don't + # want to crash. + if total_dist == 0: + logging.warning( + "Kmeans::train: total_dist is 0, this is unusual." + " This could result in bad performance during search." + ) + raise StopIteration("kmeans: converged") + if abs(total_dist - last_dist) / total_dist < self.tolerance: raise StopIteration("kmeans: converged") diff --git a/python/python/lance/util.py b/python/python/lance/util.py index faf34a7d9be..4c20b9c8b6a 100644 --- a/python/python/lance/util.py +++ b/python/python/lance/util.py @@ -176,3 +176,54 @@ def predict( """Predict the cluster for each vector in the data.""" arr = self._to_fixed_size_list(data) return self._kmeans.predict(arr) + + +def sanity_check_vector_index( + dataset, + column: str, + refine_factor: int = 5, + sample_size: Optional[int] = None, + pass_threshold: float = 1.0, +): + """Run in-sample queries and make sure that the recall + for k=1 is very high (should be near 100%) + Parameters + ---------- + dataset: LanceDataset + The dataset to sanity check. + column: str + The column name of the vector column. + refine_factor: int, default=5 + The refine factor to use for the nearest neighbor query. + sample_size: int, optional + The number of vectors to sample from the dataset. + If None, the entire dataset will be used. + pass_threshold: float, default=1.0 + The minimum fraction of vectors that must pass the sanity check. + If less than this fraction of vectors pass, a ValueError will be raised. 
+ """ + + data = dataset.to_table() if sample_size is None else dataset.sample(sample_size) + vecs = data[column].to_numpy(zero_copy_only=False) + passes = 0 + total = len(vecs) + + for vec in vecs: + if np.isnan(vec).any(): + total -= 1 + continue + distance = dataset.to_table( + nearest={ + "column": column, + "q": vec, + "k": 1, + "nprobes": 1, + "refine_factor": refine_factor, + } + )["_distance"].to_pylist()[0] + passes += 1 if abs(distance) < 1e-6 else 0 + + if passes / total < pass_threshold: + raise ValueError( + f"Vector index failed sanity check, only {passes}/{total} passed" + ) diff --git a/python/python/tests/test_arrow.py b/python/python/tests/test_arrow.py index c7c42b3d3b7..c262bdf587a 100644 --- a/python/python/tests/test_arrow.py +++ b/python/python/tests/test_arrow.py @@ -162,6 +162,27 @@ def test_bf16_fixed_size_list_cast(): assert casted == fsl +def test_bf16_roundtrip(tmp_path: Path): + import numpy as np + from ml_dtypes import bfloat16 + + values = BFloat16Array.from_numpy(np.random.random(9).astype(bfloat16)) + vectors = pa.FixedSizeListArray.from_arrays(values, 3) + tensors = pa.ExtensionArray.from_storage( + pa.fixed_shape_tensor(values.type, [3]), vectors + ) + data = pa.table( + { + "values": values.slice(0, 3), + "vector": vectors, + "tensors": tensors, + } + ) + ds = lance.write_dataset(data, tmp_path) + assert ds.schema == data.schema + assert ds.to_table() == data + + def test_roundtrip_take_ext_types(tmp_path: Path): tensor_type = pa.fixed_shape_tensor(pa.float32(), [2, 3]) inner = pa.array([float(x) for x in range(0, 18)], pa.float32()) diff --git a/python/python/tests/test_tf.py b/python/python/tests/test_tf.py index fc132bfb019..b7030274f9a 100644 --- a/python/python/tests/test_tf.py +++ b/python/python/tests/test_tf.py @@ -438,8 +438,6 @@ def test_tfrecord_parsing(tmp_path, sample_tf_example): def test_tfrecord_roundtrip(tmp_path, sample_tf_example): - del sample_tf_example.features.feature["9_tensor_bf16"] - serialized = 
sample_tf_example.SerializeToString() path = tmp_path / "test.tfrecord" diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index f2af92c898a..b2fc60f89db 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -22,22 +22,28 @@ import pyarrow as pa import pyarrow.compute as pc import pytest +import torch +from lance.util import sanity_check_vector_index from lance.vector import vec_to_table -def create_table(nvec=1000, ndim=128): +def create_table(nvec=1000, ndim=128, nans=0): mat = np.random.randn(nvec, ndim) - price = np.random.rand(nvec) * 100 + if nans > 0: + nans_mat = np.empty((nans, ndim)) + nans_mat[:] = np.nan + mat = np.concatenate((mat, nans_mat), axis=0) + price = np.random.rand(nvec + nans) * 100 def gen_str(n): return "".join(random.choices(string.ascii_letters + string.digits, k=n)) - meta = np.array([gen_str(100) for _ in range(nvec)]) + meta = np.array([gen_str(100) for _ in range(nvec + nans)]) tbl = ( vec_to_table(data=mat) .append_column("price", pa.array(price)) .append_column("meta", pa.array(meta)) - .append_column("id", pa.array(range(nvec))) + .append_column("id", pa.array(range(nvec + nans))) ) return tbl @@ -128,6 +134,69 @@ def func(rs: pa.Table): print(run(dataset, q=np.array(q), assert_func=func)) +def test_index_with_nans(tmp_path): + # 1024 rows, the entire table should be sampled + tbl = create_table(nvec=1000, nans=24) + + dataset = lance.write_dataset(tbl, tmp_path) + dataset = dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=4, + num_sub_vectors=16, + accelerator=torch.device("cpu"), + ) + sanity_check_vector_index(dataset, "vector") + + +def test_index_with_no_centroid_movement(tmp_path): + # this test makes the centroids essentially [1..] 
+ # this makes sure the early stop condition in the index building code + # doesn't do divide by zero + mat = np.concatenate([np.ones((256, 32))]) + + tbl = vec_to_table(data=mat) + + dataset = lance.write_dataset(tbl, tmp_path) + dataset = dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=1, + num_sub_vectors=4, + accelerator=torch.device("cpu"), + ) + sanity_check_vector_index(dataset, "vector") + + +def test_index_with_pq_codebook(tmp_path): + tbl = create_table(nvec=1024, ndim=128) + dataset = lance.write_dataset(tbl, tmp_path) + pq_codebook = np.random.randn(4, 256, 128 // 4).astype(np.float32) + + dataset = dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=1, + num_sub_vectors=4, + ivf_centroids=np.random.randn(1, 128).astype(np.float32), + pq_codebook=pq_codebook, + ) + sanity_check_vector_index(dataset, "vector", refine_factor=10, pass_threshold=0.99) + + pq_codebook = pa.FixedShapeTensorArray.from_numpy_ndarray(pq_codebook) + + dataset = dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=1, + num_sub_vectors=4, + ivf_centroids=np.random.randn(1, 128).astype(np.float32), + pq_codebook=pq_codebook, + replace=True, + ) + sanity_check_vector_index(dataset, "vector", refine_factor=10, pass_threshold=0.99) + + @pytest.mark.cuda def test_create_index_using_cuda(tmp_path): tbl = create_table() @@ -297,8 +366,7 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path): if platform.system() == "Windows": expected_filepath = expected_filepath.replace("\\", "/") expected_statistics = { - "index_cache_entry_count": 1, - "index_cache_hit_rate": 0, + "index_cache_entry_count": 2, "index_type": "IVF", "uuid": index_uuid, "uri": expected_filepath, @@ -316,15 +384,20 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path): } with pytest.raises(KeyError, match='Index "non-existent_idx" not found'): + # increase 1 miss of index_cache.metadata_cache assert 
dataset_with_index.stats.index_stats("non-existent_idx") with pytest.raises(KeyError, match='Index "" not found'): + # increase 1 miss of index_cache.metadata_cache assert dataset_with_index.stats.index_stats("") with pytest.raises(TypeError): dataset_with_index.stats.index_stats() + # increase 1 hit of index_cache.metadata_cache actual_statistics = dataset_with_index.stats.index_stats("vector_idx") partitions = actual_statistics.pop("partitions") + hit_rate = actual_statistics.pop("index_cache_hit_rate") assert actual_statistics == expected_statistics + assert np.isclose(hit_rate, 7 / 11) assert len(partitions) == 5 partition_keys = {"index", "length", "offset", "centroid"} @@ -494,24 +567,24 @@ def query_index(ds, ntimes): ) assert ( - indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 1 + indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 2 ) query_index(indexed_dataset, 1) assert ( - indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 2 + indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 3 ) - assert ( - indexed_dataset.stats.index_stats("vector_idx")["index_cache_hit_rate"] == 0.5 + assert np.isclose( + indexed_dataset.stats.index_stats("vector_idx")["index_cache_hit_rate"], 18 / 25 ) query_index(indexed_dataset, 128) assert ( - indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 10 + indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 11 ) indexed_dataset = lance.LanceDataset(indexed_dataset.uri, index_cache_size=5) query_index(indexed_dataset, 128) assert ( - indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 5 + indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 6 ) diff --git a/python/src/arrow.rs b/python/src/arrow.rs index 3a0332d95cd..6ed5f920508 100644 --- a/python/src/arrow.rs +++ b/python/src/arrow.rs @@ -20,6 +20,7 @@ use 
arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader}; use arrow_schema::{DataType, Field, Schema}; use half::bf16; use lance::arrow::bfloat16::BFloat16Array; +use lance_arrow::bfloat16::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; use pyo3::{ exceptions::PyValueError, ffi::Py_uintptr_t, @@ -72,8 +73,8 @@ impl BFloat16 { } const EXPORT_METADATA: [(&str, &str); 2] = [ - ("ARROW:extension:name", "lance.bfloat16"), - ("ARROW:extension:metadata", ""), + (ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME), + (ARROW_EXT_META_KEY, ""), ]; #[pyfunction] diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 672a52f9376..5d6f2445b8f 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -31,17 +31,15 @@ use lance::dataset::{ scanner::Scanner as LanceScanner, transaction::Operation as LanceOperation, Dataset as LanceDataset, ReadParams, Version, WriteMode, WriteParams, }; -use lance::index::IndexParams; use lance::index::{ scalar::ScalarIndexParams, vector::{diskann::DiskANNParams, VectorIndexParams}, - DatasetIndexExt, }; use lance_arrow::as_fixed_size_list_array; use lance_core::{datatypes::Schema, format::Fragment, io::object_store::ObjectStoreParams}; use lance_index::{ vector::{ivf::IvfBuildParams, pq::PQBuildParams}, - IndexType, + DatasetIndexExt, IndexParams, IndexType, }; use lance_linalg::distance::MetricType; use pyo3::exceptions::PyStopIteration; @@ -702,6 +700,17 @@ impl Dataset { pq_params.use_opq = PyAny::downcast::(o)?.extract()? }; + if let Some(c) = kwargs.get_item("pq_codebook")? { + let batch = RecordBatch::from_pyarrow(c)?; + if "_pq_codebook" != batch.schema().field(0).name() { + return Err(PyValueError::new_err( + "Expected '_pq_codebook' as the first column name.", + )); + } + let codebook = as_fixed_size_list_array(batch.column(0)); + pq_params.codebook = Some(codebook.values().clone()) + }; + if let Some(o) = kwargs.get_item("max_opq_iterations")? { pq_params.max_opq_iters = PyAny::downcast::(o)?.extract()? 
}; @@ -808,7 +817,9 @@ impl Dataset { } fn count_unindexed_rows(&self, index_name: String) -> PyResult> { - let idx = RT.block_on(None, self.ds.load_index_by_name(index_name.as_str()))?; + let idx = RT + .block_on(None, self.ds.load_index_by_name(index_name.as_str()))? + .map_err(|err| PyIOError::new_err(err.to_string()))?; if let Some(index) = idx { RT.block_on( None, @@ -825,7 +836,9 @@ impl Dataset { } fn count_indexed_rows(&self, index_name: String) -> PyResult> { - let idx = RT.block_on(None, self.ds.load_index_by_name(index_name.as_str()))?; + let idx = RT + .block_on(None, self.ds.load_index_by_name(index_name.as_str()))? + .map_err(|err| PyIOError::new_err(err.to_string()))?; if let Some(index) = idx { RT.block_on( None, diff --git a/rust/Cargo.toml b/rust/Cargo.toml index ed1cc4f4b50..e25386027fc 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -33,7 +33,7 @@ categories = [ "development-tools", "science", ] -rust-version = "1.70" +rust-version = "1.75" [workspace.dependencies] lance = { version = "=0.9.6", path = "./lance" } @@ -87,7 +87,7 @@ mock_instant = { version = "0.3.1", features = ["sync"] } moka = "0.11" num_cpus = "1.0" num-traits = "0.2" -object_store = { version = "0.8.0", features = ["aws", "gcp", "azure"] } +object_store = { version = "0.9.0", features = ["aws", "gcp", "azure"] } parquet = "49.0" pin-project = "1.0" pprof = { version = "0.13", features = ["flamegraph", "criterion"] } @@ -111,6 +111,7 @@ tokio = { version = "1.23", features = [ tracing = "0.1" url = "2.3" uuid = { version = "1.2", features = ["v4", "serde"] } +pretty_assertions = "1.4.0" [profile.bench] opt-level = 3 diff --git a/rust/lance-arrow/src/bfloat16.rs b/rust/lance-arrow/src/bfloat16.rs index bb1bc61929c..57c672be150 100644 --- a/rust/lance-arrow/src/bfloat16.rs +++ b/rust/lance-arrow/src/bfloat16.rs @@ -23,11 +23,25 @@ use arrow_array::{ }; use arrow_buffer::MutableBuffer; use arrow_data::ArrayData; -use arrow_schema::{ArrowError, DataType}; +use 
arrow_schema::{ArrowError, DataType, Field as ArrowField}; use half::bf16; use crate::FloatArray; +pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name"; +pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata"; +pub const BFLOAT16_EXT_NAME: &str = "lance.bfloat16"; + +/// Check whether the given field is a bfloat16 field. +pub fn is_bfloat16_field(field: &ArrowField) -> bool { + field.data_type() == &DataType::FixedSizeBinary(2) + && field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BFLOAT16_EXT_NAME) + .unwrap_or_default() +} + #[derive(Debug)] pub struct BFloat16Type {} @@ -81,6 +95,10 @@ impl BFloat16Array { let binary_value = self.inner.value_unchecked(i); bf16::from_bits(u16::from_le_bytes([binary_value[0], binary_value[1]])) } + + pub fn into_inner(self) -> FixedSizeBinaryArray { + self.inner + } } impl<'a> ArrayAccessor for &'a BFloat16Array { diff --git a/rust/lance-arrow/src/floats.rs b/rust/lance-arrow/src/floats.rs index a8adf20837e..a9d9348fd99 100644 --- a/rust/lance-arrow/src/floats.rs +++ b/rust/lance-arrow/src/floats.rs @@ -25,11 +25,12 @@ use arrow_array::{ types::{Float16Type, Float32Type, Float64Type}, Array, Float16Array, Float32Array, Float64Array, }; -use arrow_schema::DataType; +use arrow_schema::{DataType, Field}; use half::{bf16, f16}; use num_traits::{AsPrimitive, Bounded, Float, FromPrimitive}; use super::bfloat16::{BFloat16Array, BFloat16Type}; +use crate::bfloat16::is_bfloat16_field; use crate::Result; /// Float data type. @@ -55,6 +56,8 @@ impl std::fmt::Display for FloatType { } } +/// Try to convert a [DataType] to a [FloatType]. To support bfloat16, always +/// prefer using the `TryFrom<&Field>` implementation. 
impl TryFrom<&DataType> for FloatType { type Error = crate::ArrowError; @@ -71,6 +74,17 @@ impl TryFrom<&DataType> for FloatType { } } +impl TryFrom<&Field> for FloatType { + type Error = crate::ArrowError; + + fn try_from(field: &Field) -> Result { + match field.data_type() { + DataType::FixedSizeBinary(2) if is_bfloat16_field(field) => Ok(Self::BFloat16), + _ => Self::try_from(field.data_type()), + } + } +} + /// Trait for float types used in Arrow Array. /// pub trait ArrowFloatType: Debug { ... } diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 5b2b375facd..e747bdbb797 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -19,6 +19,9 @@ use std::sync::Arc; use arrow_array::ArrayRef; use arrow_schema::{DataType, Field as ArrowField, TimeUnit}; +use lance_arrow::bfloat16::{ + is_bfloat16_field, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME, +}; use snafu::{location, Location}; mod field; @@ -137,11 +140,19 @@ impl TryFrom<&DataType> for LogicalType { DataType::Struct(_) => "large_list.struct".to_string(), _ => "large_list".to_string(), }, - DataType::FixedSizeList(dt, len) => format!( - "fixed_size_list:{}:{}", - Self::try_from(dt.data_type())?.0, - *len - ), + DataType::FixedSizeList(field, len) => { + if is_bfloat16_field(field) { + // Don't want to directly use `bfloat16`, in case a built-in type is added + // that isn't identical to our extension type. 
+ format!("fixed_size_list:lance.bfloat16:{}", *len) + } else { + format!( + "fixed_size_list:{}:{}", + Self::try_from(field.data_type())?.0, + *len + ) + } + } DataType::FixedSizeBinary(len) => format!("fixed_size_binary:{}", *len), _ => { return Err(Error::Schema { @@ -196,20 +207,37 @@ impl TryFrom<&LogicalType> for DataType { match splits[0] { "fixed_size_list" => { if splits.len() != 3 { - Err(Error::Schema { + return Err(Error::Schema { message: format!("Unsupported logical type: {}", lt), location: location!(), - }) - } else { - let elem_type = (&LogicalType(splits[1].to_string())).try_into()?; - let size: i32 = splits[2].parse::().map_err(|e: _| Error::Schema { - message: e.to_string(), - location: location!(), - })?; - Ok(FixedSizeList( - Arc::new(ArrowField::new("item", elem_type, true)), - size, - )) + }); + } + + let size: i32 = splits[2].parse::().map_err(|e: _| Error::Schema { + message: e.to_string(), + location: location!(), + })?; + + match splits[1] { + BFLOAT16_EXT_NAME => { + let field = ArrowField::new("item", Self::FixedSizeBinary(2), true) + .with_metadata( + [ + (ARROW_EXT_NAME_KEY.into(), BFLOAT16_EXT_NAME.into()), + (ARROW_EXT_META_KEY.into(), "".into()), + ] + .into(), + ); + Ok(FixedSizeList(Arc::new(field), size)) + } + data_type => { + let elem_type = (&LogicalType(data_type.to_string())).try_into()?; + + Ok(FixedSizeList( + Arc::new(ArrowField::new("item", elem_type, true)), + size, + )) + } } } "fixed_size_binary" => { diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index fee24307c29..abd455e0ffb 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -30,7 +30,7 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field as ArrowField}; use async_recursion::async_recursion; -use lance_arrow::*; +use lance_arrow::{bfloat16::ARROW_EXT_NAME_KEY, *}; use snafu::{location, Location}; use super::{Dictionary, LogicalType}; @@ -228,9 +228,7 @@ impl 
Field { } pub fn extension_name(&self) -> Option<&str> { - self.metadata - .get("ARROW:extension:name") - .map(String::as_str) + self.metadata.get(ARROW_EXT_NAME_KEY).map(String::as_str) } pub fn child(&self, name: &str) -> Option<&Self> { @@ -781,10 +779,7 @@ impl From<&pb::Field> for Field { }) .collect(); if !field.extension_name.is_empty() { - lance_metadata.insert( - "ARROW:extension:name".to_string(), - field.extension_name.clone(), - ); + lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone()); } Self { name: field.name.clone(), diff --git a/rust/lance-core/src/encodings/plain.rs b/rust/lance-core/src/encodings/plain.rs index 5db2e5714e9..27e492a8850 100644 --- a/rust/lance-core/src/encodings/plain.rs +++ b/rust/lance-core/src/encodings/plain.rs @@ -312,9 +312,12 @@ impl<'a> PlainDecoder<'a> { let item_array = item_decoder .get(start * list_size as usize..end * list_size as usize) .await?; - Ok(Arc::new(FixedSizeListArray::try_new_from_values( - item_array, list_size, - )?) as ArrayRef) + Ok(Arc::new(FixedSizeListArray::new( + Arc::new(items.clone()), + list_size, + item_array, + None, + )) as ArrayRef) } async fn decode_fixed_size_binary( diff --git a/rust/lance-core/src/format/index.rs b/rust/lance-core/src/format/index.rs index 3ee81e2ff14..42a25335a92 100644 --- a/rust/lance-core/src/format/index.rs +++ b/rust/lance-core/src/format/index.rs @@ -1,19 +1,16 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Copyright 2024 Lance Developers. 
// -// http://www.apache.org/licenses/LICENSE-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Metadata for index diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index d7bbe18c732..c3003b81d38 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -41,9 +41,10 @@ roaring.workspace = true serde_json.workspace = true serde.workspace = true snafu.workspace = true +tempfile.workspace = true tokio.workspace = true tracing.workspace = true -tempfile.workspace = true +uuid.workspace = true [dev-dependencies] approx.workspace = true diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 8827c3fba94..450aeaa1916 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2023 Lance Developers. +// Copyright 2024 Lance Developers. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -22,8 +22,11 @@ use async_trait::async_trait; use lance_core::Result; use roaring::RoaringBitmap; +pub mod optimize; pub mod scalar; +pub mod traits; pub mod vector; +pub use crate::traits::*; pub const INDEX_FILE_NAME: &str = "index.idx"; @@ -33,16 +36,21 @@ pub mod pb { } /// Generic methods common across all types of secondary indices +/// #[async_trait] pub trait Index: Send + Sync { /// Cast to [Any]. fn as_any(&self) -> &dyn Any; + /// Cast to [Index] fn as_index(self: Arc) -> Arc; + /// Retrieve index statistics as a JSON string fn statistics(&self) -> Result; + /// Get the type of the index fn index_type(&self) -> IndexType; + /// Read through the index and determine which fragment ids are covered by the index /// /// This is a kind of slow operation. It's better to use the fragment_bitmap. This @@ -51,6 +59,7 @@ pub trait Index: Send + Sync { } /// Index Type +#[derive(PartialEq)] pub enum IndexType { // Preserve 0-100 for simple indices. Scalar = 0, @@ -67,3 +76,7 @@ impl std::fmt::Display for IndexType { } } } + +pub trait IndexParams: Send + Sync { + fn as_any(&self) -> &dyn Any; +} diff --git a/rust/lance-index/src/optimize.rs b/rust/lance-index/src/optimize.rs new file mode 100644 index 00000000000..3522c697c6c --- /dev/null +++ b/rust/lance-index/src/optimize.rs @@ -0,0 +1,32 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Options for optimizing all indices. 
+pub struct OptimizeOptions { + /// Number of delta indices to merge for one column. Default: 1. + /// + /// If `num_indices_to_merge` is 0, a new delta index will be created. + /// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index. + /// If `num_indices_to_merge` is more than 1, the delta updates and latest N indices + /// will be merged into one single index. + pub num_indices_to_merge: usize, +} + +impl Default for OptimizeOptions { + fn default() -> Self { + Self { + num_indices_to_merge: 1, + } + } +} diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs new file mode 100644 index 00000000000..21807a07738 --- /dev/null +++ b/rust/lance-index/src/traits.rs @@ -0,0 +1,91 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use lance_core::{format::Index, Result}; + +use crate::{optimize::OptimizeOptions, IndexParams, IndexType}; + +// Extends Lance Dataset with secondary index. +/// +#[async_trait] +pub trait DatasetIndexExt { + /// Create indices on columns. + /// + /// Upon finish, a new dataset version is generated. + /// + /// Parameters: + /// + /// - `columns`: the columns to build the indices on. + /// - `index_type`: specify [`IndexType`]. + /// - `name`: optional index name. Must be unique in the dataset. + /// if not provided, it will auto-generate one. + /// - `params`: index parameters. 
+ /// - `replace`: replace the existing index if it exists. + async fn create_index( + &mut self, + columns: &[&str], + index_type: IndexType, + name: Option, + params: &dyn IndexParams, + replace: bool, + ) -> Result<()>; + + /// Read all indices of this Dataset version. + /// + /// The indices are lazy loaded and cached in memory within the [`Dataset`] instance. + /// The cache is invalidated when the dataset version (Manifest) is changed. + async fn load_indices(&self) -> Result>>; + + /// Loads all the indies of a given UUID. + /// + /// Note that it is possible to have multiple indices with the same UUID, + /// as they are the deltas of the same index. + async fn load_index(&self, uuid: &str) -> Result> { + self.load_indices().await.map(|indices| { + indices + .iter() + .find(|idx| idx.uuid.to_string() == uuid) + .cloned() + }) + } + + /// Loads a specific index with the given index name + async fn load_index_by_name(&self, name: &str) -> Result> { + self.load_indices() + .await + .map(|indices| indices.iter().find(|idx| idx.name == name).cloned()) + } + + /// Loads a specific index with the given index name. + async fn load_scalar_index_for_column(&self, col: &str) -> Result>; + + /// Optimize indices. + async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()>; + + /// Find index with a given index_name and return its serialized statistics. + async fn index_statistics(&self, index_name: &str) -> Result>; + + /// Count the rows that are not indexed by the given index. + /// + /// TODO: move to [DatasetInternalExt] + async fn count_unindexed_rows(&self, index_name: &str) -> Result>; + + /// Count the rows that are indexed by the given index. 
+ /// + /// TODO: move to [DatasetInternalExt] + async fn count_indexed_rows(&self, index_name: &str) -> Result>; +} diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs index 6f40e19b97a..2e92b675045 100644 --- a/rust/lance-index/src/vector/ivf/shuffler.rs +++ b/rust/lance-index/src/vector/ivf/shuffler.rs @@ -36,7 +36,6 @@ use lance_arrow::FixedSizeListArrayExt; use lance_core::datatypes::Schema; use lance_core::io::{FileReader, FileWriter, ReadBatchParams, RecordBatchStream}; -use crate::vector::{PART_ID_COLUMN, PQ_CODE_COLUMN}; use lance_core::io::object_store::ObjectStore; use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD}; use log::info; @@ -44,6 +43,9 @@ use object_store::path::Path; use snafu::{location, Location}; use tempfile::TempDir; +use super::Ivf; +use crate::vector::{PART_ID_COLUMN, PQ_CODE_COLUMN}; + const UNSORTED_BUFFER: &str = "unsorted.lance"; fn get_temp_dir() -> Result { @@ -76,7 +78,7 @@ fn get_temp_dir() -> Result { pub async fn shuffle_dataset( data: impl RecordBatchStream + Unpin + 'static, column: &str, - ivf: Arc, + ivf: Arc, precomputed_partitions: Option>, num_partitions: u32, num_sub_vectors: usize, diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index fc5d3212767..3db4065eb43 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -102,6 +102,7 @@ prost-build.workspace = true [dev-dependencies] lance-test-macros = { workspace = true } +pretty_assertions = { workspace = true } clap = { version = "4.1.1", features = ["derive"] } criterion = { workspace = true } diff --git a/rust/lance/benches/ivf_pq.rs b/rust/lance/benches/ivf_pq.rs index 8cb02694f3d..600a6ea2964 100644 --- a/rust/lance/benches/ivf_pq.rs +++ b/rust/lance/benches/ivf_pq.rs @@ -17,14 +17,14 @@ use std::sync::Arc; use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, FieldRef, Schema}; use criterion::{criterion_group, criterion_main, Criterion}; 
+ use lance::{ arrow::*, dataset::{WriteMode, WriteParams}, - index::{vector::VectorIndexParams, DatasetIndexExt}, + index::vector::VectorIndexParams, Dataset, }; - -use lance_index::IndexType; +use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::generate_random_array; #[cfg(target_os = "linux")] diff --git a/rust/lance/benches/vector_index.rs b/rust/lance/benches/vector_index.rs index 07af8407df2..12e4e21855b 100644 --- a/rust/lance/benches/vector_index.rs +++ b/rust/lance/benches/vector_index.rs @@ -20,18 +20,17 @@ use arrow_array::{ use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema}; use criterion::{criterion_group, criterion_main, Criterion}; use futures::TryStreamExt; -use lance::dataset::builder::DatasetBuilder; -use lance_index::IndexType; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; use rand::{self, Rng}; -use lance::dataset::{WriteMode, WriteParams}; +use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams}; use lance::index::vector::VectorIndexParams; -use lance::index::DatasetIndexExt; -use lance::{arrow::as_fixed_size_list_array, dataset::Dataset}; -use lance_arrow::FixedSizeListArrayExt; -use lance_index::vector::{ivf::IvfBuildParams, pq::PQBuildParams}; +use lance_arrow::{as_fixed_size_list_array, FixedSizeListArrayExt}; +use lance_index::{ + vector::{ivf::IvfBuildParams, pq::PQBuildParams}, + DatasetIndexExt, IndexType, +}; use lance_linalg::distance::MetricType; fn bench_ivf_pq_index(c: &mut Criterion) { diff --git a/rust/lance/src/bin/lq.rs b/rust/lance/src/bin/lq.rs index c66c867da0a..ecf64de9682 100644 --- a/rust/lance/src/bin/lq.rs +++ b/rust/lance/src/bin/lq.rs @@ -20,8 +20,9 @@ use futures::TryStreamExt; use snafu::{location, Location}; use lance::dataset::Dataset; -use lance::index::{vector::VectorIndexParams, DatasetIndexExt}; +use lance::index::vector::VectorIndexParams; use lance::{Error, Result}; +use 
lance_index::DatasetIndexExt; use lance_linalg::distance::MetricType; #[derive(Parser)] diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 9111d911aed..bb2bc45de68 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -35,7 +35,7 @@ use lance_core::io::{ commit::CommitError, object_store::{ObjectStore, ObjectStoreParams}, read_metadata_offset, read_struct, - reader::{read_manifest, read_manifest_indexes}, + reader::read_manifest, write_manifest, ObjectWriter, WriteExt, }; use log::warn; @@ -69,18 +69,16 @@ use self::fragment::FileFragment; use self::scanner::{DatasetRecordBatchStream, Scanner}; use self::transaction::{Operation, Transaction}; use self::write::{reader_to_stream, write_fragments_internal}; -use crate::dataset::index::unindexed_fragments; use crate::datatypes::Schema; use crate::error::box_error; use crate::format::{Fragment, Index, Manifest}; -use crate::index::DatasetIndexInternalExt; use crate::io::commit::{commit_new_dataset, commit_transaction}; use crate::session::Session; - use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime}; use crate::{Error, Result}; use hash_joiner::HashJoiner; pub use lance_core::ROW_ID; + pub use write::update::{UpdateBuilder, UpdateJob}; pub use write::{write_fragments, WriteMode, WriteParams}; @@ -1261,7 +1259,7 @@ impl Dataset { &self.object_store } - async fn manifest_file(&self, version: u64) -> Result { + pub(crate) async fn manifest_file(&self, version: u64) -> Result { self.object_store .commit_handler .resolve_version(&self.base, version, &self.object_store.inner) @@ -1356,94 +1354,6 @@ impl Dataset { &self.manifest.fragments } - /// Read all indices of this Dataset version. 
- pub async fn load_indices(&self) -> Result> { - let manifest_file = self.manifest_file(self.version().version).await?; - read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest).await - } - - /// Loads a specific index with the given id - pub async fn load_index(&self, uuid: &str) -> Option { - self.load_indices() - .await - .unwrap() - .into_iter() - .find(|idx| idx.uuid.to_string() == uuid) - } - - pub async fn load_index_by_name(&self, name: &str) -> Option { - self.load_indices() - .await - .unwrap() - .into_iter() - .find(|idx| idx.name == name) - } - - pub(crate) async fn load_scalar_index_for_column(&self, col: &str) -> Result> { - Ok(self - .load_indices() - .await? - .into_iter() - .filter(|idx| idx.fields.len() == 1) - .find(|idx| { - let field = self.schema().field_by_id(idx.fields[0]); - if let Some(field) = field { - field.name == col - } else { - false - } - })) - } - - /// Find index with a given index_name and return its serialized statistics. - pub async fn index_statistics(&self, index_name: &str) -> Result> { - let index_uuid = self - .load_index_by_name(index_name) - .await - .map(|idx| idx.uuid.to_string()); - - if let Some(index_uuid) = index_uuid { - let index_statistics = self - .open_generic_index("vector", &index_uuid) - .await? 
- .statistics() - .unwrap(); - Ok(Some(index_statistics)) - } else { - Ok(None) - } - } - - pub async fn count_unindexed_rows(&self, index_uuid: &str) -> Result> { - let index = self.load_index(index_uuid).await; - - if let Some(index) = index { - let unindexed_frags = unindexed_fragments(&index, self).await?; - let unindexed_rows = unindexed_frags - .iter() - .map(Fragment::num_rows) - // sum the number of rows in each fragment if no fragment returned None from row_count - .try_fold(0, |a, b| b.map(|b| a + b).ok_or(())); - - Ok(unindexed_rows.ok()) - } else { - Ok(None) - } - } - - pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result> { - let count_rows = self.count_rows(); - let count_unindexed_rows = self.count_unindexed_rows(index_uuid); - - let (count_rows, count_unindexed_rows) = - futures::try_join!(count_rows, count_unindexed_rows)?; - - match count_unindexed_rows { - Some(count_unindexed_rows) => Ok(Some(count_rows - count_unindexed_rows)), - None => Ok(None), - } - } - /// Gets the number of files that are so small they don't even have a full /// group. 
These are considered too small because reading many of them is /// much less efficient than reading a single file because the separate files @@ -1590,9 +1500,10 @@ mod tests { use crate::dataset::WriteMode::Overwrite; use crate::datatypes::Schema; use crate::index::scalar::ScalarIndexParams; - use crate::index::{vector::VectorIndexParams, DatasetIndexExt}; + use crate::index::vector::VectorIndexParams; use crate::io::deletion::read_deletion_file; + use arrow_array::FixedSizeListArray; use arrow_array::{ builder::StringDictionaryBuilder, cast::{as_string_array, as_struct_array}, @@ -1605,12 +1516,13 @@ mod tests { use arrow_schema::{DataType, Field, Fields as ArrowFields, Schema as ArrowSchema}; use arrow_select::take::take; use futures::stream::TryStreamExt; + use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; use lance_core::format::WriterVersion; use lance_datagen::{array, gen, BatchCount, RowCount}; - use lance_index::vector::DIST_COL; - use lance_index::IndexType; + use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::generate_random_array; + use pretty_assertions::assert_eq; use tempfile::{tempdir, TempDir}; // Used to validate that futures returned are Send. 
@@ -2610,7 +2522,11 @@ mod tests { .await .unwrap(); - let index = dataset.load_index_by_name(&index_name).await.unwrap(); + let index = dataset + .load_index_by_name(&index_name) + .await + .unwrap() + .unwrap(); assert_eq!(index.dataset_version, 1); assert_eq!(index.fields, vec![0]); @@ -3430,87 +3346,6 @@ mod tests { } } - #[tokio::test] - async fn test_count_index_rows() { - let test_dir = tempdir().unwrap(); - let dimensions = 16; - let column_name = "vec"; - let field = Field::new( - column_name, - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - dimensions, - ), - false, - ); - let schema = Arc::new(ArrowSchema::new(vec![field])); - - let float_arr = generate_random_array(512 * dimensions as usize); - - let vectors = - arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); - - let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); - - let reader = - RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone()); - - let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - dataset.validate().await.unwrap(); - - // Make sure it returns None if there's no index with the passed identifier - assert_eq!(dataset.count_unindexed_rows("bad_id").await.unwrap(), None); - assert_eq!(dataset.count_indexed_rows("bad_id").await.unwrap(), None); - - // Create an index - let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10); - dataset - .create_index( - &[column_name], - IndexType::Vector, - Some("vec_idx".into()), - ¶ms, - true, - ) - .await - .unwrap(); - - let index = dataset.load_index_by_name("vec_idx").await.unwrap(); - let index_uuid = &index.uuid.to_string(); - - // Make sure there are no unindexed rows - assert_eq!( - dataset.count_unindexed_rows(index_uuid).await.unwrap(), - Some(0) - ); - assert_eq!( - dataset.count_indexed_rows(index_uuid).await.unwrap(), 
- Some(512) - ); - - // Now we'll append some rows which shouldn't be indexed and see the - // count change - let float_arr = generate_random_array(512 * dimensions as usize); - let vectors = - arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); - - let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); - - let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema); - dataset.append(reader, None).await.unwrap(); - - // Make sure the new rows are not indexed - assert_eq!( - dataset.count_unindexed_rows(index_uuid).await.unwrap(), - Some(512) - ); - assert_eq!( - dataset.count_indexed_rows(index_uuid).await.unwrap(), - Some(512) - ); - } - #[tokio::test] async fn test_num_small_files() { let test_dir = tempdir().unwrap(); @@ -3889,10 +3724,10 @@ mod tests { // Any transaction, no matter how simple, should trigger the fragment bitmap to be recalculated dataset.append(data, None).await.unwrap(); - for idx in dataset.load_indices().await.unwrap() { + for idx in dataset.load_indices().await.unwrap().iter() { // The corrupt fragment_bitmap does not contain 0 but the // restored one should - assert!(idx.fragment_bitmap.unwrap().contains(0)); + assert!(idx.fragment_bitmap.as_ref().unwrap().contains(0)); } let mut dataset = dataset.checkout_version(broken_version).await.unwrap(); @@ -3904,8 +3739,8 @@ mod tests { .await .unwrap(); - for idx in dataset.load_indices().await.unwrap() { - assert!(idx.fragment_bitmap.unwrap().contains(0)); + for idx in dataset.load_indices().await.unwrap().iter() { + assert!(idx.fragment_bitmap.as_ref().unwrap().contains(0)); } let mut scan = dataset.scan(); @@ -3925,4 +3760,44 @@ mod tests { let row_count = batches.iter().map(|batch| batch.num_rows()).sum::(); assert_eq!(row_count, 1900); } + + #[tokio::test] + async fn test_bfloat16_roundtrip() -> Result<()> { + let inner_field = Arc::new( + Field::new("item", DataType::FixedSizeBinary(2), 
true).with_metadata( + [ + (ARROW_EXT_NAME_KEY.into(), BFLOAT16_EXT_NAME.into()), + (ARROW_EXT_META_KEY.into(), "".into()), + ] + .into(), + ), + ); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "fsl", + DataType::FixedSizeList(inner_field.clone(), 2), + false, + )])); + + let values = bfloat16::BFloat16Array::from_iter_values( + (0..6).map(|i| i as f32).map(half::bf16::from_f32), + ); + let vectors = FixedSizeListArray::new(inner_field, 2, Arc::new(values.into_inner()), None); + + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); + + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()), + test_uri, + None, + ) + .await?; + + let data = dataset.scan().try_into_batch().await?; + assert_eq!(batch, data); + + Ok(()) + } } diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 7a6f2db9e2e..98778cd4219 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -461,7 +461,7 @@ mod tests { utils::testing::{MockClock, ProxyObjectStore, ProxyObjectStorePolicy}, Error, Result, }; - use lance_index::IndexType; + use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::{some_batch, BatchGenerator, IncrementingInt32}; use snafu::{location, Location}; @@ -469,10 +469,7 @@ mod tests { use crate::{ dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, - index::{ - vector::{StageParams, VectorIndexParams}, - DatasetIndexExt, - }, + index::vector::{StageParams, VectorIndexParams}, io::{ object_store::{ObjectStoreParams, WrappingObjectStore}, ObjectStore, diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index accd012bff4..2ea2576f981 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -20,6 +20,7 @@ 
use lance_core::{ format::{Fragment, Index}, Error, Result, }; +use lance_index::DatasetIndexExt; use serde::{Deserialize, Serialize}; use snafu::{location, Location}; @@ -67,7 +68,7 @@ impl IndexRemapper for DatasetIndexRemapper { let affected_frag_ids = HashSet::::from_iter(affected_fragment_ids.iter().copied()); let indices = self.dataset.load_indices().await?; let mut remapped = Vec::with_capacity(indices.len()); - for index in indices { + for index in indices.iter() { let needs_remapped = match &index.fragment_bitmap { None => true, Some(fragment_bitmap) => fragment_bitmap @@ -75,7 +76,7 @@ impl IndexRemapper for DatasetIndexRemapper { .any(|frag_idx| affected_frag_ids.contains(&(frag_idx as u64))), }; if needs_remapped { - remapped.push(self.remap_index(&index, &mapping).await?); + remapped.push(self.remap_index(index, &mapping).await?); } } Ok(remapped) diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 0a8b12c0384..f4a1bc67bc9 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -99,6 +99,7 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; +use lance_index::DatasetIndexExt; use roaring::{RoaringBitmap, RoaringTreemap}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -473,8 +474,8 @@ impl CandidateBin { async fn load_index_fragmaps(dataset: &Dataset) -> Result> { let indices = dataset.load_indices().await?; let mut index_fragmaps = Vec::with_capacity(indices.len()); - for index in indices { - if let Some(fragment_bitmap) = index.fragment_bitmap { + for index in indices.iter() { + if let Some(fragment_bitmap) = index.fragment_bitmap.as_ref() { index_fragmaps.push(fragment_bitmap.clone()); } else { let dataset_at_index = dataset.checkout_version(index.dataset_version).await?; @@ -784,7 +785,10 @@ async fn rewrite_files( // information. 
let fragments = migrate_fragments(dataset.as_ref(), &task.fragments, recompute_stats).await?; let mut scanner = dataset.scan(); - scanner.with_fragments(fragments.clone()).with_row_id(); + scanner + .with_fragments(fragments.clone()) + .scan_in_order(true) + .with_row_id(); let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?); let row_ids = Arc::new(RwLock::new(RoaringTreemap::new())); diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a341b277bf9..1e120f98bd7 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -40,8 +40,8 @@ use futures::TryStreamExt; use lance_arrow::floats::{coerce_float_vector, FloatType}; use lance_core::ROW_ID_FIELD; use lance_datafusion::exec::execute_plan; -use lance_index::scalar::expression::ScalarIndexExpr; use lance_index::vector::{Query, DIST_COL}; +use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt}; use lance_linalg::distance::MetricType; use log::debug; use roaring::RoaringBitmap; @@ -851,7 +851,7 @@ impl Scanner { let indices = if use_index { self.dataset.load_indices().await? 
} else { - vec![] + Arc::new(vec![]) }; let knn_idx = indices.iter().find(|i| i.fields.contains(&column_id)); if let Some(index) = knn_idx { @@ -1338,8 +1338,7 @@ mod test { use futures::TryStreamExt; use lance_core::ROW_ID; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; - use lance_index::vector::DIST_COL; - use lance_index::IndexType; + use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use tempfile::{tempdir, TempDir}; @@ -1349,7 +1348,7 @@ mod test { use crate::dataset::WriteMode; use crate::dataset::WriteParams; use crate::index::scalar::ScalarIndexParams; - use crate::index::{vector::VectorIndexParams, DatasetIndexExt}; + use crate::index::vector::VectorIndexParams; #[tokio::test] async fn test_batch_size() { @@ -2500,7 +2499,10 @@ mod test { scan.refine(100); scan.nprobs(100); - assert_eq!(dataset.index_cache_entry_count(), 0); + assert_eq!( + dataset.index_cache_entry_count(), + 1, // 1 for index metadata + ); let results = scan .try_into_stream() .await @@ -2509,7 +2511,10 @@ mod test { .await .unwrap(); - assert_eq!(dataset.index_cache_entry_count(), 5); + assert_eq!( + dataset.index_cache_entry_count(), + 5 + dataset.versions().await.unwrap().len() + ); assert_eq!(results.len(), 1); let batch = &results[0]; @@ -2753,7 +2758,7 @@ mod test { // UPDATE - dataset.optimize_indices().await.unwrap(); + dataset.optimize_indices(Default::default()).await.unwrap(); let updated_version = dataset.version().version; // APPEND -> DELETE diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 2d81c71dcd5..30a588e765e 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -15,18 +15,24 @@ //! Secondary Index //! 
-use std::any::Any; use std::collections::HashMap; use std::sync::Arc; use arrow_schema::DataType; use async_trait::async_trait; -use lance_core::io::{read_message, read_message_from_buf, read_metadata_offset, Reader}; +use lance_core::format::Fragment; +use lance_core::io::{ + read_message, read_message_from_buf, read_metadata_offset, reader::read_manifest_indexes, + Reader, +}; +use lance_index::optimize::OptimizeOptions; use lance_index::pb::index::Implementation; use lance_index::scalar::expression::IndexInformationProvider; use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::scalar::ScalarIndex; -use lance_index::{pb, Index, IndexType, INDEX_FILE_NAME}; +pub use lance_index::IndexParams; +use lance_index::{pb, DatasetIndexExt, Index, IndexType, INDEX_FILE_NAME}; +use log::warn; use snafu::{location, Location}; use tracing::instrument; use uuid::Uuid; @@ -37,6 +43,7 @@ pub(crate) mod prefilter; pub mod scalar; pub mod vector; +use crate::dataset::index::unindexed_fragments; use crate::dataset::transaction::{Operation, Transaction}; use crate::format::Index as IndexMetadata; use crate::index::append::append_index; @@ -55,10 +62,6 @@ pub trait IndexBuilder { async fn build(&self) -> Result<()>; } -pub trait IndexParams: Send + Sync { - fn as_any(&self) -> &dyn Any; -} - pub(crate) async fn remap_index( dataset: &Dataset, index_id: &Uuid, @@ -130,34 +133,6 @@ impl IndexInformationProvider for ScalarIndexInfo { } } -/// Extends Dataset with secondary index. -#[async_trait] -pub trait DatasetIndexExt { - /// Create indices on columns. - /// - /// Upon finish, a new dataset version is generated. - /// - /// Parameters: - /// - /// - `columns`: the columns to build the indices on. - /// - `index_type`: specify [`IndexType`]. - /// - `name`: optional index name. Must be unique in the dataset. - /// if not provided, it will auto-generate one. - /// - `params`: index parameters. - /// - `replace`: replace the existing index if it exists. 
- async fn create_index( - &mut self, - columns: &[&str], - index_type: IndexType, - name: Option, - params: &dyn IndexParams, - replace: bool, - ) -> Result<()>; - - /// Optimize indices. - async fn optimize_indices(&mut self) -> Result<()>; -} - async fn open_index_proto(dataset: &Dataset, reader: &dyn Reader) -> Result { let object_store = dataset.object_store(); @@ -279,30 +254,94 @@ impl DatasetIndexExt for Dataset { Ok(()) } + async fn load_indices(&self) -> Result>> { + let dataset_dir = self.base.to_string(); + if let Some(indices) = self + .session + .index_cache + .get_metadata(&dataset_dir, self.version().version) + { + return Ok(indices); + } + + let manifest_file = self.manifest_file(self.version().version).await?; + let loaded_indices: Arc> = + read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest) + .await? + .into(); + + self.session.index_cache.insert_metadata( + &dataset_dir, + self.version().version, + loaded_indices.clone(), + ); + Ok(loaded_indices) + } + + async fn load_scalar_index_for_column(&self, col: &str) -> Result> { + Ok(self + .load_indices() + .await? + .iter() + .filter(|idx| idx.fields.len() == 1) + .find(|idx| { + let field = self.schema().field_by_id(idx.fields[0]); + if let Some(field) = field { + field.name == col + } else { + false + } + }) + .cloned()) + } + #[instrument(skip_all)] - async fn optimize_indices(&mut self) -> Result<()> { + async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> { let dataset = Arc::new(self.clone()); // Append index let indices = self.load_indices().await?; - let mut new_indices = vec![]; - let mut removed_indices = vec![]; - for idx in indices.as_slice() { - if idx.dataset_version == self.manifest.version { + // Collect indices by index name. 
+ let mut column_to_indices_map: HashMap> = HashMap::new(); + for idx in indices.iter() { + if idx.fields.len() != 1 { + warn!( + "Index with multiple fields is not supported yet: fields={:?}, uuid={}", + idx.fields, idx.uuid + ); continue; } - let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else { + column_to_indices_map + .entry(idx.name.to_owned()) + .or_default() + .push(idx); + } + + // Sort indices by its creating order. + for indices in column_to_indices_map.values_mut() { + indices.sort_by_key(|idx| idx.dataset_version); + } + + let mut new_indices = vec![]; + let mut removed_indices = vec![]; + + for indices in column_to_indices_map.values() { + let Some((new_id, removed, new_frag_ids)) = + append_index(dataset.clone(), &indices, &options).await? + else { continue; }; + let last_idx = indices.last().unwrap(); + let new_idx = IndexMetadata { uuid: new_id, - name: idx.name.clone(), - fields: idx.fields.clone(), + name: last_idx.name.clone(), + fields: last_idx.fields.clone(), dataset_version: self.manifest.version, - fragment_bitmap: new_frag_ids, + fragment_bitmap: Some(new_frag_ids), }; - removed_indices.push(idx.clone()); + removed_indices.extend(removed.iter().map(|&idx| idx.clone())); new_indices.push(new_idx); } @@ -331,6 +370,53 @@ impl DatasetIndexExt for Dataset { self.manifest = Arc::new(new_manifest); Ok(()) } + + async fn index_statistics(&self, index_name: &str) -> Result> { + let index_uuid = self + .load_index_by_name(index_name) + .await? + .map(|idx| idx.uuid.to_string()); + + if let Some(index_uuid) = index_uuid { + let index_statistics = self + .open_generic_index("vector", &index_uuid) + .await? 
+ .statistics()?; + Ok(Some(index_statistics)) + } else { + Ok(None) + } + } + + async fn count_unindexed_rows(&self, index_uuid: &str) -> Result> { + let index = self.load_index(index_uuid).await?; + + if let Some(index) = index { + let unindexed_frags = unindexed_fragments(&index, self).await?; + let unindexed_rows = unindexed_frags + .iter() + .map(Fragment::num_rows) + // sum the number of rows in each fragment if no fragment returned None from row_count + .try_fold(0, |a, b| b.map(|b| a + b).ok_or(())); + + Ok(unindexed_rows.ok()) + } else { + Ok(None) + } + } + + async fn count_indexed_rows(&self, index_uuid: &str) -> Result> { + let count_rows = self.count_rows(); + let count_unindexed_rows = self.count_unindexed_rows(index_uuid); + + let (count_rows, count_unindexed_rows) = + futures::try_join!(count_rows, count_unindexed_rows)?; + + match count_unindexed_rows { + Some(count_unindexed_rows) => Ok(Some(count_rows - count_unindexed_rows)), + None => Ok(None), + } + } } /// A trait for internal dataset utilities @@ -503,4 +589,89 @@ mod tests { .await .is_err()); } + + #[tokio::test] + async fn test_count_index_rows() { + let test_dir = tempdir().unwrap(); + let dimensions = 16; + let column_name = "vec"; + let field = Field::new( + column_name, + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + dimensions, + ), + false, + ); + let schema = Arc::new(Schema::new(vec![field])); + + let float_arr = generate_random_array(512 * dimensions as usize); + + let vectors = + arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); + + let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); + + let reader = + RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone()); + + let test_uri = test_dir.path().to_str().unwrap(); + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + dataset.validate().await.unwrap(); + + // Make sure it 
returns None if there's no index with the passed identifier + assert_eq!(dataset.count_unindexed_rows("bad_id").await.unwrap(), None); + assert_eq!(dataset.count_indexed_rows("bad_id").await.unwrap(), None); + + // Create an index + let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10); + dataset + .create_index( + &[column_name], + IndexType::Vector, + Some("vec_idx".into()), + ¶ms, + true, + ) + .await + .unwrap(); + + let index = dataset + .load_index_by_name("vec_idx") + .await + .unwrap() + .unwrap(); + let index_uuid = &index.uuid.to_string(); + + // Make sure there are no unindexed rows + assert_eq!( + dataset.count_unindexed_rows(index_uuid).await.unwrap(), + Some(0) + ); + assert_eq!( + dataset.count_indexed_rows(index_uuid).await.unwrap(), + Some(512) + ); + + // Now we'll append some rows which shouldn't be indexed and see the + // count change + let float_arr = generate_random_array(512 * dimensions as usize); + let vectors = + arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); + + let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); + + let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema); + dataset.append(reader, None).await.unwrap(); + + // Make sure the new rows are not indexed + assert_eq!( + dataset.count_unindexed_rows(index_uuid).await.unwrap(), + Some(512) + ); + assert_eq!( + dataset.count_indexed_rows(index_uuid).await.unwrap(), + Some(512) + ); + } } diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index a305391bac1..ff702318c33 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -16,54 +16,90 @@ use std::sync::Arc; use lance_core::{format::Index as IndexMetadata, Error, Result}; use lance_index::scalar::lance_format::LanceIndexStore; -use lance_index::IndexType; -use log::info; +use lance_index::{optimize::OptimizeOptions, IndexType}; use 
roaring::RoaringBitmap; use snafu::{location, Location}; use uuid::Uuid; -use crate::dataset::index::unindexed_fragments; +use super::vector::ivf::optimize_vector_indices; +use super::DatasetIndexInternalExt; use crate::dataset::scanner::ColumnOrdering; use crate::dataset::Dataset; -use crate::index::vector::ivf::IVFIndex; - -use super::DatasetIndexInternalExt; /// Append new data to the index, without re-train. /// /// Returns the UUID of the new index along with a vector of newly indexed fragment ids -pub async fn append_index( +/// +/// TODO: move this function to `lance-index` +pub async fn append_index<'a>( dataset: Arc, - old_index: &IndexMetadata, -) -> Result)>> { - let unindexed = unindexed_fragments(old_index, dataset.as_ref()).await?; - if unindexed.is_empty() { - return Ok(None); - }; - - let frag_bitmap = old_index.fragment_bitmap.as_ref().map(|bitmap| { - let mut bitmap = bitmap.clone(); - bitmap.extend(unindexed.iter().map(|frag| frag.id as u32)); - bitmap + old_indices: &[&'a IndexMetadata], + options: &OptimizeOptions, +) -> Result, RoaringBitmap)>> { + let mut frag_bitmap = RoaringBitmap::new(); + old_indices.iter().for_each(|idx| { + frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter()); }); + let unindexed = dataset + .fragments() + .iter() + .filter(|f| !frag_bitmap.contains(f.id as u32)) + .map(|f| f.clone()) + .collect::>(); + + let latest_idx = old_indices.last().ok_or(Error::Index { + message: "Append index: no index found".to_string(), + location: location!(), + })?; let column = dataset .schema() - .field_by_id(old_index.fields[0]) + .field_by_id(latest_idx.fields[0]) .ok_or(Error::Index { message: format!( "Append index: column {} does not exist", - old_index.fields[0] + latest_idx.fields[0] ), location: location!(), })?; - let index = dataset - .open_generic_index(&column.name, &old_index.uuid.to_string()) - .await?; + // Open all indices. 
+ let mut indices = Vec::with_capacity(old_indices.len()); + for idx in old_indices { + let index = dataset + .open_generic_index(&column.name, &idx.uuid.to_string()) + .await?; + indices.push(index); + } + + // Sanity check. + if !indices + .windows(2) + .all(|w| w[0].index_type() == w[1].index_type()) + { + return Err(Error::Index { + message: "Append index: indices have different types".to_string(), + location: location!(), + }); + } - match index.index_type() { + let (new_uuid, indices_merged) = match indices[0].index_type() { IndexType::Scalar => { + if indices.len() != 1 { + return Err(Error::Index { + message: "Append index: scalar index does not support more than one index yet" + .to_string(), + location: location!(), + }); + } + if options.num_indices_to_merge != 1 { + return Err(Error::Index { + message: "Append index: scalar index does not support merge".to_string(), + location: location!(), + }); + } + + let old_index = &old_indices[0]; let index = dataset .open_scalar_index(&column.name, &old_index.uuid.to_string()) .await?; @@ -85,7 +121,7 @@ pub async fn append_index( index.update(new_data_stream.into(), &new_store).await?; - Ok(Some((new_uuid, frag_bitmap))) + Ok((new_uuid, 1)) } IndexType::Vector => { let mut scanner = dataset.scan(); @@ -94,30 +130,31 @@ pub async fn append_index( scanner.project(&[&column.name])?; let new_data_stream = scanner.try_into_stream().await?; - let index = dataset - .open_vector_index(&column.name, old_index.uuid.to_string().as_str()) - .await?; - - let Some(ivf_idx) = index.as_any().downcast_ref::() else { - info!("Index type: {:?} does not support append", index); - return Ok(None); - }; - - let new_index = ivf_idx - .append(dataset.as_ref(), new_data_stream, old_index, &column.name) - .await?; - - Ok(Some((new_index, frag_bitmap))) + optimize_vector_indices( + &dataset.object_store, + &dataset.indices_dir(), + dataset.version().version, + new_data_stream, + &column.name, + &indices, + options, + ) + .await } - } + 
}?; + + Ok(Some(( + new_uuid, + old_indices[old_indices.len() - indices_merged..].to_vec(), + frag_bitmap, + ))) } #[cfg(test)] mod tests { use super::*; - use arrow_array::cast::AsArray; - use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator}; + use arrow_array::{cast::AsArray, FixedSizeListArray, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, Schema}; use futures::{stream, StreamExt, TryStreamExt}; use lance_arrow::FixedSizeListArrayExt; @@ -129,8 +166,11 @@ mod tests { use lance_testing::datagen::generate_random_array; use tempfile::tempdir; - use crate::index::vector::{pq::PQIndex, VectorIndexParams}; - use crate::index::DatasetIndexExt; + use crate::index::{vector::ivf::IVFIndex, DatasetIndexExt}; + use crate::{ + dataset::index::unindexed_fragments, + index::vector::{pq::PQIndex, VectorIndexParams}, + }; #[tokio::test] async fn test_append_index() { @@ -193,8 +233,17 @@ mod tests { .unwrap(); assert_eq!(results[0].num_rows(), 10); // Flat search. 
- dataset.optimize_indices().await.unwrap(); - let index = &dataset.load_indices().await.unwrap()[0]; + // Create a new delta index + dataset + .optimize_indices(OptimizeOptions { + num_indices_to_merge: 0, + }) + .await + .unwrap(); + let indices = dataset.load_indices().await.unwrap(); + assert_eq!(indices.len(), 2); + + let index = &indices[0]; assert!(unindexed_fragments(index, &dataset) .await .unwrap() @@ -246,6 +295,9 @@ mod tests { .iter() .sum::(); assert_eq!(row_in_index, 2000); - assert_eq!(dataset.index_cache_entry_count(), 6) + assert_eq!( + dataset.index_cache_entry_count(), + 6 + dataset.versions().await.unwrap().len() + ); } } diff --git a/rust/lance/src/index/cache.rs b/rust/lance/src/index/cache.rs index a39239864b0..f3e827d3431 100644 --- a/rust/lance/src/index/cache.rs +++ b/rust/lance/src/index/cache.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use lance_core::format::Index; use lance_index::scalar::ScalarIndex; use moka::sync::{Cache, ConcurrentCacheExt}; @@ -22,25 +23,33 @@ use super::vector::VectorIndex; use std::sync::atomic::{AtomicU64, Ordering}; #[derive(Debug, Default)] -pub struct CacheStats { +struct CacheStats { hits: AtomicU64, misses: AtomicU64, } impl CacheStats { - pub fn record_hit(&self) { + fn record_hit(&self) { self.hits.fetch_add(1, Ordering::Relaxed); } - pub fn record_miss(&self) { + fn record_miss(&self) { self.misses.fetch_add(1, Ordering::Relaxed); } } #[derive(Clone)] pub struct IndexCache { + // TODO: Can we merge these two caches into one for uniform memory management? scalar_cache: Arc>>, vector_cache: Arc>>, + + /// Index metadata cache. + /// + /// The key is "{dataset_base_path}:{version}". + /// Value is all the indies of a particular version of the dataset. 
+ metadata_cache: Arc>>>, + cache_stats: Arc, } @@ -49,6 +58,7 @@ impl IndexCache { Self { scalar_cache: Arc::new(Cache::new(capacity as u64)), vector_cache: Arc::new(Cache::new(capacity as u64)), + metadata_cache: Arc::new(Cache::new(capacity as u64)), cache_stats: Arc::new(CacheStats::default()), } } @@ -62,21 +72,31 @@ impl IndexCache { pub(crate) fn get_size(&self) -> usize { self.scalar_cache.sync(); self.vector_cache.sync(); - self.scalar_cache.entry_count() as usize + self.vector_cache.entry_count() as usize + self.metadata_cache.sync(); + (self.scalar_cache.entry_count() + + self.vector_cache.entry_count() + + self.metadata_cache.entry_count()) as usize } /// Get an Index if present. Otherwise returns [None]. pub(crate) fn get_scalar(&self, key: &str) -> Option> { - self.scalar_cache.get(key) + if let Some(index) = self.scalar_cache.get(key) { + self.cache_stats.record_hit(); + Some(index) + } else { + self.cache_stats.record_miss(); + None + } } pub(crate) fn get_vector(&self, key: &str) -> Option> { - if self.vector_cache.contains_key(key) || self.scalar_cache.contains_key(key) { + if let Some(index) = self.vector_cache.get(key) { self.cache_stats.record_hit(); + Some(index) } else { self.cache_stats.record_miss(); + None } - self.vector_cache.get(key) } /// Insert a new entry into the cache. @@ -88,6 +108,29 @@ impl IndexCache { self.vector_cache.insert(key.to_string(), index); } + /// Construct a key for index metadata arrays. + fn metadata_key(dataset_uuid: &str, version: u64) -> String { + format!("{}:{}", dataset_uuid, version) + } + + /// Get all index metadata for a particular dataset version. 
+ pub(crate) fn get_metadata(&self, key: &str, version: u64) -> Option>> { + let key = Self::metadata_key(key, version); + if let Some(indices) = self.metadata_cache.get(&key) { + self.cache_stats.record_hit(); + Some(indices) + } else { + self.cache_stats.record_miss(); + None + } + } + + pub(crate) fn insert_metadata(&self, key: &str, version: u64, indices: Arc>) { + let key = Self::metadata_key(key, version); + + self.metadata_cache.insert(key, indices); + } + /// Get cache hit ratio. #[allow(dead_code)] pub(crate) fn hit_rate(&self) -> f32 { @@ -95,8 +138,9 @@ impl IndexCache { let misses = self.cache_stats.misses.load(Ordering::Relaxed) as f32; // Returns 1.0 if hits + misses == 0 and avoids division by zero. if (hits + misses) == 0.0 { - return 1.0; + 1.0 + } else { + hits / (hits + misses) } - hits / (hits + misses) } } diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 3f12879963b..bf0a19009bc 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -36,12 +36,12 @@ use futures::{ }; use lance_arrow::*; use lance_core::io::{ - local::to_local_path, ObjectWriter, Reader, RecordBatchStream, WriteExt, Writer, -}; -use lance_core::{ - datatypes::Field, encodings::plain::PlainEncoder, format::Index as IndexMetadata, Error, Result, + local::to_local_path, object_store::ObjectStore, ObjectWriter, Reader, RecordBatchStream, + WriteExt, Writer, }; +use lance_core::{datatypes::Field, encodings::plain::PlainEncoder, Error, Result}; use lance_index::{ + optimize::OptimizeOptions, vector::{ ivf::{builder::load_precomputed_partitions, shuffler::shuffle_dataset, IvfBuildParams}, pq::{PQBuildParams, ProductQuantizer, ProductQuantizerImpl}, @@ -51,6 +51,7 @@ use lance_index::{ }; use lance_linalg::distance::{Cosine, Dot, MetricType, L2}; use log::{debug, info}; +use object_store::path::Path; use rand::{rngs::SmallRng, SeedableRng}; use roaring::RoaringBitmap; use serde::Serialize; @@ -86,9 +87,9 @@ 
pub struct IVFIndex { reader: Arc, /// Index in each partition. - sub_index: Arc, + pub(crate) sub_index: Arc, - metric_type: MetricType, + pub(crate) metric_type: MetricType, // The session cache holds an Arc to this object so we need to // hold a weak pointer to avoid cycles @@ -176,72 +177,6 @@ impl IVFIndex { let batch = part_index.search(&query, pre_filter).await?; Ok(batch) } - - pub(crate) async fn append( - &self, - dataset: &Dataset, - data: impl RecordBatchStream + Unpin + 'static, - metadata: &IndexMetadata, - column: &str, - ) -> Result { - let new_uuid = Uuid::new_v4(); - let object_store = dataset.object_store(); - let index_file = dataset - .indices_dir() - .child(new_uuid.to_string()) - .child(INDEX_FILE_NAME); - let mut writer = object_store.create(&index_file).await?; - - let pq_index = self - .sub_index - .as_any() - .downcast_ref::() - .ok_or(Error::Index { - message: "Only support append to IVF_PQ".to_string(), - location: location!(), - })?; - - // TODO: merge two IVF implementations. 
- let ivf = lance_index::vector::ivf::new_ivf_with_pq( - self.ivf.centroids.values(), - self.ivf.dimension(), - self.metric_type, - column, - pq_index.pq.clone(), - None, - )?; - - let shuffled = shuffle_dataset( - data, - column, - ivf, - None, - self.ivf.num_partitions() as u32, - pq_index.pq.num_sub_vectors(), - 10000, - 2, - ) - .await?; - let mut ivf_mut = Ivf::new(self.ivf.centroids.clone()); - write_index_partitions(&mut writer, &mut ivf_mut, shuffled, Some(&[self])).await?; - let metadata = IvfPQIndexMetadata { - name: metadata.name.clone(), - column: column.to_string(), - dimension: self.ivf.dimension() as u32, - dataset_version: dataset.version().version, - metric_type: self.metric_type, - ivf: ivf_mut, - pq: pq_index.pq.clone(), - transforms: vec![], - }; - - let metadata = pb::Index::try_from(&metadata)?; - let pos = writer.write_protobuf(&metadata).await?; - writer.write_magics(pos).await?; - writer.shutdown().await?; - - Ok(new_uuid) - } } impl std::fmt::Debug for IVFIndex { @@ -482,7 +417,7 @@ pub(crate) struct Ivf { /// /// It is a 2-D `(num_partitions * dimension)` of float32 array, 64-bit aligned via Arrow /// memory allocator. - centroids: Arc, + pub(crate) centroids: Arc, /// Offset of each partition in the file. offsets: Vec, @@ -492,7 +427,7 @@ pub(crate) struct Ivf { } impl Ivf { - fn new(centroids: Arc) -> Self { + pub(crate) fn new(centroids: Arc) -> Self { Self { centroids, offsets: vec![], @@ -501,12 +436,12 @@ impl Ivf { } /// Ivf model dimension. - fn dimension(&self) -> usize { + pub fn dimension(&self) -> usize { self.centroids.value_length() as usize } /// Number of IVF partitions. - fn num_partitions(&self) -> usize { + pub fn num_partitions(&self) -> usize { self.centroids.len() } @@ -1068,6 +1003,109 @@ async fn train_ivf_model( } } +// TODO: move to `lance-index` crate. 
+/// +/// Returns (new_uuid, num_indices_merged) +pub(crate) async fn optimize_vector_indices( + object_store: &ObjectStore, + index_dir: &Path, + dataset_version: u64, + unindexed: impl RecordBatchStream + Unpin + 'static, + vector_column: &str, + existing_indices: &[Arc], + options: &OptimizeOptions, +) -> Result<(Uuid, usize)> { + // Senity check the indices + if existing_indices.is_empty() { + return Err(Error::Index { + message: "optimizing vector index: no existing index found".to_string(), + location: location!(), + }); + } + + let new_uuid = Uuid::new_v4(); + let index_file = index_dir.child(new_uuid.to_string()).child(INDEX_FILE_NAME); + let mut writer = object_store.create(&index_file).await?; + + let first_idx = existing_indices[0] + .as_any() + .downcast_ref::() + .ok_or(Error::Index { + message: "optimizing vector index: first index is not IVF".to_string(), + location: location!(), + })?; + + let pq_index = first_idx + .sub_index + .as_any() + .downcast_ref::() + .ok_or(Error::Index { + message: "optimizing vector index: it is not a IVF_PQ index".to_string(), + location: location!(), + })?; + let metric_type = first_idx.metric_type; + let dim = first_idx.ivf.dimension(); + + // TODO: merge `lance::vector::ivf::IVF` and `lance-index::vector::ivf::Ivf`` implementations. + let ivf = lance_index::vector::ivf::new_ivf_with_pq( + first_idx.ivf.centroids.values(), + first_idx.ivf.dimension(), + metric_type, + vector_column, + pq_index.pq.clone(), + None, + )?; + + // Shuffled un-indexed data with partition. + let shuffled = shuffle_dataset( + unindexed, + vector_column, + ivf, + None, + first_idx.ivf.num_partitions() as u32, + pq_index.pq.num_sub_vectors(), + 10000, + 2, + ) + .await?; + + let mut ivf_mut = Ivf::new(first_idx.ivf.centroids.clone()); + + let start_pos = if options.num_indices_to_merge > existing_indices.len() { + 0 + } else { + existing_indices.len() - options.num_indices_to_merge + }; + + let indices_to_merge = existing_indices[start_pos..] 
+ .iter() + .map(|idx| { + idx.as_any().downcast_ref::().ok_or(Error::Index { + message: "optimizing vector index: it is not a IVF index".to_string(), + location: location!(), + }) + }) + .collect::>>()?; + write_index_partitions(&mut writer, &mut ivf_mut, shuffled, Some(&indices_to_merge)).await?; + let metadata = IvfPQIndexMetadata { + name: format!("_{}_idx", vector_column), + column: vector_column.to_string(), + dimension: dim as u32, + dataset_version: dataset_version, + metric_type, + ivf: ivf_mut, + pq: pq_index.pq.clone(), + transforms: vec![], + }; + + let metadata = pb::Index::try_from(&metadata)?; + let pos = writer.write_protobuf(&metadata).await?; + writer.write_magics(pos).await?; + writer.shutdown().await?; + + Ok((new_uuid, existing_indices.len() - start_pos)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs index 75db63ace09..9a3355ac46d 100644 --- a/rust/lance/src/index/vector/ivf/io.rs +++ b/rust/lance/src/index/vector/ivf/io.rs @@ -44,7 +44,7 @@ use crate::Result; /// These existing partitions must have the same centroids and PQ codebook. /// /// TODO: migrate this function to `lance-index` crate. 
-pub(super) async fn write_index_partitions( +pub(crate) async fn write_index_partitions( writer: &mut dyn Writer, ivf: &mut Ivf, streams: Vec>>, diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index c3a47c8f4bc..f96b0a89171 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -44,6 +44,7 @@ use lance_core::{ io::commit::{CommitConfig, CommitError}, Error, Result, }; +use lance_index::DatasetIndexExt; use object_store::path::Path; use prost::Message; @@ -332,7 +333,7 @@ pub(crate) async fn commit_transaction( } _ => transaction.build_manifest( Some(dataset.manifest.as_ref()), - dataset.load_indices().await?, + dataset.load_indices().await?.as_ref().clone(), &transaction_file, write_config, )?, @@ -413,14 +414,14 @@ mod tests { CommitError, CommitHandler, CommitLease, CommitLock, RenameCommitHandler, UnsafeCommitHandler, }; - use lance_index::IndexType; + use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::generate_random_array; use super::*; use crate::dataset::{transaction::Operation, WriteMode, WriteParams}; - use crate::index::{vector::VectorIndexParams, DatasetIndexExt}; + use crate::index::vector::VectorIndexParams; use crate::io::object_store::ObjectStoreParams; use crate::Dataset; diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index d55d84d3152..d4b05c5ea76 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -25,9 +25,12 @@ use lance_core::{ format::{Fragment, RowAddress}, Error, Result, ROW_ID_FIELD, }; -use lance_index::scalar::{ - expression::{ScalarIndexExpr, ScalarIndexLoader}, - ScalarIndex, +use lance_index::{ + scalar::{ + expression::{ScalarIndexExpr, ScalarIndexLoader}, + ScalarIndex, + }, + DatasetIndexExt, }; use pin_project::pin_project; use roaring::RoaringBitmap; diff --git a/rust/lance/src/utils/tfrecord.rs b/rust/lance/src/utils/tfrecord.rs 
index 0026bf1a921..50b23cfa09f 100644 --- a/rust/lance/src/utils/tfrecord.rs +++ b/rust/lance/src/utils/tfrecord.rs @@ -26,6 +26,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; use half::{bf16, f16}; +use lance_arrow::bfloat16::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; use prost::Message; use std::collections::HashMap; use std::sync::Arc; @@ -284,7 +285,7 @@ fn make_field(name: &str, feature_meta: &FeatureMeta) -> Result { let inner_meta = match dtype { TensorDataType::DtBfloat16 => Some( - [("ARROW:extension:name", "lance.bfloat16")] + [(ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME)] .into_iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect::>(), @@ -309,11 +310,11 @@ fn make_field(name: &str, feature_meta: &FeatureMeta) -> Result { shape: shape.clone(), }; metadata.insert( - "ARROW:extension:name".to_string(), + ARROW_EXT_NAME_KEY.to_string(), "arrow.fixed_shape_tensor".to_string(), ); metadata.insert( - "ARROW:extension:metadata".to_string(), + ARROW_EXT_META_KEY.to_string(), serde_json::to_string(&tensor_metadata)?, ); Some(metadata) @@ -364,10 +365,10 @@ fn convert_column(records: &[Example], field: &ArrowField) -> Result { field.set_metadata( [ ( - "ARROW:extension:name".to_string(), - "lance.bfloat16".to_string(), + ARROW_EXT_NAME_KEY.to_string(), + BFLOAT16_EXT_NAME.to_string(), ), - ("ARROW:extension:metadata".to_string(), "".to_string()), + (ARROW_EXT_META_KEY.to_string(), "".to_string()), ] .into_iter() .collect(),