From 9e1e4a518143236371b76ecb6f1da5c694eb867b Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Thu, 18 Apr 2024 21:33:06 +0800
Subject: [PATCH] =?UTF-8?q?it=20works=20on=20my=20machine=20=C2=AF\=5F(?=
 =?UTF-8?q?=E3=83=84)=5F/=C2=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Ruihang Xia
---
 Cargo.lock                                 | 12 +---
 Cargo.toml                                 | 21 ++++--
 feed.py                                    | 59 +++++++++++++++++
 src/common/function/src/scalars.rs         |  1 +
 src/common/function/src/scalars/matches.rs | 61 ++++++++++++++++++
 src/common/function/src/scalars/math.rs    |  2 +
 src/common/substrait/Cargo.toml            |  4 +-
 src/common/substrait/src/df_substrait.rs   | 18 ++++++
 src/index/Cargo.toml                       |  6 +-
 src/index/src/full_text_index/create.rs    | 23 +++++--
 src/index/src/full_text_index/search.rs    |  2 +-
 src/mito2/src/access_layer.rs              |  7 ++
 src/mito2/src/read/scan_region.rs          | 26 ++++++--
 src/mito2/src/sst/index.rs                 |  2 +
 src/mito2/src/sst/index/applier.rs         | 50 +++++++++++++++
 src/mito2/src/sst/index/creator.rs         |  9 ++-
 src/mito2/src/sst/parquet/reader.rs        | 74 +++++++++++++++++++++-
 17 files changed, 341 insertions(+), 36 deletions(-)
 create mode 100644 feed.py
 create mode 100644 src/common/function/src/scalars/matches.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5dcdc036f93b..f6ccd7881ccc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2550,7 +2550,6 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
 [[package]]
 name = "datafusion"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "ahash 0.8.6",
  "arrow",
@@ -2597,7 +2596,6 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "ahash 0.8.6",
  "arrow",
@@ -2615,7 +2613,6 @@ dependencies = [
 [[package]]
 name = "datafusion-execution"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "arrow",
  "chrono",
@@ -2635,7 +2632,6 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "ahash 0.8.6",
  "arrow",
@@ -2649,7 +2645,6 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "arrow",
  "async-trait",
@@ -2666,7 +2661,6 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "ahash 0.8.6",
  "arrow",
@@ -2699,7 +2693,6 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-plan"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
 dependencies = [
  "ahash 0.8.6",
  "arrow",
@@ -2729,7 +2722,6 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "32.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
"git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9" dependencies = [ "arrow", "arrow-schema", @@ -2742,7 +2734,6 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "32.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9" dependencies = [ "async-recursion", "chrono", @@ -9934,6 +9925,7 @@ dependencies = [ "bytes", "catalog", "common-error", + "common-function", "common-macro", "datafusion", "datafusion-common", @@ -9942,6 +9934,7 @@ dependencies = [ "datatypes", "promql", "prost 0.12.3", + "session", "snafu", "substrait 0.17.1", "tokio", @@ -10183,6 +10176,7 @@ dependencies = [ "time", "uuid", "winapi", + "zstd 0.13.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 788bc68798e0..ee9ada37fe14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,13 +91,20 @@ bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.4", features = ["derive"] } dashmap = "5.4" -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } -datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } -datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } -datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } -datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +# datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } +datafusion = { path = "../arrow-datafusion/datafusion/core" } +datafusion-common = { path = "../arrow-datafusion/datafusion/common" } +datafusion-expr = { path = "../arrow-datafusion/datafusion/expr" } +datafusion-optimizer = { path = "../arrow-datafusion/datafusion/optimizer" } +datafusion-physical-expr = { path = "../arrow-datafusion/datafusion/physical-expr" } +datafusion-sql = { path = "../arrow-datafusion/datafusion/sql" } +datafusion-substrait = { path = "../arrow-datafusion/datafusion/substrait" } derive_builder = "0.12" dotenv = "0.15" etcd-client = 
"0.12" diff --git a/feed.py b/feed.py new file mode 100644 index 000000000000..dfe06f95324c --- /dev/null +++ b/feed.py @@ -0,0 +1,54 @@ +# read line from log-1000.txt and POST it to http://localhost:4000/v1/influxdb/write?db=public&precision=ms +# POST data format: "many_logs,host=1 log= " + +import requests +from tqdm import tqdm +from concurrent.futures import ThreadPoolExecutor + +batch_size = 3000 +worker = 8 + +# Define the URL +url = "http://localhost:4000/v1/influxdb/write?db=public&precision=ms" + + +def send_data(start, data): + # Send the POST request + response = requests.post(url, data=data) + # Check the response + if response.status_code >= 300: + print( + f"Failed to send log line {start}: {response.status_code} {response.text}" + ) + + +# Open the file +with open("target/log-1000.txt", "r") as file: + lines = file.readlines() + +# Create a progress bar +with tqdm( + total=len(lines), + desc="Processing lines", + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}", +) as pbar: + data = "" + with ThreadPoolExecutor(max_workers=worker) as executor: + for i, line in enumerate(lines): + # Prepare the POST data + content = line.strip() + content = content.replace('"', " ") + content = content.replace("'", " ") + content = content.replace("=", " ") + content = content.replace(".", " ") + + data = data + f'many_logs,host=1 log="{content}" {i}\n' + + if i % batch_size == 0: + executor.submit(send_data, i, data) + data = "" + # Update the progress bar + pbar.update(batch_size) + + # close the executor + executor.shutdown(wait=True) diff --git a/src/common/function/src/scalars.rs b/src/common/function/src/scalars.rs index 143d3f9cbbcd..2b3f463e9437 100644 --- a/src/common/function/src/scalars.rs +++ b/src/common/function/src/scalars.rs @@ -15,6 +15,7 @@ pub mod aggregate; pub(crate) mod date; pub mod expression; +pub mod matches; pub mod math; pub mod numpy; #[cfg(test)] diff --git a/src/common/function/src/scalars/matches.rs b/src/common/function/src/scalars/matches.rs new file mode 100644 index 000000000000..f7ab6e33c2ad --- /dev/null +++ b/src/common/function/src/scalars/matches.rs @@ -0,0 +1,61 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
diff --git a/src/common/function/src/scalars/matches.rs b/src/common/function/src/scalars/matches.rs
new file mode 100644
index 000000000000..f7ab6e33c2ad
--- /dev/null
+++ b/src/common/function/src/scalars/matches.rs
@@ -0,0 +1,61 @@
+// Copyright 2024 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+use std::fmt::Display;
+use std::sync::Arc;
+
+use common_query::error::Result;
+use common_query::prelude::{Signature, Volatility};
+use datatypes::prelude::ConcreteDataType;
+use datatypes::vectors::{BooleanVector, VectorRef};
+
+use crate::function::{Function, FunctionContext};
+
+const NAME: &str = "matches";
+
+/// Placeholder full-text `matches` function: selects every row here; the real filtering is done by the SST full-text index applier at scan time.
+#[derive(Clone, Debug, Default)]
+pub struct MatchesFunction;
+
+impl Display for MatchesFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", NAME.to_ascii_uppercase())
+    }
+}
+
+impl Function for MatchesFunction {
+    fn name(&self) -> &str {
+        NAME
+    }
+
+    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
+        Ok(ConcreteDataType::boolean_datatype())
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::exact(
+            vec![
+                ConcreteDataType::string_datatype(),
+                ConcreteDataType::string_datatype(),
+            ],
+            Volatility::Immutable,
+        )
+    }
+
+    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+        let num_rows = columns[1].len();
+        Ok(Arc::new(BooleanVector::from(vec![true; num_rows])))
+    }
+}
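The eval above never filters at the UDF level: `matches` always returns an
all-true vector, and the actual row selection happens later in the SST
full-text index applier. A test-style sketch of that contract (hypothetical
test module; assumes `StringVector` from the datatypes crate and a `Default`
impl for `FunctionContext`, neither of which is part of this patch):

    #[cfg(test)]
    mod tests {
        use super::*;
        use datatypes::vectors::StringVector;

        #[test]
        fn matches_placeholder_selects_all_rows() {
            let text = Arc::new(StringVector::from(vec!["log a", "log b"])) as VectorRef;
            let pattern = Arc::new(StringVector::from(vec!["a", "a"])) as VectorRef;
            let out = MatchesFunction
                .eval(FunctionContext::default(), &[text, pattern])
                .unwrap();
            // Every row is selected; pruning happens against the index later.
            assert_eq!(out.len(), 2);
        }
    }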
diff --git a/src/common/function/src/scalars/math.rs b/src/common/function/src/scalars/math.rs
index f7d50f881dfb..623111a9254d 100644
--- a/src/common/function/src/scalars/math.rs
+++ b/src/common/function/src/scalars/math.rs
@@ -31,6 +31,7 @@ pub use pow::PowFunction;
 pub use rate::RateFunction;
 use snafu::ResultExt;
 
+use super::matches::MatchesFunction;
 use crate::function::{Function, FunctionContext};
 use crate::function_registry::FunctionRegistry;
 use crate::scalars::math::modulo::ModuloFunction;
@@ -44,6 +45,7 @@ impl MathFunction {
         registry.register(Arc::new(RateFunction));
         registry.register(Arc::new(RangeFunction));
         registry.register(Arc::new(ClampFunction));
+        registry.register(Arc::new(MatchesFunction));
     }
 }
diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml
index 9ac4fc150f77..24dd8923fa54 100644
--- a/src/common/substrait/Cargo.toml
+++ b/src/common/substrait/Cargo.toml
@@ -12,14 +12,16 @@ async-trait.workspace = true
 bytes.workspace = true
 catalog.workspace = true
 common-error.workspace = true
+common-function.workspace = true
 common-macro.workspace = true
-datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datafusion-substrait.workspace = true
+datafusion.workspace = true
 datatypes.workspace = true
 promql.workspace = true
 prost.workspace = true
+session.workspace = true
 snafu.workspace = true
 
 [dependencies.substrait_proto]
diff --git a/src/common/substrait/src/df_substrait.rs b/src/common/substrait/src/df_substrait.rs
index c4e1db9a560e..06e7b357ba6a 100644
--- a/src/common/substrait/src/df_substrait.rs
+++ b/src/common/substrait/src/df_substrait.rs
@@ -16,6 +16,9 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes, BytesMut};
+use common_function::scalars::matches::MatchesFunction;
+use common_function::scalars::udf::create_udf;
+use common_function::state::FunctionState;
 use datafusion::catalog::CatalogList;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -24,6 +27,7 @@ use datafusion_expr::LogicalPlan;
 use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
 use datafusion_substrait::logical_plan::producer::to_substrait_plan;
 use prost::Message;
+use session::context::QueryContext;
 use snafu::ResultExt;
 
 use substrait_proto::proto::Plan;
@@ -50,6 +54,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
         let state = SessionState::new_with_config_rt(state_config, Arc::new(RuntimeEnv::default()))
             .with_serializer_registry(Arc::new(ExtensionSerializer));
         let mut context = SessionContext::new_with_state(state);
+
+        let udf = create_udf(
+            Arc::new(MatchesFunction),
+            QueryContext::arc(),
+            Arc::new(FunctionState::default()),
+        );
+        context.register_udf(udf.into());
         context.register_catalog_list(catalog_list);
         let plan = Plan::decode(message).context(DecodeRelSnafu)?;
         let df_plan = from_substrait_plan(&mut context, &plan)
@@ -65,6 +76,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
             .with_serializer_registry(Arc::new(ExtensionSerializer));
         let context = SessionContext::new_with_state(session_state);
 
+        let udf = create_udf(
+            Arc::new(MatchesFunction),
+            QueryContext::arc(),
+            Arc::new(FunctionState::default()),
+        );
+        context.register_udf(udf.into());
+
         let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?;
         substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
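The decode and encode paths register the same UDF with identical four-line
blocks; if more pushdown functions join `matches`, the two copies must be
kept in sync by hand. A possible shared helper (hypothetical name, same
calls as above; note `register_udf` takes `&self` in this DataFusion
revision, so an immutable `SessionContext` reference suffices):

    fn register_query_udfs(context: &SessionContext) {
        let udf = create_udf(
            Arc::new(MatchesFunction),
            QueryContext::arc(),
            Arc::new(FunctionState::default()),
        );
        context.register_udf(udf.into());
    }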
diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml
index 65df20b2fc8c..261f95081fad 100644
--- a/src/index/Cargo.toml
+++ b/src/index/Cargo.toml
@@ -22,13 +22,13 @@ greptime-proto.workspace = true
 mockall.workspace = true
 pin-project.workspace = true
 prost.workspace = true
-regex.workspace = true
 regex-automata.workspace = true
+regex.workspace = true
 snafu.workspace = true
-tantivy = "0.22"
+tantivy = { version = "0.22", features = ["zstd-compression"] }
 
 [dev-dependencies]
 rand.workspace = true
 tempfile.workspace = true
-tokio.workspace = true
 tokio-util.workspace = true
+tokio.workspace = true
diff --git a/src/index/src/full_text_index/create.rs b/src/index/src/full_text_index/create.rs
index 1a913f862196..faa7a0ef3e71 100644
--- a/src/index/src/full_text_index/create.rs
+++ b/src/index/src/full_text_index/create.rs
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
 use std::path::Path;
 
 use snafu::ResultExt;
-use tantivy::schema::{OwnedValue, Schema, INDEXED, STORED, TEXT};
-use tantivy::{Document, Index, IndexWriter, TantivyDocument};
+use tantivy::schema::{Schema, INDEXED, STORED, TEXT};
+use tantivy::store::{Compressor, ZstdCompressor};
+use tantivy::{Index, IndexWriter, TantivyDocument};
 
 use super::error::TantivySnafu;
 use crate::full_text_index::error::Result;
@@ -43,12 +43,20 @@ impl FullTextIndexCreater {
         let text_field = schema_builder.add_text_field("text", TEXT);
         let schema = schema_builder.build();
 
+        // create path
+        std::fs::create_dir_all(&path).unwrap();
+        common_telemetry::info!("[DEBUG] create full text index in {:?}", path.as_ref());
+
         // build index
-        let index = Index::create_in_dir(path, schema).context(TantivySnafu)?;
+        let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?;
+
+        // tune the doc store: zstd compression, 64 KiB blocks
+        index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
+        index.settings_mut().docstore_blocksize = 65_536;
 
         // build writer
-        // 100 MB
-        let writer = index.writer(100_000_000).context(TantivySnafu)?;
+        // 400 MB memory budget
+        let writer = index.writer(400_000_000).context(TantivySnafu)?;
 
         Ok(Self {
             index,
@@ -67,11 +75,14 @@ impl FullTextIndexCreater {
         self.writer.add_document(doc).context(TantivySnafu)?;
 
         self.row_count += 1;
-        self.writer.commit().context(TantivySnafu)?;
         Ok(())
     }
 
     pub fn finish(&mut self) -> Result<()> {
+        common_telemetry::info!(
+            "[DEBUG] full text index finish with {} entries",
+            self.row_count
+        );
         self.row_count = 0;
         self.writer.commit().context(TantivySnafu)?;
         Ok(())
diff --git a/src/index/src/full_text_index/search.rs b/src/index/src/full_text_index/search.rs
index 6b75cabe3b06..72f42800f032 100644
--- a/src/index/src/full_text_index/search.rs
+++ b/src/index/src/full_text_index/search.rs
@@ -55,7 +55,7 @@ impl FullTextIndexSearcher {
         let query_parser = QueryParser::for_index(&self.index, vec![self.text_field]);
         let query = query_parser.parse_query(query).context(ParseQuerySnafu)?;
         let top_docs = searcher
-            .search(&query, &tantivy::collector::TopDocs::with_limit(100))
+            .search(&query, &tantivy::collector::TopDocs::with_limit(10_000_000))
             .context(TantivySnafu)?;
         let mut result = HashSet::new();
         for (_score, doc_address) in top_docs {
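Two tantivy changes above are worth spelling out: the doc store switches to
zstd-compressed blocks (hence the new `zstd-compression` feature), and
`commit()` moves from `add_text` to `finish`. Every commit seals a segment
and flushes the doc store, so committing per document is dramatically slower
than one commit per SST file. A self-contained sketch of the batch-commit
pattern (my own example against tantivy 0.22, the version pinned above;
in-RAM index, field names mirroring create.rs):

    use tantivy::collector::TopDocs;
    use tantivy::query::QueryParser;
    use tantivy::schema::{Schema, INDEXED, STORED, TEXT};
    use tantivy::{doc, Index};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let row_id = schema_builder.add_u64_field("row_id", INDEXED | STORED);
        let text = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        let mut writer = index.writer(50_000_000)?;
        writer.add_document(doc!(row_id => 0u64, text => "hello greptime"))?;
        writer.add_document(doc!(row_id => 1u64, text => "full text search"))?;
        // One commit for the whole batch; a commit per document would
        // produce one segment per row.
        writer.commit()?;

        let searcher = index.reader()?.searcher();
        let query = QueryParser::for_index(&index, vec![text]).parse_query("search")?;
        let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
        assert_eq!(top_docs.len(), 1);
        Ok(())
    }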
diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index ef8d2358a8dc..d826bf4a355b 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -96,6 +96,13 @@ impl AccessLayer {
             })?;
         }
 
+        let full_text_index_dir = format!(
+            "/tmp/greptimedb/{}index/{}/full_text_index",
+            self.region_dir, file_meta.file_id
+        );
+        common_telemetry::info!("[DEBUG] removing {}", full_text_index_dir);
+        tokio::fs::remove_dir_all(full_text_index_dir).await.unwrap();
+
         Ok(())
     }
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 92d1d6b4245d..b936bc93c48c 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -39,7 +39,7 @@ use crate::read::{compat, Batch, Source};
 use crate::region::version::VersionRef;
 use crate::sst::file::FileHandle;
 use crate::sst::index::applier::builder::SstIndexApplierBuilder;
-use crate::sst::index::applier::SstIndexApplierRef;
+use crate::sst::index::applier::{FullTextIndexApplier, SstIndexApplierRef};
 
 /// A scanner scans a region and returns a [SendableRecordBatchStream].
 pub(crate) enum Scanner {
@@ -269,6 +269,7 @@ impl ScanRegion {
         );
 
         let index_applier = self.build_index_applier();
+        let full_text_index_applier = self.build_full_text_index_applier();
         let predicate = Predicate::new(self.request.filters.clone());
         // The mapper always computes projected column ids as the schema of SSTs may change.
         let mapper = match &self.request.projection {
@@ -283,6 +284,7 @@ impl ScanRegion {
             .with_files(files)
             .with_cache(self.cache_manager)
             .with_index_applier(index_applier)
+            .with_full_text_index_applier(full_text_index_applier)
             .with_parallelism(self.parallelism)
             .with_start_time(self.start_time)
             .with_append_mode(self.version.options.append_mode)
@@ -337,9 +339,12 @@ impl ScanRegion {
             .map(Arc::new)
     }
 
-    fn build_full_text_index_applier(&self) -> Option<SstIndexApplierRef> {
-        // start here
-        todo!()
+    fn build_full_text_index_applier(&self) -> Option<FullTextIndexApplier> {
+        FullTextIndexApplier::new(
+            self.access_layer.region_dir().to_string(),
+            self.version.metadata.region_id,
+            &self.request.filters,
+        )
     }
 }
 
@@ -398,6 +403,8 @@ pub(crate) struct ScanInput {
     pub(crate) append_mode: bool,
     /// Whether to remove deletion markers.
     pub(crate) filter_deleted: bool,
+
+    full_text_index_applier: Option<FullTextIndexApplier>,
 }
 
 impl ScanInput {
@@ -418,6 +425,7 @@ impl ScanInput {
             query_start: None,
             append_mode: false,
             filter_deleted: true,
+            full_text_index_applier: None,
         }
     }
 
@@ -477,6 +485,15 @@ impl ScanInput {
         self
     }
 
+    #[must_use]
+    pub(crate) fn with_full_text_index_applier(
+        mut self,
+        index_applier: Option<FullTextIndexApplier>,
+    ) -> Self {
+        self.full_text_index_applier = index_applier;
+        self
+    }
+
     /// Sets start time of the query.
     #[must_use]
     pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
@@ -514,6 +531,7 @@ impl ScanInput {
             .projection(Some(self.mapper.column_ids().to_vec()))
             .cache(self.cache_manager.clone())
            .index_applier(self.index_applier.clone())
+            .full_text_index_applier(self.full_text_index_applier.clone())
             .build()
             .await;
         let reader = match maybe_reader {
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index af35dcddfec7..922eb120d555 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -63,6 +63,8 @@ impl Indexer {
             // Skip index creation if error occurs.
             self.inner = None;
         }
+    } else {
+        common_telemetry::info!("[DEBUG] Indexer::update: inner is None");
     }
 
     if let Some(creator) = self.inner.as_ref() {
diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs
index f14251afb9d5..e1451f806d90 100644
--- a/src/mito2/src/sst/index/applier.rs
+++ b/src/mito2/src/sst/index/applier.rs
@@ -16,7 +16,10 @@ pub mod builder;
 
 use std::sync::Arc;
 
+use common_query::logical_plan::Expr;
+use datafusion_expr::Expr as DfExpr;
 use futures::{AsyncRead, AsyncSeek};
+use index::full_text_index::search::FullTextIndexSearcher;
 use index::inverted_index::format::reader::InvertedIndexBlobReader;
 use index::inverted_index::search::index_apply::{
     ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
@@ -172,6 +175,53 @@ impl Drop for SstIndexApplier {
     }
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct FullTextIndexApplier {
+    region_dir: String,
+    region_id: RegionId,
+    query: String,
+}
+
+impl FullTextIndexApplier {
+    pub fn new(region_dir: String, region_id: RegionId, filters: &[Expr]) -> Option<Self> {
+        let query = Self::extract_from_filter(filters)?;
+        Some(Self {
+            region_dir,
+            region_id,
+            query,
+        })
+    }
+
+    fn extract_from_filter(filters: &[Expr]) -> Option<String> {
+        common_telemetry::info!("[DEBUG] filters in scan request: {:?}", filters);
+        for filter in filters {
+            if let DfExpr::ScalarUDF(udf) = filter.df_expr()
+                && udf.fun.name == "matches"
+            {
+                let pattern = &udf.args[0];
+                if let DfExpr::Literal(literal) = pattern {
+                    return Some(literal.to_string());
+                } else {
+                    return None;
+                }
+            }
+        }
+        None
+    }
+
+    /// Returns the selected row numbers within the given SST file.
+    pub fn apply(&self, file_id: FileId) -> Result<Vec<usize>> {
+        let index_path = format!(
+            "/tmp/greptimedb/{}index/{}/full_text_index",
+            self.region_dir, file_id
+        );
+        common_telemetry::info!("[DEBUG] open index at {index_path}");
+
+        let searcher = FullTextIndexSearcher::open(index_path).unwrap();
+        Ok(searcher.search(&self.query).unwrap())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use common_base::BitVec;
diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs
index 893540e18695..ea14d1dab60d 100644
--- a/src/mito2/src/sst/index/creator.rs
+++ b/src/mito2/src/sst/index/creator.rs
@@ -121,9 +121,11 @@ impl SstIndexCreator {
         );
         let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
 
-        let full_text_index_path = format!("{file_path}/full_text_index");
-        let full_text_index_creater =
-            FullTextIndexCreater::new(segment_row_count.get(), full_text_index_path).unwrap();
+        let file_id = file_path.trim_end_matches(".puffin");
+        let full_text_index_path = format!("/tmp/greptimedb/{file_id}/full_text_index");
+        // let full_text_index_creater =
+        //     FullTextIndexCreater::new(segment_row_count.get(), full_text_index_path).unwrap();
+        let full_text_index_creater = FullTextIndexCreater::new(1, full_text_index_path).unwrap();
 
         let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
         Self {
@@ -250,6 +252,7 @@ impl SstIndexCreator {
         }
 
         // try find column named "log" and update it into full text index
+        common_telemetry::info!("[DEBUG] do_update: log_column_id: {:?}", self.log_column_id);
         if let Some(log_column_id) = self.log_column_id {
             for col in batch.fields() {
                 if col.column_id == log_column_id {
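The on-disk location of a file's full-text index is now spelled out by hand
in three places (creation in creator.rs, search in applier.rs, cleanup in
access_layer.rs), and the three format strings agree only by convention,
including the hardcoded /tmp/greptimedb prefix. A sketch of a shared helper
(hypothetical, not part of this patch; takes the file id as a string to stay
self-contained):

    /// Single definition of the full-text index layout instead of three
    /// hand-written `format!` calls that must agree byte-for-byte.
    fn full_text_index_dir(region_dir: &str, file_id: &str) -> String {
        format!("/tmp/greptimedb/{region_dir}index/{file_id}/full_text_index")
    }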
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 4afaf3ca39ba..84952995eb55 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -29,7 +29,7 @@ use datafusion_common::arrow::buffer::BooleanBuffer;
 use datatypes::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use object_store::ObjectStore;
-use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
+use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection, RowSelector};
 use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::format::KeyValue;
@@ -50,7 +50,7 @@ use crate::metrics::{
 use crate::read::{Batch, BatchReader};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::FileHandle;
-use crate::sst::index::applier::SstIndexApplierRef;
+use crate::sst::index::applier::{FullTextIndexApplier, SstIndexApplierRef};
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::metadata::MetadataLoader;
 use crate::sst::parquet::row_group::InMemoryRowGroup;
@@ -77,6 +77,8 @@ pub(crate) struct ParquetReaderBuilder {
     cache_manager: Option<CacheManagerRef>,
     /// Index applier.
     index_applier: Option<SstIndexApplierRef>,
+
+    full_text_index_applier: Option<FullTextIndexApplier>,
 }
 
 impl ParquetReaderBuilder {
@@ -95,6 +97,7 @@
             projection: None,
             cache_manager: None,
            index_applier: None,
+            full_text_index_applier: None,
         }
     }
 
@@ -131,6 +134,15 @@
         self
     }
 
+    #[must_use]
+    pub fn full_text_index_applier(
+        mut self,
+        full_text_index_applier: Option<FullTextIndexApplier>,
+    ) -> Self {
+        self.full_text_index_applier = full_text_index_applier;
+        self
+    }
+
     /// Builds and initializes a [ParquetReader].
     ///
     /// This needs to perform IO operation.
@@ -280,6 +292,11 @@
         }
         metrics.num_row_groups_before_filtering += num_row_groups;
 
+        if let Some(full_text_index_result) = self.prune_row_groups_by_full_text_index(parquet_meta)
+        {
+            return full_text_index_result;
+        }
+
         self.prune_row_groups_by_inverted_index(parquet_meta, metrics)
             .await
             .or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics))
@@ -408,6 +425,59 @@
         Some(row_groups)
     }
+
+    fn prune_row_groups_by_full_text_index(
+        &self,
+        parquet_meta: &ParquetMetaData,
+    ) -> Option<BTreeMap<usize, Option<RowSelection>>> {
+        let applier = self.full_text_index_applier.as_ref()?;
+        let file_id = self.file_handle.file_id();
+        let mut selected_row = applier.apply(file_id).unwrap();
+
+        common_telemetry::info!("[DEBUG] selected_row: {:?}", selected_row.len());
+
+        // Let's assume that the number of rows in the first row group
+        // can represent the `row_group_size` of the Parquet file.
+        //
+        // If the file contains only one row group, i.e. the number of rows
+        // is less than the `row_group_size`, the calculation of `row_group_id`
+        // and `rg_row_id` is still correct.
+        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
+        if row_group_size == 0 {
+            return None;
+        }
+
+        // translate `selected_row` into a per-row-group selection
+        selected_row.sort_unstable();
+        let mut row_groups_selected = BTreeMap::new();
+        for row_id in selected_row.iter() {
+            let row_group_id = row_id / row_group_size;
+            let rg_row_id = row_id % row_group_size;
+
+            row_groups_selected
+                .entry(row_group_id)
+                .or_insert_with(Vec::new)
+                .push(rg_row_id);
+        }
+
+        let row_group = row_groups_selected
+            .into_iter()
+            .map(|(row_group_id, row_ids)| {
+                let mut current_row = 0;
+                let mut selection = vec![];
+                for row_id in row_ids {
+                    selection.push(RowSelector::skip(row_id - current_row));
+                    selection.push(RowSelector::select(1));
+                    current_row = row_id + 1;
+                }
+
+                (row_group_id, Some(RowSelection::from(selection)))
+            })
+            .collect();
+
+        // common_telemetry::info!("[DEBUG] row_group: {:?}", row_group);
+
+        Some(row_group)
+    }
 }
 
 /// Parquet reader metrics.
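The row-id-to-RowSelection translation in prune_row_groups_by_full_text_index
is easy to verify in isolation. A standalone sketch of the same mapping
(hypothetical function names; same `parquet` crate types as imported above;
relies on the same assumption the patch documents, namely that every row
group except possibly the last holds exactly `row_group_size` rows):

    use std::collections::BTreeMap;

    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    fn selections(row_ids: &[usize], row_group_size: usize) -> BTreeMap<usize, RowSelection> {
        // `row_ids` must be sorted ascending (the caller sorts with sort_unstable).
        // Bucket global row ids by row group, keeping in-group offsets.
        let mut per_group: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
        for &row_id in row_ids {
            per_group
                .entry(row_id / row_group_size)
                .or_default()
                .push(row_id % row_group_size);
        }
        // Turn each bucket into an alternating skip/select run.
        per_group
            .into_iter()
            .map(|(group, rows)| {
                let mut current = 0;
                let mut selectors = Vec::new();
                for row in rows {
                    if row > current {
                        selectors.push(RowSelector::skip(row - current));
                    }
                    selectors.push(RowSelector::select(1));
                    current = row + 1;
                }
                (group, RowSelection::from(selectors))
            })
            .collect()
    }

    fn main() {
        // Rows 2, 3 and 102 with 100-row groups: group 0 selects rows 2 and 3,
        // group 1 selects row 2 of that group (global row 102).
        for (group, selection) in selections(&[2, 3, 102], 100) {
            println!("row group {group}: {selection:?}");
        }
    }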