
Commit

create index
Signed-off-by: Ruihang Xia <[email protected]>
waynexia committed Apr 16, 2024
1 parent 64941d8 commit 182e22d
Showing 12 changed files with 567 additions and 1 deletion.
300 changes: 299 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/index/Cargo.toml
@@ -25,6 +25,7 @@ prost.workspace = true
regex.workspace = true
regex-automata.workspace = true
snafu.workspace = true
tantivy = "0.22"

[dev-dependencies]
rand.workspace = true
17 changes: 17 additions & 0 deletions src/index/src/full_text_index.rs
@@ -0,0 +1,17 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod create;
pub mod error;
pub mod search;
79 changes: 79 additions & 0 deletions src/index/src/full_text_index/create.rs
@@ -0,0 +1,79 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::path::Path;

use snafu::ResultExt;
use tantivy::schema::{OwnedValue, Schema, INDEXED, STORED, TEXT};
use tantivy::{Document, Index, IndexWriter, TantivyDocument};

use super::error::TantivySnafu;
use crate::full_text_index::error::Result;

pub struct FullTextIndexCreater {
index: Index,
writer: IndexWriter,
count_field: tantivy::schema::Field,
text_field: tantivy::schema::Field,

row_count: usize,
segment_size: usize,
}

impl FullTextIndexCreater {
pub fn new<P>(segment_size: usize, path: P) -> Result<Self>
where
P: AsRef<Path>,
{
// build schema
let mut schema_builder = Schema::builder();
let count_field = schema_builder.add_i64_field("seg_count", INDEXED | STORED);
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();

// build index
let index = Index::create_in_dir(path, schema).context(TantivySnafu)?;

// build writer
// 100 MB
let writer = index.writer(100_000_000).context(TantivySnafu)?;

Ok(Self {
index,
writer,
count_field,
text_field,
row_count: 0,
segment_size,
})
}

pub fn push_string(&mut self, content: String) -> Result<()> {
let mut doc = TantivyDocument::new();
doc.add_text(self.text_field, content);
doc.add_i64(self.count_field, (self.row_count / self.segment_size) as _);
self.writer.add_document(doc).context(TantivySnafu)?;
self.row_count += 1;

self.writer.commit().context(TantivySnafu)?;
Ok(())
}

pub fn finish(&mut self) -> Result<()> {
self.row_count = 0;
self.writer.commit().context(TantivySnafu)?;
Ok(())
}
}
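
For orientation, here is a minimal usage sketch of the creator above (hypothetical code, not part of this commit; the directory name is made up). Each pushed row is tagged with seg_count = row_count / segment_size, so with segment_size = 2 rows 0 and 1 land in segment 0, and a later search can map hits back to row segments.

use index::full_text_index::create::FullTextIndexCreater;

fn build_example_index() -> index::full_text_index::error::Result<()> {
    // Index::create_in_dir needs the target directory to already exist
    // and not contain an index yet.
    let dir = std::env::temp_dir().join("full_text_index_example");
    std::fs::create_dir_all(&dir).unwrap();

    // segment_size = 2: rows 0-1 -> segment 0, rows 2-3 -> segment 1, ...
    let mut creater = FullTextIndexCreater::new(2, &dir)?;
    creater.push_string("the quick brown fox".to_string())?;
    creater.push_string("jumps over the lazy dog".to_string())?;
    creater.push_string("fox again, now in segment 1".to_string())?;
    creater.finish()?;
    Ok(())
}
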
45 changes: 45 additions & 0 deletions src/index/src/full_text_index/error.rs
@@ -0,0 +1,45 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use tantivy::directory::error::OpenDirectoryError;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Tantivy error"))]
Tantivy {
#[snafu(source)]
error: tantivy::TantivyError,
location: Location,
},

#[snafu(display("Failed to open directory"))]
OpenDirectory {
#[snafu(source)]
error: OpenDirectoryError,
location: Location,
},

#[snafu(display("Failed to parse tantivy query"))]
ParseQuery {
#[snafu(source)]
error: tantivy::query::QueryParserError,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
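
These variants are built through snafu's context selectors, exactly as the sibling modules do with .context(TantivySnafu). A tiny hypothetical example of wrapping a tantivy error into this Error type:

use snafu::ResultExt;
use index::full_text_index::error::{Result, TantivySnafu};

fn commit_with_context(writer: &mut tantivy::IndexWriter) -> Result<u64> {
    // Any tantivy::TantivyError is converted into Error::Tantivy,
    // capturing the caller's Location along the way.
    writer.commit().context(TantivySnafu)
}
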
74 changes: 74 additions & 0 deletions src/index/src/full_text_index/search.rs
@@ -0,0 +1,74 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::path::Path;

use snafu::ResultExt;
use tantivy::directory::MmapDirectory;
use tantivy::query::QueryParser;
use tantivy::schema::Value;
use tantivy::{Index, IndexReader, TantivyDocument, TantivyError};

use super::error::ParseQuerySnafu;
use crate::full_text_index::error::{OpenDirectorySnafu, Result, TantivySnafu};

pub struct FullTextIndexSearcher {
index: Index,
count_field: tantivy::schema::Field,
text_field: tantivy::schema::Field,
reader: IndexReader,
}

impl FullTextIndexSearcher {
pub fn open<P>(path: P) -> Result<Self>
where
P: AsRef<Path>,
{
let index = Index::open_in_dir(path).context(TantivySnafu)?;
let schema = index.schema();
let count_field = schema.get_field("seg_count").unwrap();
let text_field = schema.get_field("text").unwrap();
let reader = index.reader().context(TantivySnafu)?;

Ok(Self {
index,
count_field,
text_field,
reader,
})
}

pub fn search(&self, query: &str) -> Result<Vec<usize>> {
let searcher = self.reader.searcher();
let query_parser = QueryParser::for_index(&self.index, vec![self.text_field]);
let query = query_parser.parse_query(query).context(ParseQuerySnafu)?;
let top_docs = searcher
.search(&query, &tantivy::collector::TopDocs::with_limit(100))
.context(TantivySnafu)?;
let mut result = HashSet::new();
for (_score, doc_address) in top_docs {
let retrieved_doc = searcher
.doc::<TantivyDocument>(doc_address)
.context(TantivySnafu)?;
let seg_count = retrieved_doc
.get_first(self.count_field)
.unwrap()
.as_i64()
.unwrap();
result.insert(seg_count);
}
Ok(result.into_iter().map(|x| x as _).collect())
}
}
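
The read side, mirroring the creation sketch shown after create.rs (again hypothetical code, reusing the same made-up directory): search() returns the deduplicated seg_count values of the top matching documents, which the caller can treat as segment ids.

use index::full_text_index::search::FullTextIndexSearcher;

fn search_example() -> index::full_text_index::error::Result<()> {
    let dir = std::env::temp_dir().join("full_text_index_example");
    let searcher = FullTextIndexSearcher::open(&dir)?;
    // With the example data above this yields segments 0 and 1 (in no
    // particular order), since "fox" appears in rows 0 and 2 and segment_size is 2.
    let segments: Vec<usize> = searcher.search("fox")?;
    println!("segments containing \"fox\": {segments:?}");
    Ok(())
}
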
1 change: 1 addition & 0 deletions src/index/src/lib.rs
@@ -14,4 +14,5 @@

#![feature(iter_partition_in_place)]

pub mod full_text_index;
pub mod inverted_index;
5 changes: 5 additions & 0 deletions src/mito2/src/read/scan_region.rs
@@ -336,6 +336,11 @@ impl ScanRegion {
.flatten()
.map(Arc::new)
}

fn build_full_text_index_applier(&self) -> Option<SstIndexApplierRef> {
// TODO: construct the full-text index applier here (not implemented yet).
todo!()
}
}

/// Config for parallel scan.
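
The applier above is still a todo!(). As a rough sketch of the intended direction (the helper name and wiring are assumptions, not part of this commit), the searcher introduced in this change could be consulted to narrow a scan to matching segments:

use index::full_text_index::search::FullTextIndexSearcher;

/// Hypothetical helper: segments matching `query`, or None when there is
/// no usable full-text index and nothing can be pruned.
fn matching_segments(index_dir: &std::path::Path, query: &str) -> Option<Vec<usize>> {
    let searcher = FullTextIndexSearcher::open(index_dir).ok()?;
    searcher.search(query).ok()
}
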
4 changes: 4 additions & 0 deletions src/mito2/src/sst/index.rs
@@ -189,6 +189,9 @@ impl<'a> IndexerBuilder<'a> {
segment_row_count = row_group_size;
}

// find a column named "log"
let log_column_id = self.metadata.column_by_name("log").map(|c| c.column_id);

let creator = SstIndexCreator::new(
self.file_path,
self.file_id,
@@ -197,6 +200,7 @@ impl<'a> IndexerBuilder<'a> {
self.intermediate_manager,
self.mem_threshold_index_create,
segment_row_count,
log_column_id,
)
.with_buffer_size(self.write_buffer_size)
.with_ignore_column_ids(
36 changes: 36 additions & 0 deletions src/mito2/src/sst/index/creator.rs
@@ -21,6 +21,9 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_telemetry::warn;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::StringVector;
use index::full_text_index::create::FullTextIndexCreater;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
@@ -29,6 +32,7 @@ use object_store::ObjectStore;
use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ConcreteDataType;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

@@ -83,6 +87,10 @@ pub struct SstIndexCreator {

/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,

// experimental full text index
full_text_index_creater: FullTextIndexCreater,
log_column_id: Option<u32>,
}

impl SstIndexCreator {
@@ -96,6 +104,7 @@ impl SstIndexCreator {
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
segment_row_count: NonZeroUsize,
log_column_id: Option<u32>,
) -> Self {
let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&metadata.region_id, &sst_file_id),
@@ -112,6 +121,10 @@
);
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

let full_text_index_path = format!("{file_path}/full_text_index");
let full_text_index_creater =
FullTextIndexCreater::new(segment_row_count.get(), full_text_index_path).unwrap();

let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
Self {
file_path,
@@ -127,6 +140,9 @@

ignore_column_ids: HashSet::default(),
memory_usage,

full_text_index_creater,
log_column_id,
}
}

@@ -233,6 +249,23 @@ impl SstIndexCreator {
.context(PushIndexValueSnafu)?;
}

// Try to find the column named "log" and push its values into the full text index.
if let Some(log_column_id) = self.log_column_id {
for col in batch.fields() {
if col.column_id == log_column_id {
let vector = &col.data;
if vector.data_type() == ConcreteDataType::string_datatype() {
let vector = vector.as_any().downcast_ref::<StringVector>().unwrap();
for content in vector.iter_data() {
self.full_text_index_creater
.push_string(content.unwrap_or_default().to_string())
.unwrap();
}
}
}
}
}

Ok(())
}

@@ -296,6 +329,8 @@ impl SstIndexCreator {
_ => {}
}

self.full_text_index_creater.finish().unwrap();

let byte_count = puffin_writer.finish().await.context(PuffinFinishSnafu)?;
guard.inc_byte_count(byte_count);
Ok(())
@@ -421,6 +456,7 @@ mod tests {
intm_mgr,
memory_threshold,
NonZeroUsize::new(segment_row_count).unwrap(),
None,
);

for (str_tag, i32_tag) in &tags {
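
To summarize the flow added to SstIndexCreator: new() opens a tantivy index under "{file_path}/full_text_index", update() pushes each value of the "log" column as its own document (nulls become empty strings), and finish() commits the writer and resets the row counter. A tiny illustration of the segment arithmetic shared with create.rs (hypothetical test, not part of the commit):

// Row i is tagged with segment id i / segment_size in push_string().
fn segment_of(row: usize, segment_size: usize) -> usize {
    row / segment_size
}

#[test]
fn segment_mapping() {
    let segment_size = 1024; // e.g. one parquet row group worth of rows
    assert_eq!(segment_of(0, segment_size), 0);
    assert_eq!(segment_of(1023, segment_size), 0);
    assert_eq!(segment_of(1024, segment_size), 1);
}
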
4 changes: 4 additions & 0 deletions src/mito2/src/sst/location.rs
@@ -29,6 +29,10 @@ pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String {
util::join_path(&dir, &sst_file_id.as_puffin())
}

pub fn full_text_index_path(region_dir: &str) -> String {
util::join_dir(region_dir, "full_text_index")
}

#[cfg(test)]
mod tests {
use super::*;
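
A small hypothetical check of the helper above; the exact string depends on how util::join_dir normalizes directories, so the assertion only looks for the sub-directory name:

#[test]
fn full_text_index_path_points_at_subdir() {
    let path = full_text_index_path("data/region_0/");
    assert!(path.contains("full_text_index"));
}
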
2 changes: 2 additions & 0 deletions src/mito2/src/sst/parquet/reader.rs
@@ -284,6 +284,8 @@ impl ParquetReaderBuilder {
.await
.or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics))
.unwrap_or_else(|| (0..num_row_groups).map(|i| (i, None)).collect())

// TODO: also consult the full-text index here when pruning row groups.
}

/// Applies index to prune row groups.
