Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 87 additions & 29 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1689,37 +1689,33 @@ def scan_stats_callback(stats: lance.ScanStatistics):
assert small_bytes_read < large_bytes_read


def test_bloomfilter_index(tmp_path: Path):
"""Test create bloomfilter index"""
tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"])
dataset = lance.write_dataset(tbl, tmp_path / "dataset")
dataset.create_scalar_index("values", index_type="BLOOMFILTER")
indices = dataset.list_indices()
assert len(indices) == 1

# Get detailed index statistics
index_stats = dataset.stats.index_stats("values_idx")
assert index_stats["index_type"] == "BloomFilter"
assert "indices" in index_stats
assert len(index_stats["indices"]) == 1

# Verify bloomfilter statistics
bloom_stats = index_stats["indices"][0]
assert "num_blocks" in bloom_stats
assert bloom_stats["num_blocks"] == 2
assert bloom_stats["number_of_items"] == 8192
assert "probability" in bloom_stats
assert bloom_stats["probability"] == 0.00057 # Default probability
def test_zonemap_deletion_handling(tmp_path: Path):
"""Test zonemap deletion handling"""
data = pa.table(
{
"id": range(10),
"value": [True, False] * 5,
}
)
ds = lance.write_dataset(data, "memory://", max_rows_per_group=5)
ds.delete("NOT value")
assert ds.to_table(filter="value = True").num_rows == 5
assert ds.to_table(filter="value = False").num_rows == 0
ids = ds.to_table(filter="value = True")["id"].to_pylist()
assert ids == [0, 2, 4, 6, 8]

# Test that the bloomfilter index is being used in the query plan
scanner = dataset.scanner(filter="values == 1234", prefilter=True)
plan = scanner.explain_plan()
assert "ScalarIndexQuery" in plan
ds.create_scalar_index("value", index_type="zonemap")
ids = ds.to_table(filter="value = True")["id"].to_pylist()
assert ids == [0, 2, 4, 6, 8]

# Verify the query returns correct results
result = scanner.to_table()
assert result.num_rows == 1
assert result["values"][0].as_py() == 1234
# now create the index before deletion
ds = lance.write_dataset(data, "memory://", max_rows_per_group=5)
ds.create_scalar_index("value", index_type="zonemap")
ds.delete("NOT value")
assert ds.to_table(filter="value = True").num_rows == 5
assert ds.to_table(filter="value = False").num_rows == 0
ids = ds.to_table(filter="value = True")["id"].to_pylist()
assert ids == [0, 2, 4, 6, 8]


def test_zonemap_index_remapping(tmp_path: Path):
Expand Down Expand Up @@ -1778,6 +1774,68 @@ def test_zonemap_index_remapping(tmp_path: Path):
assert result.num_rows == 501 # 1000..1500 inclusive


def test_bloomfilter_index(tmp_path: Path):
    """Create a BLOOMFILTER scalar index and check its stats, plan, and results.

    Writes the integers 0..9999 as a single ``values`` column to a dataset
    under ``tmp_path``, builds a ``BLOOMFILTER`` scalar index on that column,
    and then checks three things: the statistics reported for ``values_idx``,
    that an equality filter shows ``ScalarIndexQuery`` in the explained plan,
    and that the filter returns exactly the matching row.
    """
    values = pa.array(list(range(10000)))
    table = pa.Table.from_arrays([values], names=["values"])
    ds = lance.write_dataset(table, tmp_path / "dataset")
    ds.create_scalar_index("values", index_type="BLOOMFILTER")
    index_list = ds.list_indices()
    assert len(index_list) == 1

    # Detailed statistics for the index on the "values" column.
    stats = ds.stats.index_stats("values_idx")
    assert stats["index_type"] == "BloomFilter"
    assert "indices" in stats
    per_index = stats["indices"]
    assert len(per_index) == 1

    # Bloom-filter specific fields of the single reported index entry.
    bloom = per_index[0]
    assert "num_blocks" in bloom
    assert bloom["num_blocks"] == 2
    assert bloom["number_of_items"] == 8192
    assert "probability" in bloom
    assert bloom["probability"] == 0.00057  # Default probability

    # The explained plan must show the scalar index being consulted.
    query = ds.scanner(filter="values == 1234", prefilter=True)
    assert "ScalarIndexQuery" in query.explain_plan()

    # The same scanner must return exactly the one matching row.
    matches = query.to_table()
    assert matches.num_rows == 1
    assert matches["values"][0].as_py() == 1234


