Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ arrow = { version = "49.0.0", features = ["pyarrow"] }
arrow-array = "49.0"
arrow-data = "49.0"
arrow-schema = "49.0"
object_store = "0.8.0"
object_store = "0.9.0"
async-trait = "0.1"
chrono = "0.4.31"
env_logger = "0.10"
futures = "0.3"
half = { version = "2.1", default-features = false, features = ["num-traits"] }
half = { version = "2.3", default-features = false, features = ["num-traits", "std"] }
lance = { path = "../rust/lance", features = ["tensorflow", "dynamodb"] }
lance-arrow = { path = "../rust/lance-arrow" }
lance-core = { path = "../rust/lance-core" }
Expand Down
50 changes: 48 additions & 2 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,12 @@ def create_index(
metric: str = "L2",
replace: bool = False,
num_partitions: Optional[int] = None,
ivf_centroids: Optional[Union[np.ndarray, pa.FixedSizeListArray]] = None,
ivf_centroids: Optional[
Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray]
] = None,
pq_codebook: Optional[
Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray]
] = None,
num_sub_vectors: Optional[int] = None,
accelerator: Optional[Union[str, "torch.Device"]] = None,
index_cache_size: Optional[int] = None,
Expand Down Expand Up @@ -894,9 +899,16 @@ def create_index(
Replace the existing index if it exists.
num_partitions : int, optional
The number of partitions of IVF (Inverted File Index).
ivf_centroids : ``np.ndarray`` or ``pyarrow.FixedSizeListArray``. Optional.
ivf_centroids : ``np.ndarray``, ``pyarrow.FixedSizeListArray``
or ``pyarrow.FixedShapeTensorArray``. Optional.
A ``num_partitions x dimension`` array of K-mean centroids for IVF
clustering. If not provided, a new Kmean model will be trained.
pq_codebook : ``np.ndarray``, ``pyarrow.FixedSizeListArray``
or ``pyarrow.FixedShapeTensorArray``. Optional.
A ``num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)``
array of K-mean centroids for PQ codebook.
Note: nbits is always 8 for now.
If not provided, a new PQ model will be trained.
num_sub_vectors : int, optional
The number of sub-vectors for PQ (Product Quantization).
accelerator : str or ``torch.Device``, optional
Expand Down Expand Up @@ -1049,6 +1061,11 @@ def create_index(
)
kwargs["precomputed_partitions_file"] = partitions_file

if (ivf_centroids is None) and (pq_codebook is not None):
raise ValueError(
"ivf_centroids must be specified when pq_codebook is provided"
)

