feat: recluster unclustered blocks #15623

Merged (4 commits) on Jun 4, 2024
@@ -231,15 +231,14 @@ impl OptimizeTableInterpreter {
             .await?
         {
             if !mutator.tasks.is_empty() {
+                let is_distributed = mutator.is_distributed();
                 let reclustered_block_count = mutator.recluster_blocks_count;
                 let physical_plan = build_recluster_physical_plan(
                     mutator.tasks,
                     table.get_table_info().clone(),
                     catalog.info(),
                     mutator.snapshot,
-                    mutator.remained_blocks,
-                    mutator.removed_segment_indexes,
-                    mutator.removed_segment_summary,
+                    is_distributed,
                 )?;
 
                 build_res =
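Note: the old plan builder derived the distribution decision from the task count (the `tasks.len() > 1` check removed further down), whereas both call sites now ask the mutator via `mutator.is_distributed()`. A hedged sketch of what that method plausibly reduces to; the real implementation lives in databend_common_storages_fuse and is not part of this diff:

    // Hypothetical sketch only: ReclusterMutator::is_distributed() is defined
    // outside this diff. The old check was `tasks.len() > 1`; with ReclusterTasks
    // now an enum, a variant-aware version could look like this.
    impl ReclusterMutator {
        pub fn is_distributed(&self) -> bool {
            match &self.tasks {
                ReclusterTasks::Recluster { tasks, .. } => tasks.len() > 1,
                // Assumption: compact jobs distribute when there is more than one part.
                ReclusterTasks::Compact(parts) => parts.len() > 1,
            }
        }
    }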
src/query/service/src/interpreters/interpreter_table_recluster.rs (90 changes: 50 additions & 40 deletions)

@@ -28,18 +28,17 @@ use databend_common_sql::executor::physical_plans::Exchange;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::ReclusterSink;
 use databend_common_sql::executor::physical_plans::ReclusterSource;
-use databend_common_sql::executor::physical_plans::ReclusterTask;
 use databend_common_sql::executor::PhysicalPlan;
 use databend_common_sql::plans::LockTableOption;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::FuseTable;
-use databend_storages_common_table_meta::meta::BlockMeta;
-use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use log::error;
 use log::warn;
 
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
+use crate::interpreters::OptimizeTableInterpreter;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
@@ -148,16 +147,15 @@ impl Interpreter for ReclusterTableInterpreter {
             let mutator = mutator.unwrap();
             if mutator.tasks.is_empty() {
                 break;
-            };
+            }
+            let is_distributed = mutator.is_distributed();
             block_count += mutator.recluster_blocks_count;
             let physical_plan = build_recluster_physical_plan(
                 mutator.tasks,
                 table.get_table_info().clone(),
                 catalog.info(),
                 mutator.snapshot,
-                mutator.remained_blocks,
-                mutator.removed_segment_indexes,
-                mutator.removed_segment_summary,
+                is_distributed,
             )?;
 
             let mut build_res =
@@ -217,44 +215,56 @@ impl Interpreter for ReclusterTableInterpreter {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 pub fn build_recluster_physical_plan(
-    tasks: Vec<ReclusterTask>,
+    tasks: ReclusterTasks,
     table_info: TableInfo,
     catalog_info: CatalogInfo,
     snapshot: Arc<TableSnapshot>,
-    remained_blocks: Vec<Arc<BlockMeta>>,
-    removed_segment_indexes: Vec<usize>,
-    removed_segment_summary: Statistics,
+    is_distributed: bool,
 ) -> Result<PhysicalPlan> {
-    let is_distributed = tasks.len() > 1;
-    let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
-        tasks,
-        table_info: table_info.clone(),
-        catalog_info: catalog_info.clone(),
-        plan_id: u32::MAX,
-    }));
+    match tasks {
+        ReclusterTasks::Recluster {
+            tasks,
+            remained_blocks,
+            removed_segment_indexes,
+            removed_segment_summary,
+        } => {
+            let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
+                tasks,
+                table_info: table_info.clone(),
+                catalog_info: catalog_info.clone(),
+                plan_id: u32::MAX,
+            }));
 
-    if is_distributed {
-        root = PhysicalPlan::Exchange(Exchange {
-            plan_id: 0,
-            input: Box::new(root),
-            kind: FragmentKind::Merge,
-            keys: vec![],
-            allow_adjust_parallelism: true,
-            ignore_exchange: false,
-        });
-    }
-    let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
-        input: Box::new(root),
-        table_info,
-        catalog_info,
-        snapshot,
-        remained_blocks,
-        removed_segment_indexes,
-        removed_segment_summary,
-        plan_id: u32::MAX,
-    }));
-    plan.adjust_plan_id(&mut 0);
-    Ok(plan)
+            if is_distributed {
+                root = PhysicalPlan::Exchange(Exchange {
+                    plan_id: 0,
+                    input: Box::new(root),
+                    kind: FragmentKind::Merge,
+                    keys: vec![],
+                    allow_adjust_parallelism: true,
+                    ignore_exchange: false,
+                });
+            }
+            let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
+                input: Box::new(root),
+                table_info,
+                catalog_info,
+                snapshot,
+                remained_blocks,
+                removed_segment_indexes,
+                removed_segment_summary,
+                plan_id: u32::MAX,
+            }));
+            plan.adjust_plan_id(&mut 0);
+            Ok(plan)
+        }
+        ReclusterTasks::Compact(parts) => OptimizeTableInterpreter::build_physical_plan(
+            parts,
+            table_info,
+            snapshot,
+            catalog_info,
+            is_distributed,
+        ),
+    }
 }
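Note: for readers following the match above, this is the shape of `ReclusterTasks` it implies. A hedged sketch only: the enum is defined in `databend_common_storages_fuse::operations` and its exact definition is not shown in this diff; field types are inferred from the old function signature and from how the variants are destructured here.

    // Sketch: inferred from the destructuring in build_recluster_physical_plan.
    pub enum ReclusterTasks {
        // A normal recluster job: the tasks to execute plus the segment/block
        // bookkeeping the ReclusterSink needs to rewrite the snapshot.
        Recluster {
            tasks: Vec<ReclusterTask>,
            remained_blocks: Vec<Arc<BlockMeta>>,
            removed_segment_indexes: Vec<usize>,
            removed_segment_summary: Statistics,
        },
        // Unclustered blocks are first compacted; the parts are handed to the
        // existing compact pipeline via OptimizeTableInterpreter.
        Compact(Partitions),
    }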
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use databend_common_base::base::tokio;
 use databend_common_catalog::plan::PartInfoType;
+use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::table::CompactionLimits;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
@@ -33,9 +34,11 @@ use databend_query::schedulers::build_query_pipeline_without_render_result_set;
 use databend_query::sessions::QueryContext;
 use databend_query::sessions::TableContext;
 use databend_query::test_kits::*;
+use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
+use opendal::Operator;
 use rand::thread_rng;
 use rand::Rng;
 use uuid::Uuid;
@@ -195,6 +198,7 @@ async fn test_safety() -> Result<()> {
             threshold,
             cluster_key_id,
             5,
+            false,
         )
         .await?;
 
@@ -239,53 +243,72 @@ async fn test_safety() -> Result<()> {
             eprintln!("no target select");
             continue;
         }
-        assert!(selections.partitions_type() != PartInfoType::LazyLevel);
-
-        let mut actual_blocks_number = 0;
-        let mut compact_segment_indices = HashSet::new();
-        let mut actual_block_ids = HashSet::new();
-        for part in selections.partitions.into_iter() {
-            let part = CompactBlockPartInfo::from_part(&part)?;
-            match part {
-                CompactBlockPartInfo::CompactExtraInfo(extra) => {
-                    compact_segment_indices.insert(extra.segment_index);
-                    compact_segment_indices.extend(extra.removed_segment_indexes.iter());
-                    actual_blocks_number += extra.unchanged_blocks.len();
-                    for b in &extra.unchanged_blocks {
-                        actual_block_ids.insert(b.1.location.clone());
-                    }
-                }
-                CompactBlockPartInfo::CompactTaskInfo(task) => {
-                    compact_segment_indices.insert(task.index.segment_idx);
-                    actual_blocks_number += task.blocks.len();
-                    for b in &task.blocks {
-                        actual_block_ids.insert(b.location.clone());
-                    }
-                }
-            }
-        }
-
-        eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
-        let mut except_blocks_number = 0;
-        let mut except_block_ids = HashSet::new();
-        for idx in compact_segment_indices.into_iter() {
-            let loc = locations.get(idx).unwrap();
-            let compact_segment = SegmentsIO::read_compact_segment(
-                ctx.get_application_level_data_operator()?.operator(),
-                loc.clone(),
-                TestFixture::default_table_schema(),
-                false,
-            )
-            .await?;
-            let segment = SegmentInfo::try_from(compact_segment)?;
-            except_blocks_number += segment.blocks.len();
-            for b in &segment.blocks {
-                except_block_ids.insert(b.location.clone());
-            }
-        }
-
-        assert_eq!(except_blocks_number, actual_blocks_number);
-        assert_eq!(except_block_ids, actual_block_ids);
+        verify_compact_tasks(
+            ctx.get_application_level_data_operator()?.operator(),
+            selections,
+            locations,
+            HashSet::new(),
+        )
+        .await?;
     }
 
     Ok(())
 }
+
+pub async fn verify_compact_tasks(
+    dal: Operator,
+    parts: Partitions,
+    locations: Vec<Location>,
+    expected_segment_indices: HashSet<usize>,
+) -> Result<()> {
+    assert!(parts.partitions_type() != PartInfoType::LazyLevel);
+
+    let mut actual_blocks_number = 0;
+    let mut compact_segment_indices = HashSet::new();
+    let mut actual_block_ids = HashSet::new();
+    for part in parts.partitions.into_iter() {
+        let part = CompactBlockPartInfo::from_part(&part)?;
+        match part {
+            CompactBlockPartInfo::CompactExtraInfo(extra) => {
+                compact_segment_indices.insert(extra.segment_index);
+                compact_segment_indices.extend(extra.removed_segment_indexes.iter());
+                actual_blocks_number += extra.unchanged_blocks.len();
+                for b in &extra.unchanged_blocks {
+                    actual_block_ids.insert(b.1.location.clone());
+                }
+            }
+            CompactBlockPartInfo::CompactTaskInfo(task) => {
+                compact_segment_indices.insert(task.index.segment_idx);
+                actual_blocks_number += task.blocks.len();
+                for b in &task.blocks {
+                    actual_block_ids.insert(b.location.clone());
+                }
+            }
+        }
+    }
+
+    eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
+    if !expected_segment_indices.is_empty() {
+        assert_eq!(expected_segment_indices, compact_segment_indices);
+    }
+    let mut except_blocks_number = 0;
+    let mut except_block_ids = HashSet::new();
+    for idx in compact_segment_indices.into_iter() {
+        let loc = locations.get(idx).unwrap();
+        let compact_segment = SegmentsIO::read_compact_segment(
+            dal.clone(),
+            loc.clone(),
+            TestFixture::default_table_schema(),
+            false,
+        )
+        .await?;
+        let segment = SegmentInfo::try_from(compact_segment)?;
+        except_blocks_number += segment.blocks.len();
+        for b in &segment.blocks {
+            except_block_ids.insert(b.location.clone());
+        }
+    }
+
+    assert_eq!(except_blocks_number, actual_blocks_number);
+    assert_eq!(except_block_ids, actual_block_ids);
+    Ok(())
+}
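Note: the empty `HashSet::new()` at the call site in test_safety skips the new `expected_segment_indices` check; a caller that knows which segments should be rewritten can pass them explicitly. A minimal sketch, assuming `ctx`, `parts`, and `locations` come from the surrounding test setup:

    // Sketch: additionally assert that exactly segments 0 and 1 were picked
    // up for compaction (non-empty set enables the equality assertion).
    let expected: HashSet<usize> = [0, 1].into_iter().collect();
    verify_compact_tasks(
        ctx.get_application_level_data_operator()?.operator(),
        parts,
        locations.clone(),
        expected,
    )
    .await?;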
@@ -17,5 +17,6 @@ mod deletion;
 mod recluster_mutator;
 mod segments_compact_mutator;
 
+pub use block_compact_mutator::verify_compact_tasks;
 pub use segments_compact_mutator::compact_segment;
 pub use segments_compact_mutator::CompactSegmentTestFixture;