From 275bf167fa8c8e35b46b49c759411876ece354e8 Mon Sep 17 00:00:00 2001
From: zhyass <mytesla@live.com>
Date: Wed, 22 May 2024 15:34:08 +0800
Subject: [PATCH] refactor recluster

---
 .../interpreter_table_optimize.rs             |   5 +-
 .../interpreter_table_recluster.rs            |  93 ++++---
 .../operations/mutation/recluster_mutator.rs  |  42 +--
 .../mutation/mutator/block_compact_mutator.rs |  56 ++--
 .../src/operations/mutation/mutator/mod.rs    |   2 +
 .../mutation/mutator/recluster_mutator.rs     | 260 +++++++++++++++---
 .../storages/fuse/src/operations/recluster.rs |  50 +---
 7 files changed, 332 insertions(+), 176 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs
index 58fca664fe5ba..47691042b17c6 100644
--- a/src/query/service/src/interpreters/interpreter_table_optimize.rs
+++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -238,15 +238,14 @@ impl OptimizeTableInterpreter {
                 .await?
             {
                 if !mutator.tasks.is_empty() {
+                    let is_distributed = mutator.is_distributed();
                     let reclustered_block_count = mutator.recluster_blocks_count;
                     let physical_plan = build_recluster_physical_plan(
                         mutator.tasks,
                         table.get_table_info().clone(),
                         catalog.info(),
                         mutator.snapshot,
-                        mutator.remained_blocks,
-                        mutator.removed_segment_indexes,
-                        mutator.removed_segment_summary,
+                        is_distributed,
                         self.plan.need_lock,
                     )?;
 
diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs
index 316081ecc79cc..3e4f09cec6ae1 100644
--- a/src/query/service/src/interpreters/interpreter_table_recluster.rs
+++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -29,17 +29,16 @@ use databend_common_sql::executor::physical_plans::Exchange;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::ReclusterSink;
 use databend_common_sql::executor::physical_plans::ReclusterSource;
-use databend_common_sql::executor::physical_plans::ReclusterTask;
 use databend_common_sql::executor::PhysicalPlan;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::FuseTable;
-use databend_storages_common_table_meta::meta::BlockMeta;
-use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use log::error;
 use log::warn;
 
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
+use crate::interpreters::OptimizeTableInterpreter;
 use crate::locks::LockExt;
 use crate::locks::LockManager;
 use crate::pipelines::executor::ExecutorSettings;
@@ -149,16 +148,15 @@ impl Interpreter for ReclusterTableInterpreter {
             let mutator = mutator.unwrap();
             if mutator.tasks.is_empty() {
                 break;
-            };
+            }
+            let is_distributed = mutator.is_distributed();
             block_count += mutator.recluster_blocks_count;
             let physical_plan = build_recluster_physical_plan(
                 mutator.tasks,
                 table_info,
                 catalog.info(),
                 mutator.snapshot,
-                mutator.remained_blocks,
-                mutator.removed_segment_indexes,
-                mutator.removed_segment_summary,
+                is_distributed,
                 true,
             )?;
 
@@ -222,46 +220,59 @@ impl Interpreter for ReclusterTableInterpreter {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 pub fn build_recluster_physical_plan(
-    tasks: Vec<ReclusterTask>,
+    tasks: ReclusterTasks,
     table_info: TableInfo,
     catalog_info: CatalogInfo,
     snapshot: Arc<TableSnapshot>,
-    remained_blocks: Vec<Arc<BlockMeta>>,
-    removed_segment_indexes: Vec<usize>,
-    removed_segment_summary: Statistics,
+    is_distributed: bool,
     need_lock: bool,
 ) -> Result<PhysicalPlan> {
-    let is_distributed = tasks.len() > 1;
-    let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
-        tasks,
-        table_info: table_info.clone(),
-        catalog_info: catalog_info.clone(),
-        plan_id: u32::MAX,
-    }));
+    match tasks {
+        ReclusterTasks::Recluster {
+            tasks,
+            remained_blocks,
+            removed_segment_indexes,
+            removed_segment_summary,
+        } => {
+            let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
+                tasks,
+                table_info: table_info.clone(),
+                catalog_info: catalog_info.clone(),
+                plan_id: u32::MAX,
+            }));
 
-    if is_distributed {
-        root = PhysicalPlan::Exchange(Exchange {
-            plan_id: 0,
-            input: Box::new(root),
-            kind: FragmentKind::Merge,
-            keys: vec![],
-            allow_adjust_parallelism: true,
-            ignore_exchange: false,
-        });
+            if is_distributed {
+                root = PhysicalPlan::Exchange(Exchange {
+                    plan_id: 0,
+                    input: Box::new(root),
+                    kind: FragmentKind::Merge,
+                    keys: vec![],
+                    allow_adjust_parallelism: true,
+                    ignore_exchange: false,
+                });
+            }
+            let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
+                input: Box::new(root),
+                table_info,
+                catalog_info,
+                snapshot,
+                remained_blocks,
+                removed_segment_indexes,
+                removed_segment_summary,
+                plan_id: u32::MAX,
+                need_lock,
+            }));
+            plan.adjust_plan_id(&mut 0);
+            Ok(plan)
+        }
+        ReclusterTasks::Compact(parts) => OptimizeTableInterpreter::build_physical_plan(
+            parts,
+            table_info,
+            snapshot,
+            catalog_info,
+            is_distributed,
+            true,
+        ),
     }
