From 568c10ad8072e82875b5d3fe1bd3f94bfca46ad4 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Fri, 17 Jan 2025 19:06:25 +0800
Subject: [PATCH] fix

---
 .../transforms/transform_compact_builder.rs   | 20 ++++++++++-
 .../transform_compact_no_split_builder.rs     |  3 +-
 .../src/interpreters/hook/compact_hook.rs     |  7 ----
 src/query/settings/src/settings_default.rs    |  9 ++++-
 src/query/sql/src/planner/binder/ddl/table.rs | 36 +++++++++----------
 .../mutation/mutator/recluster_mutator.rs     |  9 ++++-
 .../storages/system/src/streams_table.rs      |  2 +-
 7 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
index 69e4c1b40056a..e31442bdc657c 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
@@ -14,7 +14,9 @@
 
 use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
+use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
+use databend_common_expression::Value;
 use databend_common_pipeline_core::Pipeline;
 
 use crate::processors::AccumulatingTransform;
@@ -62,8 +64,9 @@ impl AccumulatingTransform for BlockCompactBuilder {
     const NAME: &'static str = "BlockCompactBuilder";
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
+        let data = data.consume_convert_to_full();
         let num_rows = data.num_rows();
-        let num_bytes = data.memory_size();
+        let num_bytes = memory_size(&data);
 
         if !self.thresholds.check_for_compact(num_rows, num_bytes) {
             // holding slices of blocks to merge later may lead to oom, so
@@ -112,3 +115,18 @@ impl AccumulatingTransform for BlockCompactBuilder {
         }
     }
 }
+
+pub(crate) fn memory_size(data_block: &DataBlock) -> usize {
+    data_block
+        .columns()
+        .iter()
+        .map(|entry| match &entry.value {
+            Value::Column(Column::Nullable(col)) if col.validity.true_count() == 0 => {
+                // For `Nullable` columns with no valid values,
+                // only the size of the validity bitmap is counted.
+                col.validity.as_slice().0.len()
+            }
+            _ => entry.memory_size(),
+        })
+        .sum()
+}
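
Note: with `consume_convert_to_full()` materializing scalar values first, the
new `memory_size` helper keeps mostly-NULL blocks from looking larger than they
are when `check_for_compact` runs: a `Nullable` column with zero valid rows is
charged only for its validity bitmap. A minimal sketch of the same sizing rule,
using a simplified column model rather than the real
`databend_common_expression` types:

// Simplified stand-in for a nullable column: a value buffer plus a
// validity bitmap (one bit per row, packed into bytes).
struct NullableCol {
    values_bytes: usize, // memory held by the inner value column
    validity: Vec<u8>,   // packed validity bitmap
    true_count: usize,   // number of rows marked valid
}

fn column_memory_size(col: &NullableCol) -> usize {
    if col.true_count == 0 {
        // Every row is NULL: charge only the bitmap, as in the patch.
        col.validity.len()
    } else {
        col.values_bytes + col.validity.len()
    }
}

fn main() {
    // 1024 all-NULL rows: 128 bitmap bytes are counted, not the 8 KiB buffer.
    let col = NullableCol { values_bytes: 8192, validity: vec![0u8; 128], true_count: 0 };
    assert_eq!(column_memory_size(&col), 128);
}
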
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
index 85215b0b58b6f..9773e0375b1be 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
@@ -78,8 +78,9 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
     const NAME: &'static str = "BlockCompactNoSplitBuilder";
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
+        let data = data.consume_convert_to_full();
         self.accumulated_rows += data.num_rows();
-        self.accumulated_bytes += data.memory_size();
+        self.accumulated_bytes += crate::processors::memory_size(&data);
         if !self
             .thresholds
             .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs
index d33f9d22baa82..0218fa5399d2f 100644
--- a/src/query/service/src/interpreters/hook/compact_hook.rs
+++ b/src/query/service/src/interpreters/hook/compact_hook.rs
@@ -16,7 +16,6 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use databend_common_base::runtime::GlobalIORuntime;
-use databend_common_base::runtime::GLOBAL_MEM_STAT;
 use databend_common_catalog::lock::LockTableOption;
 use databend_common_catalog::table::CompactionLimits;
 use databend_common_catalog::table_context::TableContext;
@@ -204,12 +203,6 @@ async fn compact_table(
             &compact_target.database,
             &compact_target.table,
         )?;
-        let avail_memory_usage = settings.get_max_memory_usage()?
-            - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
-        let recluster_block_size = settings
-            .get_recluster_block_size()?
-            .min(avail_memory_usage * 30 / 100);
-        settings.set_recluster_block_size(recluster_block_size)?;
         ctx.set_enable_sort_spill(false);
         let recluster = RelOperator::Recluster(Recluster {
             catalog: compact_target.catalog,
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index e7d753dda0a61..aa3d2da50b073 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -128,6 +128,7 @@ impl DefaultSettings {
         Ok(Arc::clone(DEFAULT_SETTINGS.get_or_try_init(|| -> Result<Arc<DefaultSettings>> {
             let num_cpus = Self::num_cpus();
             let max_memory_usage = Self::max_memory_usage()?;
+            let recluster_block_size = Self::recluster_block_size(max_memory_usage);
             let default_max_spill_io_requests = Self::spill_io_requests(num_cpus);
             let default_max_storage_io_requests = Self::storage_io_requests(num_cpus);
             let data_retention_time_in_days_max = Self::data_retention_time_in_days_max();
@@ -814,7 +815,7 @@
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
             ("recluster_block_size", DefaultSettingValue {
-                value: UserSettingValue::UInt64(100 * 1024 * 1024 * 1024),
+                value: UserSettingValue::UInt64(recluster_block_size),
                 desc: "Sets the maximum byte size of blocks for recluster",
                 mode: SettingMode::Both,
                 scope: SettingScope::Both,
@@ -1261,6 +1262,12 @@
         })
     }
 
+    fn recluster_block_size(max_memory_usage: u64) -> u64 {
+        // The sort merge consumes more than twice as much memory,
+        // so the block size is set relatively conservatively here.
+        std::cmp::min(max_memory_usage * 30 / 100, 80 * 1024 * 1024 * 1024)
+    }
+
    /// Converts and validates a setting value based on its key.
    pub fn convert_value(k: String, v: String) -> Result<(String, UserSettingValue)> {
        // Retrieve the default settings instance
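
Note: `recluster_block_size` now defaults to a value derived from the memory
budget instead of the fixed 100 GiB constant, computed once when the default
settings are built. A self-contained sketch of the derivation (the figures in
`main` are illustrative, not from the patch):

fn recluster_block_size(max_memory_usage: u64) -> u64 {
    // Sort-merge during recluster can need more than twice the block size
    // in memory, so the default stays conservative: 30% of the budget,
    // capped at 80 GiB.
    std::cmp::min(max_memory_usage * 30 / 100, 80 * 1024 * 1024 * 1024)
}

fn main() {
    let gib = 1024 * 1024 * 1024u64;
    // On a 16 GiB budget the 30% term wins: the default is ~4.8 GiB.
    assert_eq!(recluster_block_size(16 * gib), 16 * gib * 30 / 100);
    // On a 1 TiB budget the 80 GiB cap wins.
    assert_eq!(recluster_block_size(1024 * gib), 80 * gib);
}

Because this only changes the default, a session can still override it with
`SET recluster_block_size = ...` as before.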
diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs
index 47aa44e6a1327..6a0b91f1081e7 100644
--- a/src/query/sql/src/planner/binder/ddl/table.rs
+++ b/src/query/sql/src/planner/binder/ddl/table.rs
@@ -16,7 +16,6 @@ use std::collections::BTreeMap;
 use std::collections::HashSet;
 use std::sync::Arc;
 
-use chrono::Utc;
 use databend_common_ast::ast::AddColumnOption as AstAddColumnOption;
 use databend_common_ast::ast::AlterTableAction;
 use databend_common_ast::ast::AlterTableStmt;
@@ -1144,14 +1143,13 @@
         let sample_size = settings.get_hilbert_sample_size_per_block()?;
         let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
         let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
-        let suffix = format!("{:08x}", Utc::now().timestamp());
         for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
+            keys_bounds.push(format!(
+                "range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
+            ));
             hilbert_keys.push(format!(
                 "hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \
-                _keys_bound._bound{index}_{suffix}), {partitions}) as uint16))"
-            ));
-            keys_bounds.push(format!(
-                "range_bound({partitions}, {sample_size})({cluster_key_str}) AS _bound{index}_{suffix}"
+                _keys_bound.bound_{index}), {partitions}) as uint16))"
             ));
         }
         let keys_bounds_str = keys_bounds.join(", ");
@@ -1173,20 +1171,20 @@
 
         let query = format!(
             "WITH _keys_bound AS ( \
-            SELECT \
-            {keys_bounds_str} \
-            FROM {database}.{table} \
-            ), \
-            _source_data AS ( \
-            SELECT \
-            {output_with_table_str}, \
-            hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
-            FROM {database}.{table}, _keys_bound \
-            ) \
             SELECT \
-            {output_str} \
-            FROM _source_data \
-            ORDER BY _hilbert_index"
+                {keys_bounds_str} \
+            FROM {database}.{table} \
+            ), \
+            _source_data AS ( \
+                SELECT \
+                    {output_with_table_str}, \
+                    hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
+                FROM {database}.{table}, _keys_bound \
+            ) \
+            SELECT \
+                {output_str} \
+            FROM _source_data \
+            ORDER BY _hilbert_index"
         );
         let tokens = tokenize_sql(query.as_str())?;
         let (stmt, _) = parse_sql(&tokens, self.dialect)?;
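
Note: dropping the `Utc`-derived suffix makes the generated bound aliases
deterministic (`bound_0`, `bound_1`, ...), and pushing into `keys_bounds`
before `hilbert_keys` keeps each alias definition next to its first use. An
illustrative sketch of the strings this loop now produces for a hypothetical
table `t(a, b)` with 4 partitions (all names and figures here are examples,
not from the patch):

fn main() {
    let (partitions, sample_size) = (4, 1000);
    let mut keys_bounds = Vec::new();
    let mut hilbert_keys = Vec::new();
    for (index, key) in ["a", "b"].iter().enumerate() {
        // Deterministic alias: repeated runs over the same table now
        // generate identical SQL.
        keys_bounds.push(format!(
            "range_bound({partitions}, {sample_size})({key}) AS bound_{index}"
        ));
        hilbert_keys.push(format!(
            "hilbert_key(cast(ifnull(range_partition_id(t.{key}, \
             _keys_bound.bound_{index}), {partitions}) as uint16))"
        ));
    }
    println!("{}", keys_bounds.join(", "));
    println!("{}", hilbert_keys.join(", "));
}
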
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
index 005b0fe63fd35..866d8c9633367 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
@@ -20,6 +20,7 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use databend_common_base::runtime::execute_futures_in_parallel;
+use databend_common_base::runtime::GLOBAL_MEM_STAT;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::plan::ReclusterParts;
@@ -195,7 +196,13 @@ impl ReclusterMutator {
             }
         }
 
-        let memory_threshold = self.ctx.get_settings().get_recluster_block_size()? as usize;
+        // Compute memory threshold and maximum number of blocks allowed for reclustering.
+        let settings = self.ctx.get_settings();
+        let avail_memory_usage =
+            settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
+        let memory_threshold = settings
+            .get_recluster_block_size()?
+            .min(avail_memory_usage * 30 / 100) as usize;
         // specify a rather small value, so that `recluster_block_size` might be tuned to lower value.
         let max_blocks_num =
             (memory_threshold / self.block_thresholds.max_bytes_per_block).max(2) * self.max_tasks;
diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs
index 9a30db09adf21..f752186b84fea 100644
--- a/src/query/storages/system/src/streams_table.rs
+++ b/src/query/storages/system/src/streams_table.rs
@@ -185,7 +185,7 @@ impl AsyncSystemTable for StreamsTable {
 
             let mut handlers = Vec::new();
             for table in tables {
-                // If db1 is visible, do not means db1.table1 is visible. An user may have a grant about db1.table2, so db1 is visible
+                // If db1 is visible, it does not mean db1.table1 is visible. A user may have a grant on db1.table2, so db1 is visible
                 // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check.
                 let t_id = table.get_id();
                 if visibility_checker.check_table_visibility(
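
Note: the 30%-of-available-memory cap that compact_hook.rs used to write back
into the session settings is now applied inside ReclusterMutator, at the point
where the threshold is consumed. A sketch of the relocated computation, with
plain parameters standing in for the settings and `GLOBAL_MEM_STAT` reads (the
figures in `main` are illustrative):

fn memory_threshold(
    max_memory_usage: u64,     // stands in for settings.get_max_memory_usage()
    current_memory_usage: i64, // stands in for GLOBAL_MEM_STAT.get_memory_usage()
    recluster_block_size: u64, // stands in for settings.get_recluster_block_size()
) -> usize {
    // Memory still available: the configured limit minus what the process
    // has already allocated (negative readings clamp to zero).
    let avail = max_memory_usage - current_memory_usage.max(0) as u64;
    // Plan no more recluster input than 30% of what is still free.
    recluster_block_size.min(avail * 30 / 100) as usize
}

fn main() {
    let gib = 1024 * 1024 * 1024u64;
    // An 80 GiB limit with 60 GiB in use leaves 20 GiB; 30% of that (6 GiB)
    // undercuts the 10 GiB setting, so 6 GiB becomes the threshold.
    assert_eq!(
        memory_threshold(80 * gib, (60 * gib) as i64, 10 * gib),
        (6 * gib) as usize
    );
}

Computing the cap at use time means a busy node plans smaller recluster tasks,
and the session setting itself is no longer silently overwritten.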