From 7b83d5522a081d2297f41c85efe621337600202f Mon Sep 17 00:00:00 2001 From: lijinglun Date: Wed, 22 Oct 2025 19:21:29 +0800 Subject: [PATCH] feat: add fuzziness to json inverted match query --- rust/lance-index/benches/inverted.rs | 8 +- rust/lance-index/src/scalar/inverted/index.rs | 36 +++-- rust/lance-index/src/scalar/inverted/query.rs | 71 +++++++++- .../lance-index/src/scalar/inverted/scorer.rs | 3 +- .../inverted/tokenizer/lance_tokenizer.rs | 30 +++++ rust/lance/src/dataset.rs | 124 ++++++++++++++++++ rust/lance/src/io/exec/fts.rs | 14 +- 7 files changed, 254 insertions(+), 32 deletions(-) diff --git a/rust/lance-index/benches/inverted.rs b/rust/lance-index/benches/inverted.rs index db625cbcd01..415c1bc3fc4 100644 --- a/rust/lance-index/benches/inverted.rs +++ b/rust/lance-index/benches/inverted.rs @@ -16,7 +16,8 @@ use lance_core::cache::LanceCache; use lance_core::ROW_ID; use lance_datagen::{array, RowCount}; use lance_index::prefilter::NoFilter; -use lance_index::scalar::inverted::query::{FtsSearchParams, Operator}; +use lance_index::scalar::inverted::lance_tokenizer::DocType; +use lance_index::scalar::inverted::query::{FtsSearchParams, Operator, Tokens}; use lance_index::scalar::inverted::{InvertedIndex, InvertedIndexBuilder}; use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::{ @@ -99,7 +100,10 @@ fn bench_inverted(c: &mut Criterion) { black_box( invert_index .bm25_search( - vec![sample_words[word_idx].clone()].into(), + Arc::new(Tokens::new( + vec![sample_words[word_idx].clone()], + DocType::Text, + )), params.clone().into(), Operator::Or, no_filter.clone(), diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 1278f1ca029..7a5afc4bf4a 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -7,10 +7,7 @@ use std::{ cmp::{min, Reverse}, collections::BinaryHeap, }; -use std::{ - collections::{HashMap, HashSet}, - ops::Range, -}; +use std::{collections::HashMap, ops::Range}; use crate::metrics::NoOpMetricsCollector; use crate::prefilter::NoFilter; @@ -232,7 +229,7 @@ impl InvertedIndex { #[instrument(level = "debug", skip_all)] pub async fn bm25_search( &self, - tokens: Arc>, + tokens: Arc, params: Arc, operator: Operator, prefilter: Arc, @@ -500,7 +497,7 @@ impl InvertedIndex { let (doc_ids, _) = self .bm25_search( - tokens.into(), + Arc::new(tokens), params.into(), Operator::And, Arc::new(NoFilter), @@ -679,7 +676,7 @@ impl InvertedPartition { self.tokens.get(token) } - pub fn expand_fuzzy(&self, tokens: &[String], params: &FtsSearchParams) -> Result> { + pub fn expand_fuzzy(&self, tokens: &Tokens, params: &FtsSearchParams) -> Result { let mut new_tokens = Vec::with_capacity(min(tokens.len(), params.max_expansions)); for token in tokens { let fuzziness = match params.fuzziness { @@ -692,8 +689,9 @@ impl InvertedPartition { location: location!(), })?; + let base_len = tokens.token_type().prefix_len(token) as u32; if let TokenMap::Fst(ref map) = self.tokens.tokens { - match params.prefix_length { + match base_len + params.prefix_length { 0 => take_fst_keys(map.search(lev), &mut new_tokens, params.max_expansions), prefix_length => { let prefix = &token[..min(prefix_length as usize, token.len())]; @@ -712,7 +710,7 @@ impl InvertedPartition { }); } } - Ok(new_tokens) + Ok(Tokens::new(new_tokens, tokens.token_type().clone())) } // search the documents that contain the query @@ -721,7 +719,7 @@ impl InvertedPartition { #[instrument(level = "debug", skip_all)] pub async fn bm25_search( &self, - tokens: &[String], + tokens: &Tokens, params: &FtsSearchParams, operator: Operator, mask: Arc, @@ -731,7 +729,7 @@ impl InvertedPartition { let is_phrase_query = params.phrase_slop.is_some(); let tokens = match is_fuzzy { true => self.expand_fuzzy(tokens, params)?, - false => tokens.to_vec(), + false => tokens.clone(), }; let mut token_ids = Vec::with_capacity(tokens.len()); for token in tokens { @@ -2337,9 +2335,7 @@ fn do_flat_full_text_search( let mut results = Vec::new(); let mut tokenizer = tokenizer.unwrap_or_else(|| InvertedIndexParams::default().build().unwrap()); - let query_tokens = collect_query_tokens(query, &mut tokenizer, None) - .into_iter() - .collect::>(); + let query_tokens = collect_query_tokens(query, &mut tokenizer, None); for batch in batches { let row_id_array = batch[ROW_ID].as_primitive::(); @@ -2361,7 +2357,7 @@ fn do_flat_full_text_search( pub fn flat_bm25_search( batch: RecordBatch, doc_col: &str, - query_tokens: &HashSet, + query_tokens: &Tokens, tokenizer: &mut Box, scorer: &mut MemBM25Scorer, ) -> std::result::Result { @@ -2389,7 +2385,7 @@ pub fn flat_bm25_search( .or_insert(1); } let mut score = 0.0; - for token in query_tokens.iter() { + for token in query_tokens { let freq = doc_token_count.get(token).copied().unwrap_or_default() as f32; let idf = idf(scorer.num_docs_containing_token(token), scorer.num_docs()); @@ -2420,10 +2416,7 @@ pub fn flat_bm25_search_stream( .build(), )), }; - let tokens = collect_query_tokens(&query, &mut tokenizer, None) - .into_iter() - .sorted_unstable() - .collect::>(); + let tokens = collect_query_tokens(&query, &mut tokenizer, None); let mut bm25_scorer = match index { Some(index) => { @@ -2473,6 +2466,7 @@ pub fn is_phrase_query(query: &str) -> bool { #[cfg(test)] mod tests { + use crate::scalar::inverted::lance_tokenizer::DocType; use lance_core::cache::LanceCache; use lance_core::utils::tempfile::TempObjDir; use lance_io::object_store::ObjectStore; @@ -2671,7 +2665,7 @@ mod tests { // Prewarm the inverted index (this loads posting lists into cache) index.prewarm().await.unwrap(); - let tokens = Arc::new(vec!["test".to_string()]); + let tokens = Arc::new(Tokens::new(vec!["test".to_string()], DocType::Text)); let params = Arc::new(FtsSearchParams::new().with_limit(Some(10))); let prefilter = Arc::new(NoFilter); let metrics = Arc::new(NoOpMetricsCollector); diff --git a/rust/lance-index/src/scalar/inverted/query.rs b/rust/lance-index/src/scalar/inverted/query.rs index dac3859f756..ad05aab29b5 100644 --- a/rust/lance-index/src/scalar/inverted/query.rs +++ b/rust/lance-index/src/scalar/inverted/query.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use crate::scalar::inverted::lance_tokenizer::DocType; use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer; use lance_core::{Error, Result}; use serde::ser::SerializeMap; @@ -650,11 +651,70 @@ impl FtsQueryNode for BooleanQuery { } } +#[derive(Clone)] +pub struct Tokens { + tokens: Vec, + tokens_set: HashSet, + token_type: DocType, +} + +impl Tokens { + pub fn new(tokens: Vec, token_type: DocType) -> Self { + let mut tokens_vec = vec![]; + let mut tokens_set = HashSet::new(); + for token in tokens.into_iter() { + tokens_vec.push(token.clone()); + tokens_set.insert(token); + } + + Self { + tokens: tokens_vec, + tokens_set, + token_type, + } + } + + pub fn len(&self) -> usize { + self.tokens.len() + } + + pub fn is_empty(&self) -> bool { + self.tokens.is_empty() + } + + pub fn token_type(&self) -> &DocType { + &self.token_type + } + + pub fn contains(&self, token: &str) -> bool { + self.tokens_set.contains(token) + } +} + +impl IntoIterator for Tokens { + type Item = String; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.tokens.into_iter() + } +} + +impl<'a> IntoIterator for &'a Tokens { + type Item = &'a String; + type IntoIter = std::slice::Iter<'a, String>; + + fn into_iter(self) -> Self::IntoIter { + self.tokens.iter() + } +} + pub fn collect_query_tokens( text: &str, tokenizer: &mut Box, inclusive: Option<&HashSet>, -) -> Vec { +) -> Tokens { + let token_type = tokenizer.doc_type(); let mut stream = tokenizer.token_stream_for_search(text); let mut tokens = Vec::new(); while let Some(token) = stream.next() { @@ -665,14 +725,15 @@ pub fn collect_query_tokens( } tokens.push(token.text.to_owned()); } - tokens + Tokens::new(tokens, token_type) } pub fn collect_doc_tokens( text: &str, tokenizer: &mut Box, - inclusive: Option<&HashSet>, -) -> Vec { + inclusive: Option<&Tokens>, +) -> Tokens { + let token_type = tokenizer.doc_type(); let mut stream = tokenizer.token_stream_for_doc(text); let mut tokens = Vec::new(); while let Some(token) = stream.next() { @@ -683,7 +744,7 @@ pub fn collect_doc_tokens( } tokens.push(token.text.to_owned()); } - tokens + Tokens::new(tokens, token_type) } pub fn fill_fts_query_column( diff --git a/rust/lance-index/src/scalar/inverted/scorer.rs b/rust/lance-index/src/scalar/inverted/scorer.rs index 78fa0ea20c6..4f38f03d712 100644 --- a/rust/lance-index/src/scalar/inverted/scorer.rs +++ b/rust/lance-index/src/scalar/inverted/scorer.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use super::InvertedPartition; +use crate::scalar::inverted::query::Tokens; use std::collections::HashMap; // the Scorer trait is used to calculate the score of a token in a document @@ -43,7 +44,7 @@ impl MemBM25Scorer { /// /// # Arguments /// * `tokens` - The tokens of the new document. - pub fn update(&mut self, tokens: &Vec) { + pub fn update(&mut self, tokens: &Tokens) { self.total_tokens += tokens.len() as u64; self.num_docs += 1; for token in tokens { diff --git a/rust/lance-index/src/scalar/inverted/tokenizer/lance_tokenizer.rs b/rust/lance-index/src/scalar/inverted/tokenizer/lance_tokenizer.rs index 30107bb2546..7bd3816f147 100644 --- a/rust/lance-index/src/scalar/inverted/tokenizer/lance_tokenizer.rs +++ b/rust/lance-index/src/scalar/inverted/tokenizer/lance_tokenizer.rs @@ -9,6 +9,7 @@ use snafu::location; use tantivy::tokenizer::{BoxTokenStream, Token, TokenStream}; /// Document type for full text search. +#[derive(Debug, Clone)] pub enum DocType { Text, Json, @@ -49,6 +50,25 @@ impl TryFrom<&Field> for DocType { } } +impl DocType { + /// Get the length of the prefix before value. + /// - JSON Token: path,type,value + /// - Text Token: value + pub fn prefix_len(&self, token: &str) -> usize { + match self { + Self::Json => { + if let Some(pos) = token.find(',') { + if let Some(second_pos) = token[pos + 1..].find(',') { + return pos + second_pos + 2; + } + } + panic!("json token must be in format of ,,") + } + Self::Text => 0, + } + } +} + /// Lance full text search tokenizer. /// /// `LanceTokenizer` defines 2 methods for tokenization, normally they are the same, but sometimes @@ -63,6 +83,8 @@ pub trait LanceTokenizer: Send + Sync { fn token_stream_for_doc<'a>(&'a mut self, text: &'a str) -> BoxTokenStream<'a>; /// Clone the tokenizer. fn box_clone(&self) -> Box; + /// Get document type. + fn doc_type(&self) -> DocType; } impl Clone for Box { @@ -94,6 +116,10 @@ impl LanceTokenizer for TextTokenizer { fn box_clone(&self) -> Box { Box::new(self.clone()) } + + fn doc_type(&self) -> DocType { + DocType::Text + } } #[derive(Clone)] @@ -129,6 +155,10 @@ impl LanceTokenizer for JsonTokenizer { fn box_clone(&self) -> Box { Box::new(self.clone()) } + + fn doc_type(&self) -> DocType { + DocType::Json + } } fn flatten_triplet( diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bf890eebebc..a165ab73812 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -8211,6 +8211,130 @@ mod tests { (dataset, json_col) } + #[tokio::test] + async fn test_json_inverted_fuzziness_query() { + let (mut dataset, json_col) = prepare_json_dataset().await; + + // Create inverted index for json col + dataset + .create_index( + &[&json_col], + IndexType::Inverted, + None, + &InvertedIndexParams::default().lance_tokenizer("json".to_string()), + true, + ) + .await + .unwrap(); + + // Match query with fuzziness + let query = FullTextSearchQuery { + query: FtsQuery::Match( + MatchQuery::new("Content,str,Dursley".to_string()) + .with_column(Some(json_col.clone())), + ), + limit: None, + wand_factor: None, + }; + let batch = dataset + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(1, batch.num_rows()); + + let query = FullTextSearchQuery { + query: FtsQuery::Match( + MatchQuery::new("Content,str,Bursley".to_string()) + .with_column(Some(json_col.clone())), + ), + limit: None, + wand_factor: None, + }; + let batch = dataset + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(0, batch.num_rows()); + + let query = FullTextSearchQuery { + query: FtsQuery::Match( + MatchQuery::new("Content,str,Bursley".to_string()) + .with_column(Some(json_col.clone())) + .with_fuzziness(Some(1)), + ), + limit: None, + wand_factor: None, + }; + let batch = dataset + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(1, batch.num_rows()); + + let query = FullTextSearchQuery { + query: FtsQuery::Match( + MatchQuery::new("Content,str,ABursley".to_string()) + .with_column(Some(json_col.clone())) + .with_fuzziness(Some(1)), + ), + limit: None, + wand_factor: None, + }; + let batch = dataset + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(0, batch.num_rows()); + + let query = FullTextSearchQuery { + query: FtsQuery::Match( + MatchQuery::new("Content,str,ABursley".to_string()) + .with_column(Some(json_col.clone())) + .with_fuzziness(Some(2)), + ), + limit: None, + wand_factor: None, + }; + let batch = dataset + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(1, batch.num_rows()); + + let query = FullTextSearchQuery { + query: FtsQuery::Match( + MatchQuery::new("Dontent,str,Bursley".to_string()) + .with_column(Some(json_col.clone())) + .with_fuzziness(Some(2)), + ), + limit: None, + wand_factor: None, + }; + let batch = dataset + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(0, batch.num_rows()); + } + #[tokio::test] async fn test_json_inverted_match_query() { let (mut dataset, json_col) = prepare_json_dataset().await; diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 22363fe8e7f..2ba197da8a0 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -26,6 +26,7 @@ use super::PreFilterSource; use crate::{index::DatasetIndexInternalExt, Dataset}; use lance_index::metrics::MetricsCollector; use lance_index::scalar::inverted::builder::document_input; +use lance_index::scalar::inverted::lance_tokenizer::{DocType, JsonTokenizer, LanceTokenizer}; use lance_index::scalar::inverted::query::{ collect_query_tokens, BoostQuery, FtsSearchParams, MatchQuery, PhraseQuery, }; @@ -255,7 +256,14 @@ impl ExecutionPlan for MatchQueryExec { let tokenizer = tantivy::tokenizer::TextAnalyzer::from( tantivy::tokenizer::SimpleTokenizer::default(), ); - Box::new(TextTokenizer::new(tokenizer)) + match inverted_idx.tokenizer().doc_type() { + DocType::Text => { + Box::new(TextTokenizer::new(tokenizer)) as Box + } + DocType::Json => { + Box::new(JsonTokenizer::new(tokenizer)) as Box + } + } } }; let tokens = collect_query_tokens(&query.terms, &mut tokenizer, None); @@ -263,7 +271,7 @@ impl ExecutionPlan for MatchQueryExec { pre_filter.wait_for_ready().await?; let (doc_ids, mut scores) = inverted_idx .bm25_search( - tokens.into(), + Arc::new(tokens), params.into(), query.operator, pre_filter, @@ -646,7 +654,7 @@ impl ExecutionPlan for PhraseQueryExec { pre_filter.wait_for_ready().await?; let (doc_ids, scores) = index .bm25_search( - tokens.into(), + Arc::new(tokens), params.into(), lance_index::scalar::inverted::query::Operator::And, pre_filter,