-    let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
-        input: Box::new(root),
-        table_info,
-        catalog_info,
-        snapshot,
-        remained_blocks,
-        removed_segment_indexes,
-        removed_segment_summary,
-        plan_id: u32::MAX,
-        need_lock,
-    }));
-    plan.adjust_plan_id(&mut 0);
-    Ok(plan)
 }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
index 6ea7f85924cb1..5e42ca1779d31 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
@@ -28,6 +28,7 @@ use databend_common_expression::TableSchemaRef;
 use databend_common_storages_fuse::io::SegmentWriter;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::operations::ReclusterMutator;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::pruning::create_segment_location_vector;
 use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
 use databend_common_storages_fuse::statistics::reducers::reduce_block_metas;
@@ -134,7 +135,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
     )
     .await?;
 
-    let mut mutator = ReclusterMutator::try_create(
+    let mut mutator = ReclusterMutator::new(
         ctx,
         Arc::new(snapshot),
         schema,
@@ -142,11 +143,15 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
         BlockThresholds::default(),
         cluster_key_id,
         1,
-    )?;
+        1000,
+    );
     let need_recluster = mutator.target_select(compact_segments).await?;
     assert!(need_recluster);
-    assert_eq!(mutator.tasks.len(), 1);
-    let total_block_nums = mutator.tasks.iter().map(|t| t.parts.len()).sum::<usize>();
+    let ReclusterTasks::Recluster { tasks, .. } = mutator.tasks else {
+        return Err(ErrorCode::Internal("Logical error, it's a bug"));
+    };
+    assert_eq!(tasks.len(), 1);
+    let total_block_nums = tasks.iter().map(|t| t.parts.len()).sum::<usize>();
     assert_eq!(total_block_nums, 3);
 
     Ok(())
