Skip to content

feat(query): read write inverted index #14827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 5, 2024
267 changes: 266 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ paste = "1.0.9"
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = "0.21.1"
thiserror = { workspace = true }
tonic = { workspace = true }
7 changes: 6 additions & 1 deletion src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ build_exceptions! {
IllegalCloudControlMessageFormat(1703),

// Geometry errors.
GeometryError(1801)
GeometryError(1801),

// Tantivy errors.
TantivyError(1901),
TantivyOpenReadError(1902),
TantivyQueryParserError(1903)
}

// Meta service errors [2001, 3000].
Expand Down
18 changes: 18 additions & 0 deletions src/common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,24 @@ impl From<GeozeroError> for ErrorCode {
}
}

impl From<tantivy::TantivyError> for ErrorCode {
fn from(error: tantivy::TantivyError) -> Self {
ErrorCode::TantivyError(error.to_string())
}
}

impl From<tantivy::directory::error::OpenReadError> for ErrorCode {
fn from(error: tantivy::directory::error::OpenReadError) -> Self {
ErrorCode::TantivyOpenReadError(error.to_string())
}
}

impl From<tantivy::query::QueryParserError> for ErrorCode {
fn from(error: tantivy::query::QueryParserError) -> Self {
ErrorCode::TantivyQueryParserError(error.to_string())
}
}

// === prost error ===
impl From<prost::EncodeError> for ErrorCode {
fn from(error: prost::EncodeError) -> Self {
Expand Down
1 change: 1 addition & 0 deletions src/query/ee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ databend-common-users = { path = "../users" }
databend-enterprise-aggregating-index = { path = "../ee_features/aggregating_index" }
databend-enterprise-background-service = { path = "../ee_features/background_service" }
databend-enterprise-data-mask-feature = { path = "../ee_features/data_mask" }
databend-enterprise-inverted-index = { path = "../ee_features/inverted_index" }
databend-enterprise-storage-encryption = { path = "../ee_features/storage_encryption" }
databend-enterprise-stream-handler = { path = "../ee_features/stream_handler" }
databend-enterprise-vacuum-handler = { path = "../ee_features/vacuum_handler" }
Expand Down
2 changes: 2 additions & 0 deletions src/query/ee/src/enterprise_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_license::license_manager::LicenseManager;
use crate::aggregating_index::RealAggregatingIndexHandler;
use crate::background_service::RealBackgroundService;
use crate::data_mask::RealDatamaskHandler;
use crate::inverted_index::RealInvertedIndexHandler;
use crate::license::license_mgr::RealLicenseManager;
use crate::storage_encryption::RealStorageEncryptionHandler;
use crate::storages::fuse::operations::RealVacuumHandler;
Expand All @@ -37,6 +38,7 @@ impl EnterpriseServices {
RealBackgroundService::init(&cfg).await?;
RealVirtualColumnHandler::init()?;
RealStreamHandler::init()?;
RealInvertedIndexHandler::init()?;
Ok(())
}
}
113 changes: 113 additions & 0 deletions src/query/ee/src/inverted_index/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_storages_fuse::io::InvertedIndexWriter;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::ReadSettings;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::Location;

pub struct Indexer {}

impl Indexer {
pub(crate) fn new() -> Indexer {
Indexer {}
}

#[async_backtrace::framed]
pub(crate) async fn index(
&self,
fuse_table: &FuseTable,
ctx: Arc<dyn TableContext>,
schema: DataSchema,
segment_locs: Option<Vec<Location>>,
) -> Result<String> {
let snapshot_opt = fuse_table.read_table_snapshot().await?;
let snapshot = if let Some(val) = snapshot_opt {
val
} else {
// no snapshot
return Ok("".to_string());
};
if schema.fields.is_empty() {
// no field for index
return Ok("".to_string());
}

let table_schema = &fuse_table.get_table_info().meta.schema;

// Collect field indices used by inverted index.
let mut field_indices = Vec::new();
for field in &schema.fields {
let field_index = table_schema.index_of(field.name())?;
field_indices.push(field_index);
}

let projection = Projection::Columns(field_indices);
let block_reader =
fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?;

let segment_reader =
MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());

let settings = ReadSettings::from_ctx(&ctx)?;
let write_settings = fuse_table.get_write_settings();
let storage_format = write_settings.storage_format;

let operator = fuse_table.get_operator_ref();

// If no segment locations are specified, iterates through all segments
let segment_locs = if let Some(segment_locs) = segment_locs {
segment_locs
} else {
snapshot.segments.clone()
};

let mut index_writer = InvertedIndexWriter::try_create(schema)?;

for (location, ver) in segment_locs {
let segment_info = segment_reader
.read(&LoadParams {
location: location.to_string(),
len_hint: None,
ver,
put_cache: false,
})
.await?;

let block_metas = segment_info.block_metas()?;
for block_meta in block_metas {
let block = block_reader
.read_by_meta(&settings, &block_meta, &storage_format)
.await?;

index_writer.add_block(block)?;
}
}

let location_generator = fuse_table.meta_location_generator();

let index_location = index_writer.finalize(operator, location_generator).await?;
// TODO: add index location to meta
Ok(index_location)
}
}
52 changes: 52 additions & 0 deletions src/query/ee/src/inverted_index/inverted_index_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_inverted_index::InvertedIndexHandler;
use databend_enterprise_inverted_index::InvertedIndexHandlerWrapper;
use databend_storages_common_table_meta::meta::Location;

