Skip to content

Commit

Permalink
fix: hilbert clustering in cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jan 15, 2025
1 parent ee49bc8 commit 1351e6c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ async fn compact_table(
&compact_target.database,
&compact_target.table,
)?;
let avail_memory_usage =
settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
let avail_memory_usage = settings.get_max_memory_usage()?
- GLOBAL_MEM_STAT.get_memory_usage().max(0) as u64;
let recluster_block_size = settings
.get_recluster_block_size()?
.min(avail_memory_usage * 30 / 100);
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ impl PhysicalPlan {
| PhysicalPlan::CompactSource(_)
| PhysicalPlan::ReplaceAsyncSourcer(_)
| PhysicalPlan::Recluster(_)
| PhysicalPlan::HilbertSerialize(_)
| PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()),
PhysicalPlan::HilbertSerialize(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AggregateExpand(plan) => Box::new(std::iter::once(plan.input.as_ref())),
Expand Down
8 changes: 7 additions & 1 deletion src/query/sql/src/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ pub trait PhysicalPlanReplacer {
}

fn replace_hilbert_serialize(&mut self, plan: &HilbertSerialize) -> Result<PhysicalPlan> {
Ok(PhysicalPlan::HilbertSerialize(Box::new(plan.clone())))
let input = self.replace(&plan.input)?;
Ok(PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize {
plan_id: plan.plan_id,
input: Box::new(input),
table_info: plan.table_info.clone(),
})))
}

fn replace_table_scan(&mut self, plan: &TableScan) -> Result<PhysicalPlan> {
Expand Down Expand Up @@ -333,6 +338,7 @@ pub trait PhysicalPlanReplacer {
}

fn replace_exchange(&mut self, plan: &Exchange) -> Result<PhysicalPlan> {
println!("replace_exchange trait");
let input = self.replace(&plan.input)?;

Ok(PhysicalPlan::Exchange(Exchange {
Expand Down

0 comments on commit 1351e6c

Please sign in to comment.