@@ -247,7 +252,7 @@ async fn test_safety_for_recluster() -> Result<()> {
         .await?;
 
         let mut need_recluster = false;
-        let mut mutator = ReclusterMutator::try_create(
+        let mut mutator = ReclusterMutator::new(
             ctx.clone(),
             snapshot,
             schema.clone(),
@@ -255,16 +260,12 @@ async fn test_safety_for_recluster() -> Result<()> {
             threshold,
             cluster_key_id,
             max_tasks,
-        )?;
-        let selected_segs =
-            ReclusterMutator::select_segments(&compact_segments, block_per_seg, 8, cluster_key_id)?;
+            block_per_seg,
+        );
+        let selected_segs = mutator.select_segments(&compact_segments, 8)?;
         if selected_segs.is_empty() {
             for compact_segment in compact_segments.into_iter() {
-                if !ReclusterMutator::segment_can_recluster(
-                    &compact_segment.1.summary,
-                    block_per_seg,
-                    cluster_key_id,
-                ) {
+                if !mutator.segment_can_recluster(&compact_segment.1.summary) {
                     continue;
                 }
 
@@ -283,7 +284,15 @@ async fn test_safety_for_recluster() -> Result<()> {
 
         eprintln!("need_recluster: {}", need_recluster);
         if need_recluster {
-            let tasks = mutator.tasks;
+            let ReclusterTasks::Recluster {
+                tasks,
+                remained_blocks,
+                removed_segment_indexes,
+                ..
+            } = mutator.tasks
+            else {
+                return Err(ErrorCode::Internal("Logical error, it's a bug"));
+            };
             assert!(tasks.len() <= max_tasks && !tasks.is_empty());
             eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
             let mut blocks = Vec::new();
@@ -296,10 +305,9 @@ async fn test_safety_for_recluster() -> Result<()> {
                 }
             }
 
-            let remained_blocks = std::mem::take(&mut mutator.remained_blocks);
             eprintln!(
                 "selected segments number {}, selected blocks number {}, remained blocks number {}",
-                mutator.removed_segment_indexes.len(),
+                removed_segment_indexes.len(),
                 blocks.len(),
                 remained_blocks.len()
             );
@@ -310,7 +318,7 @@ async fn test_safety_for_recluster() -> Result<()> {
             let block_ids_after_target = HashSet::from_iter(blocks.into_iter());
 
             let mut origin_blocks_ids = HashSet::new();
-            for idx in &mutator.removed_segment_indexes {
+            for idx in &removed_segment_indexes {
                 for b in &segment_infos[*idx].blocks {
                     origin_blocks_ids.insert(b.location.0.clone());
                 }
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
index bd2acdc77e8e9..4b789b5bea92f 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
@@ -144,7 +144,7 @@ impl BlockCompactMutator {
             for (segment_idx, compact_segment) in segment_infos.into_iter() {
                 let segments_vec = checker.add(segment_idx, compact_segment);
                 for segments in segments_vec {
-                    self.generate_part(segments, &mut parts, &mut checker);
+                    checker.generate_part(segments, &mut parts);
                 }
 
                 let residual_segment_cnt = checker.segments.len();
@@ -177,11 +177,7 @@ impl BlockCompactMutator {
         }
 
         // finalize the compaction.
-        self.generate_part(
-            std::mem::take(&mut checker.segments),
-            &mut parts,
-            &mut checker,
-        );
+        checker.finalize(&mut parts);
 
         // Status.
         let elapsed_time = start.elapsed();
@@ -295,28 +291,9 @@ impl BlockCompactMutator {
             }
         }
     }
-
-    fn generate_part(
-        &mut self,
-        segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
-        parts: &mut Vec<PartInfoPtr>,
-        checker: &mut SegmentCompactChecker,
-    ) {
-        if !segments.is_empty() && checker.check_for_compact(&segments) {
-            let mut segment_indices = Vec::with_capacity(segments.len());
-            let mut compact_segments = Vec::with_capacity(segments.len());
-            for (idx, segment) in segments.into_iter() {
-                segment_indices.push(idx);
-                compact_segments.push(segment);
-            }
-
-            let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments);
-            parts.push(lazy_part);
-        }
-    }
 }
 
-struct SegmentCompactChecker {
+pub struct SegmentCompactChecker {
     segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
     total_block_count: u64,
     block_threshold: u64,
@@ -327,7 +304,7 @@ struct SegmentCompactChecker {
 }
 
 impl SegmentCompactChecker {
-    fn new(block_threshold: u64, cluster_key_id: Option<u32>) -> Self {
+    pub fn new(block_threshold: u64, cluster_key_id: Option<u32>) -> Self {
         Self {
             segments: vec![],
             total_block_count: 0,
@@ -361,7 +338,7 @@ impl SegmentCompactChecker {
         true
     }
 
-    fn add(
+    pub fn add(
         &mut self,
         idx: SegmentIndex,
         segment: Arc<CompactSegmentInfo>,
@@ -386,6 +363,29 @@ impl SegmentCompactChecker {
         self.segments.push((idx, segment));
         vec![std::mem::take(&mut self.segments)]
     }
+
+    pub fn generate_part(
+        &mut self,
+        segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
+        parts: &mut Vec<PartInfoPtr>,
+    ) {
+        if !segments.is_empty() && self.check_for_compact(&segments) {
+            let mut segment_indices = Vec::with_capacity(segments.len());
+            let mut compact_segments = Vec::with_capacity(segments.len());
+            for (idx, segment) in segments.into_iter() {
+                segment_indices.push(idx);
+                compact_segments.push(segment);
+            }
+
+            let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments);
+            parts.push(lazy_part);
+        }
+    }
+
+    pub fn finalize(&mut self, parts: &mut Vec<PartInfoPtr>) {
+        let final_segments = std::mem::take(&mut self.segments);
+        self.generate_part(final_segments, parts);
+    }
 }
 
 struct CompactTaskBuilder {
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs
index 7fd293d878035..0f63594424c08 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs
@@ -17,7 +17,9 @@ mod recluster_mutator;
 mod segment_compact_mutator;
 
 pub use block_compact_mutator::BlockCompactMutator;
+pub use block_compact_mutator::SegmentCompactChecker;
 pub use recluster_mutator::ReclusterMutator;
+pub use recluster_mutator::ReclusterTasks;
 pub use segment_compact_mutator::SegmentCompactMutator;
 pub use segment_compact_mutator::SegmentCompactionState;
 pub use segment_compact_mutator::SegmentCompactor;
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
index ff7a536756768..b2b1737f37ace 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
@@ -19,6 +19,9 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use databend_common_base::runtime::execute_futures_in_parallel;
+use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PartitionsShuffleKind;
+use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -39,10 +42,50 @@ use minitrace::full_name;
 use minitrace::future::FutureExt;
 use minitrace::Span;
 
+use crate::operations::mutation::SegmentCompactChecker;
+use crate::operations::BlockCompactMutator;
+use crate::operations::CompactLazyPartInfo;
 use crate::statistics::reducers::merge_statistics_mut;
 use crate::table_functions::cmp_with_null;
 use crate::FuseTable;
 use crate::SegmentLocation;
+use crate::DEFAULT_AVG_DEPTH_THRESHOLD;
+use crate::DEFAULT_BLOCK_PER_SEGMENT;
+use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
+use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
+
+#[derive(Clone)]
+pub enum ReclusterTasks {
+    Recluster {
+        tasks: Vec<ReclusterTask>,
+        remained_blocks: Vec<Arc<BlockMeta>>,
+        removed_segment_indexes: Vec<usize>,
+        removed_segment_summary: Statistics,
+    },
+    Compact(Partitions),
+}
+
+impl ReclusterTasks {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            ReclusterTasks::Recluster { tasks, .. } => tasks.is_empty(),
+            ReclusterTasks::Compact(parts) => parts.is_empty(),
+        }
+    }
+
+    pub fn new_recluster_tasks() -> Self {
+        Self::Recluster {
+            tasks: vec![],
+            remained_blocks: vec![],
+            removed_segment_indexes: vec![],
+            removed_segment_summary: Statistics::default(),
+        }
+    }
+
+    pub fn new_compact_tasks() -> Self {
+        Self::Compact(Partitions::default())
+    }
+}
 
 #[derive(Clone)]
 pub struct ReclusterMutator {
@@ -52,17 +95,56 @@ pub struct ReclusterMutator {
     pub(crate) cluster_key_id: u32,
     pub(crate) schema: TableSchemaRef,
     pub(crate) max_tasks: usize,
+    pub(crate) block_per_seg: usize,
 
     pub snapshot: Arc<TableSnapshot>,
-    pub tasks: Vec<ReclusterTask>,
     pub recluster_blocks_count: u64,
-    pub remained_blocks: Vec<Arc<BlockMeta>>,
-    pub removed_segment_indexes: Vec<usize>,
-    pub removed_segment_summary: Statistics,
+    pub tasks: ReclusterTasks,
 }
 
 impl ReclusterMutator {
     pub fn try_create(
+        table: &FuseTable,
+        ctx: Arc<dyn TableContext>,
+        snapshot: Arc<TableSnapshot>,
+    ) -> Result<Self> {
+        let schema = table.schema_with_stream();
+        let cluster_key_id = table.cluster_key_meta.clone().unwrap().0;
+        let block_thresholds = table.get_block_thresholds();
+        let block_per_seg =
+            table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
+
+        let avg_depth_threshold = table.get_option(
+            FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD,
+            DEFAULT_AVG_DEPTH_THRESHOLD,
+        );
+        let depth_threshold = (snapshot.summary.block_count as f64 * avg_depth_threshold)
+            .max(1.0)
+            .min(64.0);
+
+        let mut max_tasks = 1;
+        let cluster = ctx.get_cluster();
+        if !cluster.is_empty() && ctx.get_settings().get_enable_distributed_recluster()? {
+            max_tasks = cluster.nodes.len();
+        }
+
+        Ok(Self {
+            ctx,
+            schema,
+            depth_threshold,
+            block_thresholds,
+            cluster_key_id,
+            max_tasks,
+            block_per_seg,
+            snapshot,
+            recluster_blocks_count: 0,
+            tasks: ReclusterTasks::new_recluster_tasks(),
+        })
+    }
+
+    /// Used for tests.
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
         ctx: Arc<dyn TableContext>,
         snapshot: Arc<TableSnapshot>,
         schema: TableSchemaRef,
@@ -70,21 +152,20 @@ impl ReclusterMutator {
         block_thresholds: BlockThresholds,
         cluster_key_id: u32,
         max_tasks: usize,
-    ) -> Result<Self> {
-        Ok(Self {
+        block_per_seg: usize,
+    ) -> Self {
+        Self {
             ctx,
             schema,
             depth_threshold,
             block_thresholds,
             cluster_key_id,
             max_tasks,
+            block_per_seg,
             snapshot,
-            tasks: Vec::new(),
-            remained_blocks: Vec::new(),
             recluster_blocks_count: 0,
-            removed_segment_indexes: Vec::new(),
-            removed_segment_summary: Statistics::default(),
-        })
+            tasks: ReclusterTasks::new_recluster_tasks(),
+        }
     }
 
     #[async_backtrace::framed]
@@ -92,6 +173,20 @@ impl ReclusterMutator {
         &mut self,
         compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
     ) -> Result<bool> {
+        match self.tasks {
+            ReclusterTasks::Compact(_) => self.generate_compact_tasks(compact_segments).await?,
+            ReclusterTasks::Recluster { .. } => {
+                self.generate_recluster_tasks(compact_segments).await?
+            }
+        }
+        Ok(!self.tasks.is_empty())
+    }
+
+    #[async_backtrace::framed]
+    pub async fn generate_recluster_tasks(
+        &mut self,
+        compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
+    ) -> Result<()> {
         let mut selected_segments = Vec::with_capacity(compact_segments.len());
         let mut selected_indices = Vec::with_capacity(compact_segments.len());
         let mut selected_statistics = Vec::with_capacity(compact_segments.len());
@@ -103,7 +198,7 @@ impl ReclusterMutator {
 
         let blocks_map = self.gather_block_map(selected_segments).await?;
         if blocks_map.is_empty() {
-            return Ok(false);
+            return Ok(());
         }
 
         let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
@@ -121,6 +216,7 @@ impl ReclusterMutator {
         let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema));
 
         let mut remained_blocks = Vec::new();
+        let mut tasks = Vec::new();
         let mut selected = false;
         for (level, block_metas) in blocks_map.into_iter() {
             let len = block_metas.len();
@@ -191,19 +287,19 @@ impl ReclusterMutator {
                 let block_size = block_meta.block_size as usize;
                 let row_count = block_meta.row_count as usize;
                 if task_bytes + block_size > memory_threshold && selected_blocks.len() > 1 {
-                    self.generate_task(
+                    tasks.push(self.generate_task(
                         &selected_blocks,
                         &column_nodes,
                         task_rows,
                         task_bytes,
                         level,
-                    );
+                    ));
 
                     task_rows = 0;
                     task_bytes = 0;
                     selected_blocks.clear();
 
-                    if self.tasks.len() >= self.max_tasks {
+                    if tasks.len() >= self.max_tasks {
                         remained_blocks.push(block_meta);
                         over_memory = true;
                         continue;
@@ -219,30 +315,86 @@ impl ReclusterMutator {
             match selected_blocks.len() {
                 0 => (),
                 1 => remained_blocks.push(selected_blocks[0].1.clone()),
-                _ => self.generate_task(
+                _ => tasks.push(self.generate_task(
                     &selected_blocks,
                     &column_nodes,
                     task_rows,
                     task_bytes,
                     level,
-                ),
+                )),
             }
 
             selected = true;
         }
 
         if selected {
-            self.remained_blocks = remained_blocks;
-
             selected_indices.sort_by(|a, b| b.cmp(a));
-            self.removed_segment_indexes = selected_indices;
 
             let default_cluster_key_id = Some(self.cluster_key_id);
+            let mut removed_segment_summary = Statistics::default();
             selected_statistics.iter().for_each(|v| {
-                merge_statistics_mut(&mut self.removed_segment_summary, v, default_cluster_key_id)
+                merge_statistics_mut(&mut removed_segment_summary, v, default_cluster_key_id)
             });
+            self.tasks = ReclusterTasks::Recluster {
+                tasks,
+                remained_blocks,
+                removed_segment_indexes: selected_indices,
+                removed_segment_summary,
+            };
+        }
+        Ok(())
+    }
+
+    async fn generate_compact_tasks(
+        &mut self,
+        compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
+    ) -> Result<()> {
+        let mut parts = Vec::new();
+        let mut checker =
+            SegmentCompactChecker::new(self.block_per_seg as u64, Some(self.cluster_key_id));
+
+        for (loc, compact_segment) in compact_segments.into_iter() {
+            self.recluster_blocks_count += compact_segment.summary.block_count;
+            let segments_vec = checker.add(loc.segment_idx, compact_segment);
+            for segments in segments_vec {
+                checker.generate_part(segments, &mut parts);
+            }
+        }
+        // finalize the compaction.
+        checker.finalize(&mut parts);
+
+        let cluster = self.ctx.get_cluster();
+        let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
+        let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads {
+            // NOTE: The snapshot schema does not contain the stream column.
+            let column_ids = self.snapshot.schema.to_leaf_column_id_set();
+            let lazy_parts = parts
+                .into_iter()
+                .map(|v| {
+                    v.as_any()
+                        .downcast_ref::<CompactLazyPartInfo>()
+                        .unwrap()
+                        .clone()
+                })
+                .collect::<Vec<_>>();
+            Partitions::create(
+                PartitionsShuffleKind::Mod,
+                BlockCompactMutator::build_compact_tasks(
+                    self.ctx.clone(),
+                    column_ids,
+                    Some(self.cluster_key_id),
+                    self.block_thresholds,
+                    lazy_parts,
+                )
+                .await?,
+            )
+        } else {
+            Partitions::create(PartitionsShuffleKind::Mod, parts)
+        };
+        if !partitions.is_empty() {
+            self.tasks = ReclusterTasks::Compact(partitions);
         }
-        Ok(selected)
+        Ok(())
     }
 
     fn generate_task(
@@ -252,35 +404,44 @@ impl ReclusterMutator {
         total_rows: usize,
         total_bytes: usize,
         level: i32,
-    ) {
+    ) -> ReclusterTask {
         let (stats, parts) =
             FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None);
-        let task = ReclusterTask {
+        self.recluster_blocks_count += block_metas.len() as u64;
+        ReclusterTask {
             parts,
             stats,
             total_rows,
             total_bytes,
             level,
-        };
-        self.tasks.push(task);
-        self.recluster_blocks_count += block_metas.len() as u64;
+        }
     }
 
     pub fn select_segments(
+        &mut self,
         compact_segments: &[(SegmentLocation, Arc<CompactSegmentInfo>)],
-        block_per_seg: usize,
         max_len: usize,
-        cluster_key_id: u32,
     ) -> Result<IndexSet<usize>> {
         let mut blocks_num = 0;
         let mut indices = IndexSet::new();
         let mut points_map: HashMap<Vec<Scalar>, (Vec<usize>, Vec<usize>)> = HashMap::new();
+        let mut unclustered_sg = IndexSet::new();
         for (i, (_, compact_segment)) in compact_segments.iter().enumerate() {
-            if !ReclusterMutator::segment_can_recluster(
-                &compact_segment.summary,
-                block_per_seg,
-                cluster_key_id,
-            ) {
+            let mut level = -1;
+            let clustered = compact_segment
+                .summary
+                .cluster_stats
+                .as_ref()
+                .is_some_and(|v| {
+                    level = v.level;
+                    v.cluster_key_id == self.cluster_key_id
+                });
+            if !clustered {
+                unclustered_sg.insert(i);
+                continue;
+            }
+
+            if level < 0 && (compact_segment.summary.block_count as usize) >= self.block_per_seg {
                 continue;
             }
 
@@ -298,21 +459,22 @@ impl ReclusterMutator {
             }
         }
 
-        if indices.len() < 2 || blocks_num < block_per_seg {
+        if !unclustered_sg.is_empty() {
+            self.tasks = ReclusterTasks::Compact(Partitions::default());
+            return Ok(unclustered_sg);
+        }
+
+        if indices.len() < 2 || blocks_num < self.block_per_seg {
             return Ok(indices);
         }
 
         ReclusterMutator::fetch_max_depth(points_map, 1.0, max_len)
     }
 
-    pub fn segment_can_recluster(
-        summary: &Statistics,
-        block_per_seg: usize,
-        cluster_key_id: u32,
-    ) -> bool {
+    pub fn segment_can_recluster(&self, summary: &Statistics) -> bool {
         if let Some(stats) = &summary.cluster_stats {
-            stats.cluster_key_id == cluster_key_id
-                && (stats.level >= 0 || (summary.block_count as usize) < block_per_seg)
+            stats.cluster_key_id == self.cluster_key_id
+                && (stats.level >= 0 || (summary.block_count as usize) < self.block_per_seg)
         } else {
             false
         }
@@ -472,4 +634,18 @@ impl ReclusterMutator {
         let set: HashSet<usize> = HashSet::from_iter(start.iter().chain(end.iter()).cloned());
         set.len() == 2
     }
+
+    pub fn is_distributed(&self) -> bool {
+        match &self.tasks {
+            ReclusterTasks::Recluster { tasks, .. } => tasks.len() > 1,
+            ReclusterTasks::Compact(_) => {
+                (!self.ctx.get_cluster().is_empty())
+                    && self
+                        .ctx
+                        .get_settings()
+                        .get_enable_distributed_compact()
+                        .unwrap_or(false)
+            }
+        }
+    }
 }
diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs
index 7fdd335582934..6d0535e239f37 100644
--- a/src/query/storages/fuse/src/operations/recluster.rs
+++ b/src/query/storages/fuse/src/operations/recluster.rs
@@ -35,10 +35,6 @@ use crate::pruning::PruningContext;
 use crate::pruning::SegmentPruner;
 use crate::FuseTable;
 use crate::SegmentLocation;
-use crate::DEFAULT_AVG_DEPTH_THRESHOLD;
-use crate::DEFAULT_BLOCK_PER_SEGMENT;
-use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
-use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 
 impl FuseTable {
     /// The flow of Pipeline is as follows:
@@ -88,39 +84,12 @@ impl FuseTable {
 
         let start = Instant::now();
 
-        let settings = ctx.get_settings();
-        let mut max_tasks = 1;
-        let cluster = ctx.get_cluster();
-        if !cluster.is_empty() && settings.get_enable_distributed_recluster()? {
-            max_tasks = cluster.nodes.len();
-        }
-
-        let schema = self.schema_with_stream();
-        let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0;
-        let block_thresholds = self.get_block_thresholds();
-        let block_per_seg =
-            self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
-        let avg_depth_threshold = self.get_option(
-            FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD,
-            DEFAULT_AVG_DEPTH_THRESHOLD,
-        );
-        let threshold = (snapshot.summary.block_count as f64 * avg_depth_threshold)
-            .max(1.0)
-            .min(64.0);
-        let mut mutator = ReclusterMutator::try_create(
-            ctx.clone(),
-            snapshot.clone(),
-            schema,
-            threshold,
-            block_thresholds,
-            default_cluster_key_id,
-            max_tasks,
-        )?;
+        let mut mutator = ReclusterMutator::try_create(self, ctx.clone(), snapshot.clone())?;
 
         let segment_locations = snapshot.segments.clone();
         let segment_locations = create_segment_location_vector(segment_locations, None);
 
-        let max_threads = settings.get_max_threads()? as usize;
+        let max_threads = ctx.get_settings().get_max_threads()? as usize;
         let limit = limit.unwrap_or(1000);
         // The default limit might be too small, which makes
         // the scanning of recluster candidates slow.
@@ -159,28 +128,19 @@ impl FuseTable {
             }
 
             // select the segments with the highest depth.
-            let selected_segs = ReclusterMutator::select_segments(
-                &compact_segments,
-                block_per_seg,
-                max_seg_num,
-                default_cluster_key_id,
-            )?;
+            let selected_segs = mutator.select_segments(&compact_segments, max_seg_num)?;
             // select the blocks with the highest depth.
             if selected_segs.is_empty() {
                 let mut selected_segs = vec![];
                 let mut block_count = 0;
                 for compact_segment in compact_segments.into_iter() {
-                    if !ReclusterMutator::segment_can_recluster(
-                        &compact_segment.1.summary,
-                        block_per_seg,
-                        default_cluster_key_id,
-                    ) {
+                    if !mutator.segment_can_recluster(&compact_segment.1.summary) {
                         continue;
                     }
 
                     block_count += compact_segment.1.summary.block_count as usize;
                     selected_segs.push(compact_segment);
-                    if block_count >= block_per_seg {
+                    if block_count >= mutator.block_per_seg {
                         let seg_num = selected_segs.len();
                         if mutator
                             .target_select(std::mem::take(&mut selected_segs))