Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jan 19, 2025
1 parent 2a0f999 commit d24c269
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,14 @@ impl AccumulatingTransform for BlockCompactBuilder {
}

pub(crate) fn memory_size(data_block: &DataBlock) -> usize {
let num_rows = data_block.num_rows();
data_block
.columns()
.iter()
.map(|entry| {
if let Value::Column(Column::Nullable(col)) = &entry.value {
if col.validity.true_count() == 0 {
return 0;
return num_rows;
}
}
entry.memory_size()
Expand Down
7 changes: 0 additions & 7 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;
use std::time::Instant;

use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::GLOBAL_MEM_STAT;
use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table_context::TableContext;
Expand Down Expand Up @@ -204,12 +203,6 @@ async fn compact_table(
&compact_target.database,
&compact_target.table,
)?;
let avail_memory_usage = settings.get_max_memory_usage()?
- GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
let recluster_block_size = settings
.get_recluster_block_size()?
.min(avail_memory_usage * 30 / 100);
settings.set_recluster_block_size(recluster_block_size)?;
ctx.set_enable_sort_spill(false);
let recluster = RelOperator::Recluster(Recluster {
catalog: compact_target.catalog,
Expand Down
9 changes: 8 additions & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl DefaultSettings {
Ok(Arc::clone(DEFAULT_SETTINGS.get_or_try_init(|| -> Result<Arc<DefaultSettings>> {
let num_cpus = Self::num_cpus();
let max_memory_usage = Self::max_memory_usage()?;
let recluster_block_size = Self::recluster_block_size(max_memory_usage);
let default_max_spill_io_requests = Self::spill_io_requests(num_cpus);
let default_max_storage_io_requests = Self::storage_io_requests(num_cpus);
let data_retention_time_in_days_max = Self::data_retention_time_in_days_max();
Expand Down Expand Up @@ -814,7 +815,7 @@ impl DefaultSettings {
range: Some(SettingRange::Numeric(0..=1)),
}),
("recluster_block_size", DefaultSettingValue {
value: UserSettingValue::UInt64(100 * 1024 * 1024 * 1024),
value: UserSettingValue::UInt64(recluster_block_size),
desc: "Sets the maximum byte size of blocks for recluster",
mode: SettingMode::Both,
scope: SettingScope::Both,
Expand Down Expand Up @@ -1261,6 +1262,12 @@ impl DefaultSettings {
})
}

/// Derives the default `recluster_block_size` setting (in bytes) from the
/// configured memory limit.
///
/// Returns 30% of `max_memory_usage`, capped at 80 GiB. The sort merge
/// consumes more than twice as much memory, so the block size is set
/// relatively conservatively here.
fn recluster_block_size(max_memory_usage: u64) -> u64 {
    // Hard upper bound on the derived default: 80 GiB.
    const MAX_RECLUSTER_BLOCK_SIZE: u64 = 80 * 1024 * 1024 * 1024;
    // `saturating_mul` guards against u64 overflow (which would panic in
    // debug builds) if `max_memory_usage` is absurdly large (> ~615 PB).
    (max_memory_usage.saturating_mul(30) / 100).min(MAX_RECLUSTER_BLOCK_SIZE)
}

/// Converts and validates a setting value based on its key.
pub fn convert_value(k: String, v: String) -> Result<(String, UserSettingValue)> {
// Retrieve the default settings instance
Expand Down
36 changes: 17 additions & 19 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::BTreeMap;
use std::collections::HashSet;
use std::sync::Arc;

use chrono::Utc;
use databend_common_ast::ast::AddColumnOption as AstAddColumnOption;
use databend_common_ast::ast::AlterTableAction;
use databend_common_ast::ast::AlterTableStmt;
Expand Down Expand Up @@ -1144,14 +1143,13 @@ impl Binder {
let sample_size = settings.get_hilbert_sample_size_per_block()?;
let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len());
let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len());
let suffix = format!("{:08x}", Utc::now().timestamp());
for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() {
keys_bounds.push(format!(
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}"
));
hilbert_keys.push(format!(
"hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \
_keys_bound._bound{index}_{suffix}), {partitions}) as uint16))"
));
keys_bounds.push(format!(
"range_bound({partitions}, {sample_size})({cluster_key_str}) AS _bound{index}_{suffix}"
_keys_bound.bound_{index}), {partitions}) as uint16))"
));
}
let keys_bounds_str = keys_bounds.join(", ");
Expand All @@ -1173,20 +1171,20 @@ impl Binder {

let query = format!(
"WITH _keys_bound AS ( \
SELECT \
{keys_bounds_str} \
FROM {database}.{table} \
), \
_source_data AS ( \
SELECT \
{output_with_table_str}, \
hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
FROM {database}.{table}, _keys_bound \
) \
SELECT \
{output_str} \
FROM _source_data \
ORDER BY _hilbert_index"
{keys_bounds_str} \
FROM {database}.{table} \
), \
_source_data AS ( \
SELECT \
{output_with_table_str}, \
hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
FROM {database}.{table}, _keys_bound \
) \
SELECT \
{output_str} \
FROM _source_data \
ORDER BY _hilbert_index"
);
let tokens = tokenize_sql(query.as_str())?;
let (stmt, _) = parse_sql(&tokens, self.dialect)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use databend_common_base::runtime::execute_futures_in_parallel;
use databend_common_base::runtime::GLOBAL_MEM_STAT;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::ReclusterParts;
Expand Down Expand Up @@ -195,7 +196,12 @@ impl ReclusterMutator {
}
}

let memory_threshold = self.ctx.get_settings().get_recluster_block_size()? as usize;
let settings = self.ctx.get_settings();
let avail_memory_usage =
settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
let memory_threshold = settings
.get_recluster_block_size()?
.min(avail_memory_usage * 30 / 100) as usize;
// specify a rather small value, so that `recluster_block_size` might be tuned to lower value.
let max_blocks_num =
(memory_threshold / self.block_thresholds.max_bytes_per_block).max(2) * self.max_tasks;
Expand Down

0 comments on commit d24c269

Please sign in to comment.