if ivf_centroids is not None:
# User provided IVF centroids
if _check_for_numpy(ivf_centroids) and isinstance(
Expand All @@ -1075,6 +1092,35 @@ def create_index(
[ivf_centroids], ["_ivf_centroids"]
)
kwargs["ivf_centroids"] = ivf_centroids_batch

if pq_codebook is not None:
        # User provided PQ codebook
if _check_for_numpy(pq_codebook) and isinstance(
pq_codebook, np.ndarray
):
if (
len(pq_codebook.shape) != 3
or pq_codebook.shape[0] != num_sub_vectors
or pq_codebook.shape[1] != 256
):
raise ValueError(
f"PQ codebook must be 3D array: (sub_vectors, 256, dim), "
f"got {pq_codebook.shape}"
)
if pq_codebook.dtype not in [np.float16, np.float32, np.float64]:
raise TypeError(
"PQ codebook must be floating number"
+ f"got {pq_codebook.dtype}"
)
values = pa.array(pq_codebook.reshape(-1))
pq_codebook = pa.FixedSizeListArray.from_arrays(
values, num_sub_vectors * 256
)
pq_codebook_batch = pa.RecordBatch.from_arrays(
[pq_codebook], ["_pq_codebook"]
)
kwargs["pq_codebook"] = pq_codebook_batch

if shuffle_partition_batches is not None:
kwargs["shuffle_partition_batches"] = shuffle_partition_batches
if shuffle_partition_concurrency is not None:
Expand Down
10 changes: 10 additions & 0 deletions python/python/lance/torch/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ def _fit_once(
del dists
del chunk

# this happens when there are too many NaNs or the data is just the same
# vectors repeated over and over. Performance may be bad but we don't
# want to crash.
if total_dist == 0:
logging.warning(
"Kmeans::train: total_dist is 0, this is unusual."
" This could result in bad performance during search."
)
raise StopIteration("kmeans: converged")

if abs(total_dist - last_dist) / total_dist < self.tolerance:
raise StopIteration("kmeans: converged")

Expand Down
51 changes: 51 additions & 0 deletions python/python/lance/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,54 @@ def predict(
"""Predict the cluster for each vector in the data."""
arr = self._to_fixed_size_list(data)
return self._kmeans.predict(arr)


def sanity_check_vector_index(
    dataset,
    column: str,
    refine_factor: int = 5,
    sample_size: Optional[int] = None,
    pass_threshold: float = 1.0,
):
    """Run in-sample queries and make sure that the recall
    for k=1 is very high (should be near 100%).

    Parameters
    ----------
    dataset: LanceDataset
        The dataset to sanity check.
    column: str
        The column name of the vector column.
    refine_factor: int, default=5
        The refine factor to use for the nearest neighbor query.
    sample_size: int, optional
        The number of vectors to sample from the dataset.
        If None, the entire dataset will be used.
    pass_threshold: float, default=1.0
        The minimum fraction of vectors that must pass the sanity check.
        If less than this fraction of vectors pass, a ValueError will be raised.

    Raises
    ------
    ValueError
        If fewer than ``pass_threshold`` of the queried vectors find
        themselves as their own nearest neighbor, or if no valid
        (non-NaN) vectors were sampled at all.
    """
    data = dataset.to_table() if sample_size is None else dataset.sample(sample_size)
    vecs = data[column].to_numpy(zero_copy_only=False)
    passes = 0
    total = len(vecs)

    for vec in vecs:
        # NaN vectors can never match themselves; exclude them from the total
        # instead of counting them as failures.
        if np.isnan(vec).any():
            total -= 1
            continue
        distance = dataset.to_table(
            nearest={
                "column": column,
                "q": vec,
                "k": 1,
                "nprobes": 1,
                "refine_factor": refine_factor,
            }
        )["_distance"].to_pylist()[0]
        # An in-sample query should return itself at (near) zero distance.
        passes += 1 if abs(distance) < 1e-6 else 0

    # Guard the division below: if every sampled vector was NaN (or the
    # sample was empty), the original code would raise ZeroDivisionError.
    if total == 0:
        raise ValueError(
            "Vector index sanity check could not run: "
            "no valid (non-NaN) vectors were sampled"
        )

    if passes / total < pass_threshold:
        raise ValueError(
            f"Vector index failed sanity check, only {passes}/{total} passed"
        )
21 changes: 21 additions & 0 deletions python/python/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,27 @@ def test_bf16_fixed_size_list_cast():
assert casted == fsl


def test_bf16_roundtrip(tmp_path: Path):
    """Write bf16 values, vectors, and tensors to a dataset and read them back."""
    import numpy as np
    from ml_dtypes import bfloat16

    # Nine random bf16 values; viewed as three 3-dim vectors / tensors.
    values = BFloat16Array.from_numpy(np.random.random(9).astype(bfloat16))
    vectors = pa.FixedSizeListArray.from_arrays(values, 3)
    tensor_type = pa.fixed_shape_tensor(values.type, [3])
    tensors = pa.ExtensionArray.from_storage(tensor_type, vectors)

    data = pa.table(
        {
            "values": values.slice(0, 3),
            "vector": vectors,
            "tensors": tensors,
        }
    )

    # Round-tripping through a dataset must preserve both schema and data.
    ds = lance.write_dataset(data, tmp_path)
    assert ds.schema == data.schema
    assert ds.to_table() == data


def test_roundtrip_take_ext_types(tmp_path: Path):
tensor_type = pa.fixed_shape_tensor(pa.float32(), [2, 3])
inner = pa.array([float(x) for x in range(0, 18)], pa.float32())
Expand Down
2 changes: 0 additions & 2 deletions python/python/tests/test_tf.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,6 @@ def test_tfrecord_parsing(tmp_path, sample_tf_example):


def test_tfrecord_roundtrip(tmp_path, sample_tf_example):
del sample_tf_example.features.feature["9_tensor_bf16"]

serialized = sample_tf_example.SerializeToString()

path = tmp_path / "test.tfrecord"
Expand Down
97 changes: 85 additions & 12 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@
import pyarrow as pa
import pyarrow.compute as pc
import pytest
import torch
from lance.util import sanity_check_vector_index
from lance.vector import vec_to_table


def create_table(nvec=1000, ndim=128):
def create_table(nvec=1000, ndim=128, nans=0):
mat = np.random.randn(nvec, ndim)
price = np.random.rand(nvec) * 100
if nans > 0:
nans_mat = np.empty((nans, ndim))
nans_mat[:] = np.nan
mat = np.concatenate((mat, nans_mat), axis=0)
price = np.random.rand(nvec + nans) * 100

def gen_str(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))

meta = np.array([gen_str(100) for _ in range(nvec)])
meta = np.array([gen_str(100) for _ in range(nvec + nans)])
tbl = (
vec_to_table(data=mat)
.append_column("price", pa.array(price))
.append_column("meta", pa.array(meta))
.append_column("id", pa.array(range(nvec)))
.append_column("id", pa.array(range(nvec + nans)))
)
return tbl

Expand Down Expand Up @@ -128,6 +134,69 @@ def func(rs: pa.Table):
print(run(dataset, q=np.array(q), assert_func=func))


def test_index_with_nans(tmp_path):
    """Indexing a table containing NaN vectors should still work."""
    # 1000 valid rows + 24 NaN rows = 1024 rows, so the entire table
    # should be sampled during training.
    table = create_table(nvec=1000, nans=24)

    ds = lance.write_dataset(table, tmp_path)
    ds = ds.create_index(
        "vector",
        index_type="IVF_PQ",
        num_partitions=4,
        num_sub_vectors=16,
        accelerator=torch.device("cpu"),
    )
    sanity_check_vector_index(ds, "vector")


def test_index_with_no_centroid_movement(tmp_path):
    """Build an index on constant data so k-means makes no progress.

    Every row is the all-ones vector, so the trained centroid never moves
    (total distance change is zero). This exercises the early-stop
    condition in the index-building code and makes sure it does not
    divide by zero.
    """
    # np.concatenate([...]) on a single array was a no-op copy; build the
    # constant matrix directly.
    mat = np.ones((256, 32))

    tbl = vec_to_table(data=mat)

    dataset = lance.write_dataset(tbl, tmp_path)
    dataset = dataset.create_index(
        "vector",
        index_type="IVF_PQ",
        num_partitions=1,
        num_sub_vectors=4,
        accelerator=torch.device("cpu"),
    )
    sanity_check_vector_index(dataset, "vector")


def test_index_with_pq_codebook(tmp_path):
    """Create an IVF_PQ index from a user-supplied PQ codebook."""
    table = create_table(nvec=1024, ndim=128)
    dataset = lance.write_dataset(table, tmp_path)
    # 4 sub-vectors, 256 centroids each, dim = 128 // 4 per sub-vector.
    codebook = np.random.randn(4, 256, 128 // 4).astype(np.float32)

    # Exercise both accepted codebook representations: a raw numpy ndarray
    # and a pyarrow FixedShapeTensorArray (second pass replaces the first).
    for replace, pq_codebook in (
        (False, codebook),
        (True, pa.FixedShapeTensorArray.from_numpy_ndarray(codebook)),
    ):
        dataset = dataset.create_index(
            "vector",
            index_type="IVF_PQ",
            num_partitions=1,
            num_sub_vectors=4,
            ivf_centroids=np.random.randn(1, 128).astype(np.float32),
            pq_codebook=pq_codebook,
            replace=replace,
        )
        sanity_check_vector_index(
            dataset, "vector", refine_factor=10, pass_threshold=0.99
        )


@pytest.mark.cuda
def test_create_index_using_cuda(tmp_path):
tbl = create_table()
Expand Down Expand Up @@ -297,8 +366,7 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path):
if platform.system() == "Windows":
expected_filepath = expected_filepath.replace("\\", "/")
expected_statistics = {
"index_cache_entry_count": 1,
"index_cache_hit_rate": 0,
"index_cache_entry_count": 2,
"index_type": "IVF",
"uuid": index_uuid,
"uri": expected_filepath,
Expand All @@ -316,15 +384,20 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path):
}

with pytest.raises(KeyError, match='Index "non-existent_idx" not found'):
# increase 1 miss of index_cache.metadata_cache
assert dataset_with_index.stats.index_stats("non-existent_idx")
with pytest.raises(KeyError, match='Index "" not found'):
# increase 1 miss of index_cache.metadata_cache
assert dataset_with_index.stats.index_stats("")
with pytest.raises(TypeError):
dataset_with_index.stats.index_stats()

# increase 1 hit of index_cache.metadata_cache
actual_statistics = dataset_with_index.stats.index_stats("vector_idx")
partitions = actual_statistics.pop("partitions")
hit_rate = actual_statistics.pop("index_cache_hit_rate")
assert actual_statistics == expected_statistics
assert np.isclose(hit_rate, 7 / 11)

assert len(partitions) == 5
partition_keys = {"index", "length", "offset", "centroid"}
Expand Down Expand Up @@ -494,24 +567,24 @@ def query_index(ds, ntimes):
)

