Merge branch 'main' into feature_fix
zhyass authored May 25, 2024
2 parents 09cadfb + a7bc773 commit 9a1562b
Showing 72 changed files with 4,632 additions and 2,203 deletions.
44 changes: 14 additions & 30 deletions src/meta/api/src/schema_api_impl.rs
@@ -190,17 +190,13 @@ use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::anyerror::AnyError;
use databend_common_meta_types::protobuf as pb;
use databend_common_meta_types::txn_op::Request;
use databend_common_meta_types::txn_op_response::Response;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaAPIError;
use databend_common_meta_types::MetaDataError;
use databend_common_meta_types::MetaDataReadError;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::MetaNetworkError;
@@ -2288,10 +2284,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let seq_names = self.mget_kv(&id_name_kv_keys).await?;
let mut table_names = Vec::with_capacity(table_ids.len());

// None means table_name not found, maybe immutable table id. Ignore it
for seq_name in seq_names.into_iter().flatten() {
let name_ident: DBIdTableName = deserialize_struct(&seq_name.data)?;
table_names.push(Some(name_ident.table_name));
for seq_name in seq_names {
if let Some(seq_name) = seq_name {
let name_ident: DBIdTableName = deserialize_struct(&seq_name.data)?;
table_names.push(Some(name_ident.table_name));
} else {
table_names.push(None);
}
}

let mut meta_kv_keys = Vec::with_capacity(table_ids.len());
@@ -2301,15 +2300,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}

let seq_metas = self.mget_kv(&meta_kv_keys).await?;
if seq_metas.len() != table_names.len() {
return Err(KVAppError::MetaError(MetaError::APIError(
MetaAPIError::DataError(MetaDataError::ReadError(MetaDataReadError::new(
"mget_table_names_by_ids",
"",
&AnyError::error("The system is experiencing high load, please retry later"),
))),
)));
}
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let table_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
@@ -2362,10 +2352,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// If databases were dropped/created concurrently, the capacity may not be the same
let mut db_names = Vec::with_capacity(db_ids.len());

// None means db_name not found, maybe immutable database id. Ignore it
for seq_name in seq_names.into_iter().flatten() {
let name_ident: DatabaseNameIdentRaw = deserialize_struct(&seq_name.data)?;
db_names.push(Some(name_ident.database_name().to_string()));
for seq_name in seq_names {
if let Some(seq_name) = seq_name {
let name_ident: DatabaseNameIdentRaw = deserialize_struct(&seq_name.data)?;
db_names.push(Some(name_ident.database_name().to_string()));
} else {
db_names.push(None);
}
}

let mut meta_kv_keys = Vec::with_capacity(db_ids.len());
@@ -2375,15 +2368,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}

let seq_metas = self.mget_kv(&meta_kv_keys).await?;
if seq_metas.len() != db_names.len() {
return Err(KVAppError::MetaError(MetaError::APIError(
MetaAPIError::DataError(MetaDataError::ReadError(MetaDataReadError::new(
"mget_table_names_by_ids",
"",
&AnyError::error("The system is experiencing high load, please retry later"),
))),
)));
}
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?;
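For context, the hunks above replace the old `.flatten()` loops, which silently skipped missing names and let `table_names`/`db_names` fall out of step with the id lists (and with the later `seq_metas` lookup, hence the removed length checks). A minimal standalone sketch of the new alignment-preserving shape, with deserialization elided (not part of the commit):

```rust
// Sketch only: keep one output slot per input id so positions stay aligned.
fn names_aligned(seq_names: Vec<Option<String>>) -> Vec<Option<String>> {
    let mut names = Vec::with_capacity(seq_names.len());
    for seq_name in seq_names {
        match seq_name {
            // In the real code this slot is deserialized into a name ident.
            Some(data) => names.push(Some(data)),
            // Missing entry: keep a None placeholder instead of dropping the slot.
            None => names.push(None),
        }
    }
    names
}

fn main() {
    let names = names_aligned(vec![Some("t1".to_string()), None]);
    assert_eq!(names, vec![Some("t1".to_string()), None]);
}
```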
9 changes: 9 additions & 0 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -80,6 +80,8 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
right_side_builder.cte_state = self.cte_state.clone();
right_side_builder.hash_join_states = self.hash_join_states.clone();

let mut right_res = right_side_builder.finalize(&range_join.right)?;
right_res.main_pipeline.add_sink(|input| {
Ok(ProcessorPtr::create(
@@ -104,6 +106,10 @@
merge_into_is_distributed,
enable_merge_into_optimization,
)?;
if let Some((build_cache_index, _)) = join.build_side_cache_info {
self.hash_join_states
.insert(build_cache_index, state.clone());
}
self.expand_build_side_pipeline(&join.build, join, state.clone())?;
self.build_join_probe(join, state)
}
@@ -122,6 +128,7 @@
&join.probe_to_build,
merge_into_is_distributed,
enable_merge_into_optimization,
join.build_side_cache_info.clone(),
)
}

@@ -139,6 +146,7 @@
self.main_pipeline.get_scopes(),
);
build_side_builder.cte_state = self.cte_state.clone();
build_side_builder.hash_join_states = self.hash_join_states.clone();
let mut build_res = build_side_builder.finalize(build)?;

assert!(build_res.main_pipeline.is_pulling_pipeline()?);
@@ -267,6 +275,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
left_side_builder.cte_state = self.cte_state.clone();
left_side_builder.hash_join_states = self.hash_join_states.clone();
let mut left_side_pipeline = left_side_builder.finalize(left_side)?;
assert!(left_side_pipeline.main_pipeline.is_pulling_pipeline()?);

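The recurring `hash_join_states = self.hash_join_states.clone()` lines share the cache-index-to-state map with every child builder in this file (build side, range-join right side, and the other sub-pipeline builders), so a `CacheScan` built later in a child pipeline can find the `HashJoinState` registered in `build_join`. A standalone sketch of that registration/lookup pattern (hypothetical simplified types, not taken from the commit):

```rust
use std::collections::HashMap;
use std::sync::Arc;

struct HashJoinState; // stand-in for the real build-side state

#[derive(Default)]
struct Builder {
    hash_join_states: HashMap<usize, Arc<HashJoinState>>,
}

fn main() {
    let mut parent = Builder::default();
    // build_join: register the build-side state under its cache index.
    parent.hash_join_states.insert(0, Arc::new(HashJoinState));

    // Child builders receive a copy of the map (the Arc values are shared).
    let child = Builder {
        hash_join_states: parent.hash_join_states.clone(),
    };

    // build_cache_scan: resolve the plan's cache index back to the state.
    assert!(child.hash_join_states.contains_key(&0));
}
```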
65 changes: 65 additions & 0 deletions src/query/service/src/pipelines/builders/builder_scan.rs
@@ -13,19 +13,28 @@
// limitations under the License.

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sources::OneBlockSource;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::executor::physical_plans::CacheScan;
use databend_common_sql::executor::physical_plans::ConstantTableScan;
use databend_common_sql::executor::physical_plans::CteScan;
use databend_common_sql::executor::physical_plans::ExpressionScan;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::plans::CacheSource;
use databend_common_sql::StreamContext;

use crate::pipelines::processors::transforms::CacheSourceState;
use crate::pipelines::processors::transforms::HashJoinCacheState;
use crate::pipelines::processors::transforms::MaterializedCteSource;
use crate::pipelines::processors::transforms::TransformAddInternalColumns;
use crate::pipelines::processors::transforms::TransformCacheScan;
use crate::pipelines::processors::transforms::TransformExpressionScan;
use crate::pipelines::processors::TransformAddStreamColumns;
use crate::pipelines::PipelineBuilder;

@@ -115,4 +124,60 @@ impl PipelineBuilder {
1,
)
}

pub(crate) fn build_cache_scan(&mut self, scan: &CacheScan) -> Result<()> {
let max_threads = self.settings.get_max_threads()?;
let max_block_size = self.settings.get_max_block_size()? as usize;
let cache_source_state = match &scan.cache_source {
CacheSource::HashJoinBuild((cache_index, column_indexes)) => {
let hash_join_state = match self.hash_join_states.get(cache_index) {
Some(hash_join_state) => hash_join_state.clone(),
None => {
return Err(ErrorCode::Internal(
"Hash join state not found during building cache scan".to_string(),
));
}
};
CacheSourceState::HashJoinCacheState(HashJoinCacheState::new(
column_indexes.clone(),
hash_join_state,
max_block_size,
))
}
};

self.main_pipeline.add_source(
|output| {
TransformCacheScan::create(self.ctx.clone(), output, cache_source_state.clone())
},
max_threads as usize,
)
}

pub(crate) fn build_expression_scan(&mut self, scan: &ExpressionScan) -> Result<()> {
self.build_pipeline(&scan.input)?;

let values = scan
.values
.iter()
.map(|row| {
row.iter()
.map(|scalar| scalar.as_expr(&BUILTIN_FUNCTIONS))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

let fun_ctx = self.func_ctx.clone();

self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(TransformExpressionScan::create(
input,
output,
values.clone(),
fun_ctx.clone(),
)))
})?;

Ok(())
}
}
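`build_expression_scan` pre-lowers each row of the scan's literal values into expressions and installs a transform that replays them against every incoming block. A standalone sketch of that replay idea (hypothetical simplified types; the real transform works on DataBlocks and the function context):

```rust
// Sketch: for every input "block", emit one output per row of pre-built expressions.
type Expr = fn(i64) -> i64; // stand-in for a lowered expression

fn add_one(x: i64) -> i64 { x + 1 }
fn double(x: i64) -> i64 { x * 2 }

fn expression_scan(input: &[i64], rows: &[Vec<Expr>]) -> Vec<Vec<i64>> {
    let mut output = Vec::new();
    for &block in input {
        for row in rows {
            // Evaluate each expression of the row against the current block.
            output.push(row.iter().map(|expr| expr(block)).collect());
        }
    }
    output
}

fn main() {
    let rows: Vec<Vec<Expr>> = vec![vec![add_one, double]];
    println!("{:?}", expression_scan(&[10, 20], &rows));
}
```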
@@ -51,6 +51,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
pipeline_builder.cte_state = self.cte_state.clone();
pipeline_builder.hash_join_states = self.hash_join_states.clone();

let mut build_res = pipeline_builder.finalize(input)?;

8 changes: 8 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -31,6 +31,7 @@ use databend_common_sql::IndexType;
use super::PipelineBuilderData;
use crate::pipelines::processors::transforms::HashJoinBuildState;
use crate::pipelines::processors::transforms::MaterializedCteState;
use crate::pipelines::processors::HashJoinState;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::exchange::DefaultExchangeInjector;
use crate::servers::flight::v1::exchange::ExchangeInjector;
@@ -52,6 +53,8 @@ pub struct PipelineBuilder {
pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,

pub(crate) exchange_injector: Arc<dyn ExchangeInjector>,

pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,
}

impl PipelineBuilder {
@@ -71,6 +74,7 @@
cte_state: HashMap::new(),
merge_into_probe_data_fields: None,
join_state: None,
hash_join_states: HashMap::new(),
}
}

@@ -157,6 +161,10 @@ impl PipelineBuilder {
PhysicalPlan::MaterializedCte(materialized_cte) => {
self.build_materialized_cte(materialized_cte)
}
PhysicalPlan::CacheScan(cache_scan) => self.build_cache_scan(cache_scan),
PhysicalPlan::ExpressionScan(expression_scan) => {
self.build_expression_scan(expression_scan)
}

// Copy into.
PhysicalPlan::CopyIntoTable(copy) => self.build_copy_into_table(copy),
@@ -13,6 +13,7 @@
// limitations under the License.

use std::cell::SyncUnsafeCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI8;
@@ -26,6 +27,7 @@ use databend_common_base::base::tokio::sync::watch::Sender;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockEntry;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::HashMethodFixedKeys;
use databend_common_expression::HashMethodSerializer;
@@ -44,6 +46,7 @@ use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
use crate::pipelines::processors::HashJoinDesc;
use crate::sessions::QueryContext;
use crate::sql::IndexType;

pub struct SerializerHashJoinHashTable {
pub(crate) hash_table: BinaryHashJoinHashMap,
@@ -120,6 +123,12 @@ pub struct HashJoinState {
pub(crate) enable_spill: bool,

pub(crate) merge_into_state: Option<SyncUnsafeCell<MergeIntoState>>,

/// Build side cache info.
/// A HashMap mapping column indexes to the BlockEntry indexes in the DataBlock.
pub(crate) column_map: HashMap<usize, usize>,
// The index of the next cache block to be read.
pub(crate) next_cache_block_index: AtomicUsize,
}

impl HashJoinState {
Expand All @@ -131,6 +140,7 @@ impl HashJoinState {
probe_to_build: &[(usize, (bool, bool))],
merge_into_is_distributed: bool,
enable_merge_into_optimization: bool,
build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
) -> Result<Arc<HashJoinState>> {
if matches!(
hash_join_desc.join_type,
@@ -144,6 +154,11 @@
if ctx.get_settings().get_join_spilling_memory_ratio()? != 0 {
enable_spill = true;
}
let column_map = if let Some((_, column_map)) = build_side_cache_info {
column_map
} else {
HashMap::new()
};
Ok(Arc::new(HashJoinState {
hash_table: SyncUnsafeCell::new(HashJoinHashTable::Null),
hash_table_builders: AtomicUsize::new(0),
@@ -166,6 +181,8 @@ impl HashJoinState {
merge_into_is_distributed,
)),
},
column_map,
next_cache_block_index: AtomicUsize::new(0),
}))
}

@@ -256,4 +273,36 @@ impl HashJoinState {
}
build_state.generation_state.is_build_projected = true;
}

pub fn num_build_chunks(&self) -> usize {
let build_state = unsafe { &*self.build_state.get() };
build_state.generation_state.chunks.len()
}

pub fn get_cached_columns(&self, column_index: usize) -> Vec<BlockEntry> {
let index = self.column_map.get(&column_index).unwrap();
let build_state = unsafe { &*self.build_state.get() };
let columns = build_state
.generation_state
.chunks
.iter()
.map(|data_block| data_block.get_by_offset(*index).clone())
.collect::<Vec<_>>();
columns
}

pub fn get_cached_num_rows(&self) -> Vec<usize> {
let build_state = unsafe { &*self.build_state.get() };
let num_rows = build_state
.generation_state
.chunks
.iter()
.map(|data_block| data_block.num_rows())
.collect::<Vec<_>>();
num_rows
}

pub fn next_cache_block_index(&self) -> usize {
self.next_cache_block_index.fetch_add(1, Ordering::Relaxed)
}
}
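The new accessors expose the cached build-side blocks to readers: `num_build_chunks` and `get_cached_num_rows` describe the cache, `get_cached_columns` pulls one column out of every cached chunk, and `next_cache_block_index` hands out block indexes atomically so several cache-scan sources can split the work. A standalone sketch of that claim-and-read pattern (hypothetical simplified types, assumed usage rather than code from the commit):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for the cached build-side chunks held by HashJoinState.
struct CachedBuildSide {
    chunks: Vec<Vec<i64>>, // each inner Vec plays the role of one cached block
    next_cache_block_index: AtomicUsize,
}

impl CachedBuildSide {
    // Mirrors next_cache_block_index(): each caller claims a distinct block.
    fn next_cache_block_index(&self) -> usize {
        self.next_cache_block_index.fetch_add(1, Ordering::Relaxed)
    }

    // A reader keeps claiming blocks until the cache is exhausted.
    fn read_next(&self) -> Option<&Vec<i64>> {
        let index = self.next_cache_block_index();
        self.chunks.get(index)
    }
}

fn main() {
    let cache = CachedBuildSide {
        chunks: vec![vec![1, 2, 3], vec![4, 5]],
        next_cache_block_index: AtomicUsize::new(0),
    };
    while let Some(block) = cache.read_next() {
        println!("read a cached block with {} rows", block.len());
    }
}
```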
6 changes: 6 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -23,8 +23,10 @@ mod transform_add_computed_columns;
mod transform_add_const_columns;
mod transform_add_internal_columns;
mod transform_add_stream_columns;
mod transform_cache_scan;
mod transform_cast_schema;
mod transform_create_sets;
mod transform_expression_scan;
mod transform_filter;
mod transform_limit;
mod transform_materialized_cte;
@@ -47,8 +49,12 @@ pub use transform_add_computed_columns::TransformAddComputedColumns;
pub use transform_add_const_columns::TransformAddConstColumns;
pub use transform_add_internal_columns::TransformAddInternalColumns;
pub use transform_add_stream_columns::TransformAddStreamColumns;
pub use transform_cache_scan::CacheSourceState;
pub use transform_cache_scan::HashJoinCacheState;
pub use transform_cache_scan::TransformCacheScan;
pub use transform_cast_schema::TransformCastSchema;
pub use transform_create_sets::TransformCreateSets;
pub use transform_expression_scan::TransformExpressionScan;
pub use transform_filter::TransformFilter;
pub use transform_limit::TransformLimit;
pub use transform_materialized_cte::MaterializedCteSink;