Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 87 additions & 29 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1689,37 +1689,33 @@ def scan_stats_callback(stats: lance.ScanStatistics):
assert small_bytes_read < large_bytes_read


def test_bloomfilter_index(tmp_path: Path):
"""Test create bloomfilter index"""
tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"])
dataset = lance.write_dataset(tbl, tmp_path / "dataset")
dataset.create_scalar_index("values", index_type="BLOOMFILTER")
indices = dataset.list_indices()
assert len(indices) == 1

# Get detailed index statistics
index_stats = dataset.stats.index_stats("values_idx")
assert index_stats["index_type"] == "BloomFilter"
assert "indices" in index_stats
assert len(index_stats["indices"]) == 1

# Verify bloomfilter statistics
bloom_stats = index_stats["indices"][0]
assert "num_blocks" in bloom_stats
assert bloom_stats["num_blocks"] == 2
assert bloom_stats["number_of_items"] == 8192
assert "probability" in bloom_stats
assert bloom_stats["probability"] == 0.00057 # Default probability
def test_zonemap_deletion_handling(tmp_path: Path):
"""Test zonemap deletion handling"""
data = pa.table(
{
"id": range(10),
"value": [True, False] * 5,
}
)
ds = lance.write_dataset(data, "memory://")
ds.delete("NOT value")
assert ds.to_table(filter="value = True").num_rows == 5
assert ds.to_table(filter="value = False").num_rows == 0
ids = ds.to_table(filter="value = True")["id"].to_pylist()
assert ids == [0, 2, 4, 6, 8]

# Test that the bloomfilter index is being used in the query plan
scanner = dataset.scanner(filter="values == 1234", prefilter=True)
plan = scanner.explain_plan()
assert "ScalarIndexQuery" in plan
ds.create_scalar_index("value", index_type="zonemap")
ids = ds.to_table(filter="value = True")["id"].to_pylist()
assert ids == [0, 2, 4, 6, 8]

# Verify the query returns correct results
result = scanner.to_table()
assert result.num_rows == 1
assert result["values"][0].as_py() == 1234
# now create the index before deletion
ds = lance.write_dataset(data, "memory://")
ds.create_scalar_index("value", index_type="zonemap")
ds.delete("NOT value")
assert ds.to_table(filter="value = True").num_rows == 5
assert ds.to_table(filter="value = False").num_rows == 0
ids = ds.to_table(filter="value = True")["id"].to_pylist()
assert ids == [0, 2, 4, 6, 8]


def test_zonemap_index_remapping(tmp_path: Path):
Expand Down Expand Up @@ -1778,6 +1774,68 @@ def test_zonemap_index_remapping(tmp_path: Path):
assert result.num_rows == 501 # 1000..1500 inclusive


def test_bloomfilter_index(tmp_path: Path):
    """Check BLOOMFILTER index creation, statistics and query usage."""
    column = pa.array(list(range(10000)))
    table = pa.Table.from_arrays([column], names=["values"])
    dataset = lance.write_dataset(table, tmp_path / "dataset")
    dataset.create_scalar_index("values", index_type="BLOOMFILTER")
    assert len(dataset.list_indices()) == 1

    # Index-level statistics reported for the freshly built index.
    stats = dataset.stats.index_stats("values_idx")
    assert stats["index_type"] == "BloomFilter"
    assert "indices" in stats
    per_index = stats["indices"]
    assert len(per_index) == 1

    # Statistics of the single bloom filter entry.
    bloom = per_index[0]
    assert "num_blocks" in bloom
    assert bloom["num_blocks"] == 2
    assert bloom["number_of_items"] == 8192
    assert "probability" in bloom
    assert bloom["probability"] == 0.00057  # Default probability

    # The filter has to be answered through the scalar index.
    scanner = dataset.scanner(filter="values == 1234", prefilter=True)
    assert "ScalarIndexQuery" in scanner.explain_plan()

    # The indexed query must still return exactly the matching row.
    matched = scanner.to_table()
    assert matched.num_rows == 1
    assert matched["values"][0].as_py() == 1234


def test_bloomfilter_deletion_handling(tmp_path: Path):
    """Check that bloomfilter-indexed filters stay correct around deletions."""
    expected_ids = [0, 2, 4, 6, 8]
    data = pa.table({"id": range(10), "value": [1, 0] * 5})

    # Scenario 1: delete rows first, then build the index.
    ds = lance.write_dataset(data, "memory://")
    ds.delete("value = 0")
    assert ds.to_table(filter="value = 1").num_rows == 5
    assert ds.to_table(filter="value = 0").num_rows == 0
    kept = ds.to_table(filter="value = 1")
    assert kept["id"].to_pylist() == expected_ids

    ds.create_scalar_index("value", index_type="bloomfilter")
    kept = ds.to_table(filter="value = 1")
    assert kept["id"].to_pylist() == expected_ids

    # Scenario 2: build the index first, then delete rows.
    ds = lance.write_dataset(data, "memory://")
    ds.create_scalar_index("value", index_type="bloomfilter")
    ds.delete("value = 0")
    assert ds.to_table(filter="value = 1").num_rows == 5
    assert ds.to_table(filter="value = 0").num_rows == 0
    kept = ds.to_table(filter="value = 1")
    assert kept["id"].to_pylist() == expected_ids


def test_json_index():
vals = ['{"x": 7, "y": 10}', '{"x": 11, "y": 22}', '{"y": 0}', '{"x": 10}']
tbl = pa.table({"jsons": pa.array(vals, pa.json_())})
Expand Down
60 changes: 45 additions & 15 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ const BLOOMFILTER_INDEX_VERSION: u32 = 0;
#[derive(Debug, Clone)]
struct BloomFilterStatistics {
fragment_id: u64,
// zone_start is the start row of the zone in the fragment, also known
// as local row offset
// zone_start is the actual first row address (local offset within fragment)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local offset and row address are not the same thing.

Suggested change
// zone_start is the actual first row address (local offset within fragment)
// zone_start is the start row of the zone in the fragment, also known
// as local row offset. To get the first row address, you can do
// `(fragment_id << 32) + zone_start`.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, will fix the wrong wording

zone_start: u64,
zone_length: usize,
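// zone_length is the span of local row offsets covered by the zone:
// (last_row_offset - first_row_offset + 1). It is not a row count: after
// deletions the offsets inside a zone are non-contiguous, so the span can be
// larger than the number of rows actually stored in the zone.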

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment. What does it mean? How are deletions handled with respect to this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewrote the whole part and added two examples

zone_length: u64,
// Whether this zone contains any null values
has_null: bool,
// The actual bloom filter (SBBF) for efficient querying
Expand Down Expand Up @@ -231,7 +232,7 @@ impl BloomFilterIndex {
blocks.push(BloomFilterStatistics {
fragment_id: fragment_id_col.value(i),
zone_start: zone_start_col.value(i),
zone_length: zone_length_col.value(i) as usize,
zone_length: zone_length_col.value(i),
has_null: has_null_col.value(i),
bloom_filter,
});
Expand Down Expand Up @@ -470,9 +471,10 @@ impl ScalarIndex for BloomFilterIndex {
for block in self.zones.iter() {
if self.evaluate_block_against_query(block, query)? {
// Calculate the range of row addresses for this zone
// Row addresses are: (fragment_id << 32) + zone_start
// zone_length is the address span (not row count), so we can directly use it
// This handles non-contiguous addresses from deletions correctly
let zone_start_addr = (block.fragment_id << 32) + block.zone_start;
let zone_end_addr = zone_start_addr + (block.zone_length as u64);
let zone_end_addr = zone_start_addr + block.zone_length;

// Add all row addresses in this zone to the result
row_id_tree_map.insert_range(zone_start_addr..zone_end_addr);
Expand Down Expand Up @@ -619,6 +621,10 @@ pub struct BloomFilterIndexBuilder {
// The local offset within the current zones
cur_zone_offset: usize,
cur_fragment_id: u64,
// Track the actual first and last row addresses in the current zone
// This handles non-contiguous addresses after deletions
cur_zone_first_row_addr: Option<u64>,
cur_zone_last_row_addr: Option<u64>,
cur_zone_has_null: bool,
sbbf: Option<Sbbf>,
}
Expand All @@ -639,6 +645,8 @@ impl BloomFilterIndexBuilder {
blocks: Vec::new(),
cur_zone_offset: 0,
cur_fragment_id: 0,
cur_zone_first_row_addr: None,
cur_zone_last_row_addr: None,
cur_zone_has_null: false,
sbbf: Some(sbbf),
})
Expand Down Expand Up @@ -922,13 +930,14 @@ impl BloomFilterIndexBuilder {
}

fn new_block(&mut self, fragment_id: u64) -> Result<()> {
// Calculate zone_start based on existing zones in the same fragment
let zone_start = self
.blocks
.iter()
.filter(|block| block.fragment_id == fragment_id)
.map(|block| block.zone_length as u64)
.sum::<u64>();
// Use the actual first and last row addresses we tracked
// zone_length is the address span (last - first + 1), not row count
// This correctly handles non-contiguous addresses after deletions
let zone_start = self.cur_zone_first_row_addr.unwrap_or(0);
let zone_length = self
.cur_zone_last_row_addr
.map(|last_addr| last_addr - zone_start + 1)
.unwrap_or(self.cur_zone_offset as u64);

// Store the current bloom filter directly
let bloom_filter = if let Some(ref sbbf) = self.sbbf {
Expand All @@ -948,13 +957,15 @@ impl BloomFilterIndexBuilder {
let new_block = BloomFilterStatistics {
fragment_id,
zone_start,
zone_length: self.cur_zone_offset,
zone_length,
has_null: self.cur_zone_has_null,
bloom_filter,
};

self.blocks.push(new_block);
self.cur_zone_offset = 0;
self.cur_zone_first_row_addr = None;
self.cur_zone_last_row_addr = None;
self.cur_zone_has_null = false;

// Reset sbbf for the next block
Expand Down Expand Up @@ -1023,11 +1034,30 @@ impl BloomFilterIndexBuilder {
if desired > remaining {
// Not enough data to fill a map, just increment counts
self.update_stats(&data_array.slice(array_offset, remaining))?;

// Track first and last row addresses (local offsets within fragment)
let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF;
let last_addr =
row_addrs_array.value(array_offset + remaining - 1) & 0xFFFFFFFF;
if self.cur_zone_first_row_addr.is_none() {
self.cur_zone_first_row_addr = Some(first_addr);
}
self.cur_zone_last_row_addr = Some(last_addr);

self.cur_zone_offset += remaining;
break;
} else if desired > 0 {
// There is enough data, create a new zone
self.update_stats(&data_array.slice(array_offset, desired))?;

// Track first and last row addresses (local offsets within fragment)
let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF;
let last_addr = row_addrs_array.value(array_offset + desired - 1) & 0xFFFFFFFF;
if self.cur_zone_first_row_addr.is_none() {
self.cur_zone_first_row_addr = Some(first_addr);
}
self.cur_zone_last_row_addr = Some(last_addr);

self.cur_zone_offset += desired;
self.new_block(row_addrs_array.value(array_offset) >> 32)?;
} else if desired == 0 {
Expand Down Expand Up @@ -1059,7 +1089,7 @@ impl BloomFilterIndexBuilder {
UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_start));

let zone_lengths =
UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length as u64));
UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length));

let has_nulls = arrow_array::BooleanArray::from(
self.blocks
Expand Down
Loading
Loading