use super::indexer::Indexer;

pub struct RealInvertedIndexHandler {}

#[async_trait::async_trait]
impl InvertedIndexHandler for RealInvertedIndexHandler {
#[async_backtrace::framed]
async fn do_refresh_index(
&self,
fuse_table: &FuseTable,
ctx: Arc<dyn TableContext>,
schema: DataSchema,
segment_locs: Option<Vec<Location>>,
) -> Result<String> {
let indexer = Indexer::new();
indexer.index(fuse_table, ctx, schema, segment_locs).await
}
}

impl RealInvertedIndexHandler {
pub fn init() -> Result<()> {
let rm = RealInvertedIndexHandler {};
let wrapper = InvertedIndexHandlerWrapper::new(Box::new(rm));
GlobalInstance::set(Arc::new(wrapper));
Ok(())
}
}
17 changes: 17 additions & 0 deletions src/query/ee/src/inverted_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod indexer;
mod inverted_index_handler;
pub use inverted_index_handler::RealInvertedIndexHandler;
1 change: 1 addition & 0 deletions src/query/ee/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod aggregating_index;
pub mod background_service;
pub mod data_mask;
pub mod enterprise_services;
pub mod inverted_index;
pub mod license;
pub mod storage_encryption;
pub mod storages;
Expand Down
2 changes: 2 additions & 0 deletions src/query/ee/src/test_kits/mock_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_license::license_manager::LicenseManagerWrapper;

use crate::aggregating_index::RealAggregatingIndexHandler;
use crate::data_mask::RealDatamaskHandler;
use crate::inverted_index::RealInvertedIndexHandler;
use crate::license::RealLicenseManager;
use crate::storages::fuse::operations::RealVacuumHandler;
use crate::stream::RealStreamHandler;
Expand All @@ -40,6 +41,7 @@ impl MockServices {
RealDatamaskHandler::init()?;
RealVirtualColumnHandler::init()?;
RealStreamHandler::init()?;
RealInvertedIndexHandler::init()?;
Ok(())
}
}
74 changes: 74 additions & 0 deletions src/query/ee/tests/it/inverted_index/index_refresh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_storages_fuse::io::read::InvertedIndexReader;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_inverted_index::get_inverted_index_handler;
use databend_enterprise_query::test_kits::context::EESetup;
use databend_query::test_kits::append_string_sample_data;
use databend_query::test_kits::*;

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;

fixture
.default_session()
.get_settings()
.set_data_retention_time_in_days(0)?;
fixture.create_default_database().await?;
fixture.create_string_table().await?;

let number_of_block = 2;
append_string_sample_data(number_of_block, &fixture).await?;

let table = fixture.latest_default_table().await?;
let table_schema = table.schema();
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let dal = fuse_table.get_operator_ref();

let table_ctx = fixture.new_query_ctx().await?;
let schema = DataSchema::from(table_schema);

let handler = get_inverted_index_handler();
let location = handler
.do_refresh_index(fuse_table, table_ctx.clone(), schema.clone(), None)
.await?;

let index_reader = InvertedIndexReader::create(dal.clone(), schema);

let num = 5;
let query = "rust";
let docs = index_reader.do_read(location.clone(), query, num)?;
assert_eq!(docs.len(), 2);
assert_eq!(docs[0].1.doc_id, 0);
assert_eq!(docs[1].1.doc_id, 1);

let query = "java";
let docs = index_reader.do_read(location.clone(), query, num)?;
assert_eq!(docs.len(), 1);
assert_eq!(docs[0].1.doc_id, 2);

let query = "data";
let docs = index_reader.do_read(location, query, num)?;
assert_eq!(docs.len(), 3);
assert_eq!(docs[0].1.doc_id, 4);
assert_eq!(docs[1].1.doc_id, 1);
assert_eq!(docs[2].1.doc_id, 5);

Ok(())
}
15 changes: 15 additions & 0 deletions src/query/ee/tests/it/inverted_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod index_refresh;
1 change: 1 addition & 0 deletions src/query/ee/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#![feature(unwrap_infallible)]
mod aggregating_index;
mod background_service;
mod inverted_index;
mod license;
mod storages;
Loading