From 1351e6cb3fa443a6f3ff6c2918f2cb6df4458f98 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 15 Jan 2025 20:19:52 +0800 Subject: [PATCH] fix: hilbert clustering in cluster mode --- src/query/service/src/interpreters/hook/compact_hook.rs | 4 ++-- src/query/sql/src/executor/physical_plan.rs | 2 +- src/query/sql/src/executor/physical_plan_visitor.rs | 8 +++++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 1d0bf692a543e..d33f9d22baa82 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -204,8 +204,8 @@ async fn compact_table( &compact_target.database, &compact_target.table, )?; - let avail_memory_usage = - settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64; + let avail_memory_usage = settings.get_max_memory_usage()? + - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64; let recluster_block_size = settings .get_recluster_block_size()? .min(avail_memory_usage * 30 / 100); diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index bb7240f586abd..1e544adcd7b61 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -584,8 +584,8 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::HilbertSerialize(_) | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), + PhysicalPlan::HilbertSerialize(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::AggregateExpand(plan) => Box::new(std::iter::once(plan.input.as_ref())), diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 5cdccd69f9e91..6072f9cad1329 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -128,7 +128,12 @@ pub trait PhysicalPlanReplacer { } fn replace_hilbert_serialize(&mut self, plan: &HilbertSerialize) -> Result { - Ok(PhysicalPlan::HilbertSerialize(Box::new(plan.clone()))) + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize { + plan_id: plan.plan_id, + input: Box::new(input), + table_info: plan.table_info.clone(), + }))) } fn replace_table_scan(&mut self, plan: &TableScan) -> Result { @@ -333,6 +338,7 @@ pub trait PhysicalPlanReplacer { } fn replace_exchange(&mut self, plan: &Exchange) -> Result { + println!("replace_exchange trait"); let input = self.replace(&plan.input)?; Ok(PhysicalPlan::Exchange(Exchange {