assert (
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 1
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 2
)
query_index(indexed_dataset, 1)
assert (
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 2
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 3
)
assert (
indexed_dataset.stats.index_stats("vector_idx")["index_cache_hit_rate"] == 0.5
assert np.isclose(
indexed_dataset.stats.index_stats("vector_idx")["index_cache_hit_rate"], 18 / 25
)
query_index(indexed_dataset, 128)
assert (
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 10
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 11
)

indexed_dataset = lance.LanceDataset(indexed_dataset.uri, index_cache_size=5)
query_index(indexed_dataset, 128)
assert (
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 5
indexed_dataset.stats.index_stats("vector_idx")["index_cache_entry_count"] == 6
)


Expand Down
5 changes: 3 additions & 2 deletions python/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema};
use half::bf16;
use lance::arrow::bfloat16::BFloat16Array;
use lance_arrow::bfloat16::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME};
use pyo3::{
exceptions::PyValueError,
ffi::Py_uintptr_t,
Expand Down Expand Up @@ -72,8 +73,8 @@ impl BFloat16 {
}

const EXPORT_METADATA: [(&str, &str); 2] = [
("ARROW:extension:name", "lance.bfloat16"),
("ARROW:extension:metadata", ""),
(ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME),
(ARROW_EXT_META_KEY, ""),
];

#[pyfunction]
Expand Down
Loading