Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ log = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
rand = { workspace = true }
roaring = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
Expand Down
18 changes: 11 additions & 7 deletions src/query/catalog/src/plan/internal_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use databend_common_expression::SNAPSHOT_NAME_COLUMN_ID;
use databend_common_expression::VECTOR_SCORE_COLUMN_ID;
use databend_storages_common_table_meta::meta::try_extract_uuid_str_from_path;
use databend_storages_common_table_meta::meta::NUM_BLOCK_ID_BITS;
use roaring::RoaringTreemap;

// Segment and Block id Bits when generate internal column `_row_id`
// Assumes that the max block count of a segment is 2 ^ NUM_BLOCK_ID_BITS
Expand Down Expand Up @@ -94,15 +95,15 @@ pub fn block_idx_in_segment(block_num: usize, block_id: usize) -> usize {
}

// meta data for generate internal columns
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq, Eq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct InternalColumnMeta {
pub segment_idx: usize,
pub block_id: usize,
pub block_location: String,
pub segment_location: String,
pub snapshot_location: Option<String>,
/// The row offsets in the block.
pub offsets: Option<Vec<usize>>,
pub offsets: Option<RoaringTreemap>,
pub base_block_ids: Option<Scalar>,
pub inner: Option<BlockMetaInfoPtr>,
// The search matched rows and optional scores in the block.
Expand Down Expand Up @@ -221,8 +222,8 @@ impl InternalColumn {
let high_32bit = compute_row_id_prefix(seg_id, block_id);
let mut row_ids = Vec::with_capacity(num_rows);
if let Some(offsets) = &meta.offsets {
for i in offsets {
let row_id = compute_row_id(high_32bit, *i as u64);
for i in offsets.iter() {
let row_id = compute_row_id(high_32bit, i);
row_ids.push(row_id);
}
} else {
Expand Down Expand Up @@ -256,8 +257,8 @@ impl InternalColumn {
});
let mut row_ids = Vec::with_capacity(num_rows);
if let Some(offsets) = &meta.offsets {
for i in offsets {
let row_id = format!("{}{:06x}", uuid, *i);
for i in offsets.iter() {
let row_id = format!("{}{:06x}", uuid, i);
row_ids.push(row_id);
}
} else {
Expand All @@ -282,6 +283,7 @@ impl InternalColumn {

let mut bitmap = MutableBitmap::from_len_zeroed(num_rows);
for (idx, _) in matched_rows.iter() {
debug_assert!(*idx < bitmap.len());
bitmap.set(*idx, true);
}
Column::Boolean(bitmap.into()).into()
Expand All @@ -292,8 +294,9 @@ impl InternalColumn {

let mut scores = vec![F32::from(0_f32); num_rows];
for (idx, score) in matched_rows.iter() {
debug_assert!(*idx < scores.len());
if let Some(val) = scores.get_mut(*idx) {
assert!(score.is_some());
debug_assert!(score.is_some());
*val = F32::from(*score.unwrap());
}
}
Expand All @@ -307,6 +310,7 @@ impl InternalColumn {
// Fill other rows with the maximum value and they will be filtered out.
let mut scores = vec![F32::from(f32::MAX); num_rows];
for (idx, score) in vector_scores.iter() {
debug_assert!(*idx < scores.len());
if let Some(val) = scores.get_mut(*idx) {
*val = *score;
}
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ parking_lot = { workspace = true }
parquet = { workspace = true }
paste = { workspace = true }
rand = { workspace = true }
roaring = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_sql::IndexType;
use roaring::RoaringTreemap;
use xorf::BinaryFuse16;

use super::native_data_source::NativeDataSource;
Expand Down Expand Up @@ -797,15 +798,17 @@ impl NativeDeserializeDataTransform {
let mut block = block.resort(&self.src_schema, &self.output_schema)?;
let fuse_part = FuseBlockPartInfo::from_part(&self.parts[0])?;
let offsets = if self.block_reader.query_internal_columns() {
let offset = self.read_state.offset;
let offset = self.read_state.offset as u64;
let offsets = if let Some(count) = self.read_state.filtered_count {
let filter_executor = self.filter_executor.as_mut().unwrap();
filter_executor.mutable_true_selection()[0..count]
.iter()
.map(|idx| *idx as usize + offset)
.collect::<Vec<_>>()
RoaringTreemap::from_sorted_iter(
filter_executor.true_selection()[0..count]
.iter()
.map(|idx| *idx as u64 + offset),
)
.unwrap()
} else {
(offset..offset + origin_num_rows).collect()
RoaringTreemap::from_sorted_iter(offset..offset + origin_num_rows as u64).unwrap()
};
Some(offsets)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_sql::IndexType;
use roaring::RoaringTreemap;
use xorf::BinaryFuse16;

use super::parquet_data_source::ParquetDataSource;
Expand Down Expand Up @@ -343,9 +344,12 @@ impl Processor for DeserializeDataTransform {
// `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline.
let offsets = if self.block_reader.query_internal_columns() {
filter.as_ref().map(|bitmap| {
(0..origin_num_rows)
.filter(|i| unsafe { bitmap.get_bit_unchecked(*i) })
.collect()
RoaringTreemap::from_sorted_iter(
(0..origin_num_rows)
.filter(|i| unsafe { bitmap.get_bit_unchecked(*i) })
.map(|i| i as u64),
)
.unwrap()
})
} else {
None
Expand Down
34 changes: 31 additions & 3 deletions src/query/storages/fuse/src/operations/read/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
use roaring::RoaringTreemap;

use crate::operations::BlockMetaIndex;
use crate::FuseBlockPartInfo;
Expand All @@ -40,7 +41,7 @@ pub fn need_reserve_block_info(ctx: Arc<dyn TableContext>, table_idx: usize) ->
pub(crate) fn add_data_block_meta(
block: DataBlock,
fuse_part: &FuseBlockPartInfo,
offsets: Option<Vec<usize>>,
offsets: Option<RoaringTreemap>,
base_block_ids: Option<Scalar>,
update_stream_columns: bool,
query_internal_columns: bool,
Expand All @@ -67,6 +68,33 @@ pub(crate) fn add_data_block_meta(
if query_internal_columns {
// Fill `BlockMetaInfoPtr` if query internal columns
let block_meta = fuse_part.block_meta_index().unwrap();

// Transform matched_rows indices from block-level to page-level
let matched_rows = block_meta.matched_rows.clone().map(|matched_rows| {
if let Some(offsets) = &offsets {
matched_rows
.into_iter()
.filter(|(idx, _)| offsets.contains(*idx as u64))
.map(|(idx, score)| ((offsets.rank(idx as u64) - 1) as usize, score))
.collect::<Vec<_>>()
} else {
matched_rows
}
});

// Transform vector_scores indices from block-level to page-level
let vector_scores = block_meta.vector_scores.clone().map(|vector_scores| {
if let Some(offsets) = &offsets {
vector_scores
.into_iter()
.filter(|(idx, _)| offsets.contains(*idx as u64))
.map(|(idx, score)| ((offsets.rank(idx as u64) - 1) as usize, score))
.collect::<Vec<_>>()
} else {
vector_scores
}
});

let internal_column_meta = InternalColumnMeta {
segment_idx: block_meta.segment_idx,
block_id: block_meta.block_id,
Expand All @@ -76,8 +104,8 @@ pub(crate) fn add_data_block_meta(
offsets,
base_block_ids,
inner: meta,
matched_rows: block_meta.matched_rows.clone(),
vector_scores: block_meta.vector_scores.clone(),
matched_rows,
vector_scores,
};
meta = Some(Box::new(internal_column_meta));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ query
SELECT id, score(), content FROM t WHERE match(content, 'test')
----

query IFT
query
SELECT id, score(), content FROM t WHERE match(content, 'the')
----

Expand All @@ -47,7 +47,7 @@ SELECT id, score(), content FROM t WHERE match(content, 'word')
2 1.5948367 A picture is worth a thousand words
4 1.6550698 Actions speak louder than words

query IFT
query
SELECT id, score(), content FROM t WHERE match(content, 'box')
----

Expand Down Expand Up @@ -133,25 +133,25 @@ SELECT id, score(), content FROM t WHERE match(content, '文化 博大精深') O
15 2.063777 中国的茶文化源远流长,品茶已经成为一种生活方式。
28 7.61753 中国的饮食文化博大精深,各地的美食各具特色,让人流连忘返。

query IFT
query
SELECT id, score(), content FROM t WHERE match(content, '化博') ORDER BY score()
----


query IFT
query
SELECT id, score(), content FROM t WHERE match(content, '。') ORDER BY score()
----


query IFT
query
SELECT id, score(), content FROM t WHERE match(content, '不存在') ORDER BY score()
----


statement error 1903
SELECT id, score(), content FROM t WHERE match(content, '()')

query IFT
query
SELECT id, score(), content FROM t WHERE match(content, '()', 'lenient=true')
----

Expand Down Expand Up @@ -198,7 +198,7 @@ SELECT id, score(), content FROM t WHERE match(content, 'the')
6 2.033073 Beauty is in the eye of the beholder
10 2.033073 An apple a day keeps the doctor away

query IFT
query
SELECT id, score(), content FROM t WHERE match(content, 'fly')
----

Expand Down Expand Up @@ -306,7 +306,7 @@ SELECT id, score(), title FROM books WHERE query('title:python OR rust') ORDER B
6 0.96639454 Flask Web开发:基于Python的Web应用开发实战(第2版)
13 0.8931828 Learn AI-Assisted Python Programming: With GitHub Copilot and ChatGPT

query IFT
query
SELECT id, score(), title FROM books WHERE query('title:python AND rust') ORDER BY score() DESC
----

Expand Down Expand Up @@ -486,10 +486,32 @@ CREATE INVERTED INDEX IF NOT EXISTS idx ON t2(body) tokenizer = 'chinese'
statement ok
INSERT INTO t2 VALUES (1, null)

query IT
query
select * from t2 where query('body:test');
----

statement ok
CREATE TABLE t_native (id int, content string, INVERTED INDEX idx1 (content)) storage_format = 'native' row_per_page = 2;

statement ok
INSERT INTO t_native VALUES
(1, 'The quick brown fox jumps over the lazy dog'),
(2, 'A picture is worth a thousand words'),
(3, 'The early bird catches the worm'),
(4, 'Actions speak louder than words'),
(5, 'Time flies like an arrow; fruit flies like a banana'),
(6, 'Beauty is in the eye of the beholder'),
(7, 'When life gives you lemons, make lemonade'),
(8, 'Put all your eggs in one basket'),
(9, 'You can not judge a book by its cover'),
(10, 'An apple a day keeps the doctor away')

query IT
select * from t_native where query('content:book OR content:basket');
----
8 Put all your eggs in one basket
9 You can not judge a book by its cover

statement ok
use default

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,43 @@ SELECT id, cosine_distance(embedding2, [0.50515236, 0.8561939, 0.87169914, 0.558
----
5 0.03522873

statement ok
CREATE TABLE IF NOT EXISTS t_native(id Int, embedding Vector(8), VECTOR INDEX idx (embedding) m=10 ef_construct=40 distance='cosine') storage_format = 'native' row_per_page = 2

statement ok
INSERT INTO t_native VALUES
(1, [0.50515236, 0.8561939, 0.87169914, 0.55843271, 0.73689797, 0.49985862, 0.64527255, 0.29313098]),
(2, [0.17790798, 0.0132427, 0.55352279, 0.49129727, 0.74246407, 0.97345777, 0.83489323, 0.86012174]),
(3, [0.2703968, 0.26768266, 0.96587005, 0.04760408, 0.92289409, 0.15799311, 0.86381163, 0.2922287]),
(4, [0.0810719, 0.27882267, 0.6015564, 0.34236571, 0.58889543, 0.83293431, 0.67012723, 0.76303241]),
(5, [0.66399931, 0.35041433, 0.2159864, 0.89537508, 0.44577037, 0.57896497, 0.36630178, 0.33816571]),
(6, [0.32052319, 0.38567453, 0.62853221, 0.84816365, 0.15853234, 0.33207714, 0.7673085, 0.69513879]),
(7, [0.82590676, 0.35860656, 0.6277274, 0.95148122, 0.81893313, 0.91440945, 0.15803721, 0.5866869]),
(8, [0.42135513, 0.05637937, 0.88864157, 0.59217909, 0.98435169, 0.39234101, 0.41490889, 0.02760555])

statement ok
INSERT INTO t_native VALUES
(9, [0.61418788, 0.34545306, 0.14638622, 0.53249639, 0.09139293, 0.84940919, 0.105433, 0.4156201]),
(10, [0.21828953, 0.87852734, 0.64221122, 0.24536394, 0.81689593, 0.86341877, 0.7218334, 0.45028494]),
(11, [0.43279006, 0.45523681, 0.76060274, 0.66284758, 0.19131476, 0.13564463, 0.88712212, 0.93279565]),
(12, [0.79671359, 0.86079789, 0.94477631, 0.5116732, 0.29733205, 0.33645561, 0.41380333, 0.75909903]),
(13, [0.94666755, 0.39522571, 0.39857241, 0.88080323, 0.53470771, 0.09486194, 0.17524627, 0.86497559]),
(14, [0.8397819, 0.37221789, 0.32885295, 0.20470829, 0.49838217, 0.00736057, 0.45418757, 0.6956924 ]),
(15, [0.13230447, 0.630588, 0.10812326, 0.21558228, 0.83768057, 0.48870546, 0.65021806, 0.31626541]),
(16, [0.2667851, 0.01529589, 0.98994706, 0.31870983, 0.31783372, 0.34863699, 0.30254189, 0.84441678])

query IF
SELECT id, cosine_distance(embedding, [0.50515236, 0.8561939, 0.87169914, 0.55843271, 0.73689797, 0.49985862, 0.64527255, 0.29313098]::vector(8)) AS similarity FROM t_native ORDER BY similarity ASC LIMIT 5;
----
1 0.015007019
10 0.06970841
12 0.09774226
8 0.1490264
15 0.150944

statement ok
use default

statement ok
drop database test_vector_index


Loading