def test_bloomfilter_deletion_handling(tmp_path: Path):
    """Check bloomfilter-indexed queries stay correct when rows are deleted.

    Two orderings are exercised on the same ten-row table, each written to
    ``memory://`` (``tmp_path`` is accepted but not used by this test):

    1. delete the ``value = 0`` rows, then create the bloomfilter index;
    2. create the bloomfilter index, then delete the ``value = 0`` rows.

    In both cases only the rows with ``value = 1`` (ids 0, 2, 4, 6, 8) may be
    returned by a filtered read.
    """
    surviving_ids = [0, 2, 4, 6, 8]
    rows = pa.table(
        {
            "id": range(10),
            "value": [1, 0] * 5,
        }
    )

    def ids_with_value_one(dataset):
        # ids of every row still matching ``value = 1``
        return dataset.to_table(filter="value = 1")["id"].to_pylist()

    def check_after_delete(dataset):
        # Row counts first, then the exact ids, in the same order as the
        # checks appear in each scenario.
        assert dataset.to_table(filter="value = 1").num_rows == 5
        assert dataset.to_table(filter="value = 0").num_rows == 0
        assert ids_with_value_one(dataset) == surviving_ids

    # Scenario 1: delete first, then build the index on the remaining rows.
    ds = lance.write_dataset(rows, "memory://", max_rows_per_group=5)
    ds.delete("value = 0")
    check_after_delete(ds)

    ds.create_scalar_index("value", index_type="bloomfilter")
    assert ids_with_value_one(ds) == surviving_ids

    # Scenario 2: now create the index before deletion.
    ds = lance.write_dataset(rows, "memory://", max_rows_per_group=5)
    ds.create_scalar_index("value", index_type="bloomfilter")
    ds.delete("value = 0")
    check_after_delete(ds)


def test_json_index():
vals = ['{"x": 7, "y": 10}', '{"x": 11, "y": 22}', '{"y": 0}', '{"x": 10}']
tbl = pa.table({"jsons": pa.array(vals, pa.json_())})
Expand Down
43 changes: 43 additions & 0 deletions rust/debug.output
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
diff --git a/Cargo.lock b/Cargo.lock

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop this file?

index 71561804..b050dca3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4661,6 +4661,7 @@ dependencies = [
"itertools 0.13.0",
"jieba-rs",
"jsonb",
+ "lance",
"lance-arrow",
"lance-core",
"lance-datafusion",
diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs
index 0a746d0e..2e8a3852 100644
--- a/rust/lance-index/src/scalar/zonemap.rs
+++ b/rust/lance-index/src/scalar/zonemap.rs
@@ -546,6 +546,10 @@ impl ScalarIndex for ZoneMapIndex {
// Row addresses are: (fragment_id << 32) + zone_start
let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start;
let zone_end_addr = zone_start_addr + (zone.zone_length as u64);
+ println!(
+ "found a match! zone_start_addr: {}, zone_end_addr: {}",
+ zone_start_addr, zone_end_addr
+ );

// Add all row addresses in this zone to the result
row_id_tree_map.insert_range(zone_start_addr..zone_end_addr);
@@ -763,6 +767,7 @@ impl ZoneMapIndexBuilder {
chunk_concat_stream(batches_source, self.options.rows_per_zone as usize);

while let Some(batch) = batches_source.try_next().await? {
+ println!("incoming data is {:?}", batch);
if batch.num_rows() == 0 {
continue;
}
@@ -831,6 +836,7 @@ impl ZoneMapIndexBuilder {
if self.cur_zone_offset > 0 {
self.new_map(self.cur_fragment_id)?;
}
+ println!("after training, the zone_maps are {:?}", self.maps);

Ok(())
}
Loading