Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add setting compact_max_block_selection #15641

Merged
merged 1 commit into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("compact_max_block_selection", DefaultSettingValue {
value: UserSettingValue::UInt64(10000),
desc: "Limits the maximum number of blocks that can be selected during a compact operation.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(2..=u64::MAX)),
}),
("enable_distributed_recluster", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enable distributed execution of table recluster.",
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,14 @@ impl Settings {
self.try_get_u64("recluster_block_size")
}

/// Store a new value for `compact_max_block_selection`, the cap on how many
/// blocks a compact operation may select.
///
/// # Errors
/// Propagates any error from the underlying setting store (e.g. the value
/// falling outside the setting's declared numeric range).
pub fn set_compact_max_block_selection(&self, val: u64) -> Result<()> {
    const KEY: &str = "compact_max_block_selection";
    self.try_set_u64(KEY, val)
}

/// Read the current `compact_max_block_selection` value — the cap on how many
/// blocks a compact operation may select.
///
/// # Errors
/// Propagates any lookup/parse error from the underlying setting store.
pub fn get_compact_max_block_selection(&self) -> Result<u64> {
    const KEY: &str = "compact_max_block_selection";
    self.try_get_u64(KEY)
}

/// Whether distributed execution of table recluster is enabled.
///
/// The setting is stored as a u64; any non-zero value means enabled.
///
/// # Errors
/// Propagates any lookup/parse error from the underlying setting store.
pub fn get_enable_distributed_recluster(&self) -> Result<bool> {
    let raw = self.try_get_u64("enable_distributed_recluster")?;
    Ok(raw != 0)
}
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/fuse/src/operations/mutation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@ pub use meta::*;
pub use mutator::*;
pub use processors::*;

pub static MAX_BLOCK_COUNT: usize = 10_000;
pub type SegmentIndex = usize;
pub type BlockIndex = usize;
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::operations::mutation::CompactExtraInfo;
use crate::operations::mutation::CompactLazyPartInfo;
use crate::operations::mutation::CompactTaskInfo;
use crate::operations::mutation::SegmentIndex;
use crate::operations::mutation::MAX_BLOCK_COUNT;
use crate::operations::CompactOptions;
use crate::statistics::reducers::merge_statistics_mut;
use crate::statistics::sort_by_cluster_stats;
Expand Down Expand Up @@ -84,14 +83,19 @@ impl BlockCompactMutator {
let snapshot = self.compact_params.base_snapshot.clone();
let segment_locations = &snapshot.segments;
let number_segments = segment_locations.len();

let settings = self.ctx.get_settings();
let compact_max_block_selection = settings.get_compact_max_block_selection()? as usize;
let max_threads = settings.get_max_threads()? as usize;

let num_segment_limit = self
.compact_params
.num_segment_limit
.unwrap_or(number_segments);
let num_block_limit = self
.compact_params
.num_block_limit
.unwrap_or(MAX_BLOCK_COUNT);
.unwrap_or(compact_max_block_selection);

info!("block compaction limits: seg {num_segment_limit}, block {num_block_limit}");

Expand All @@ -112,7 +116,7 @@ impl BlockCompactMutator {
let mut segment_idx = 0;
let mut is_end = false;
let mut parts = Vec::new();
let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4;
let chunk_size = max_threads * 4;
for chunk in segment_locations.chunks(chunk_size) {
// Read the segments information in parallel.
let mut segment_infos = segments_io
Expand Down Expand Up @@ -194,7 +198,6 @@ impl BlockCompactMutator {
metrics_inc_compact_block_build_lazy_part_milliseconds(elapsed_time.as_millis() as u64);

let cluster = self.ctx.get_cluster();
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads {
// NOTE: The snapshot schema does not contain the stream column.
let column_ids = self
Expand Down
Loading