Skip to content

Commit

Permalink
chore: format the code about compact and add a time cost for bind tab…
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG authored and andylokandy committed Nov 27, 2023
1 parent 2084ef1 commit c28bda9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 52 deletions.
35 changes: 19 additions & 16 deletions src/query/service/src/interpreters/interpreter_copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,27 @@ impl Interpreter for CopyIntoTableInterpreter {
build_commit_data_pipeline(&self.ctx, &mut build_res.main_pipeline, &self.plan, &files)
.await?;

let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog_info.name_ident.catalog_name.clone(),
database: self.plan.database_name.clone(),
table: self.plan.table_name.clone(),
};
// Compact if 'enable_recluster_after_write' on.
{
let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog_info.name_ident.catalog_name.clone(),
database: self.plan.database_name.clone(),
table: self.plan.table_name.clone(),
};

let trace_ctx = CompactHookTraceCtx {
start,
operation_name: "copy_into_table".to_owned(),
};
let trace_ctx = CompactHookTraceCtx {
start,
operation_name: "copy_into_table".to_owned(),
};

hook_compact(
self.ctx.clone(),
&mut build_res.main_pipeline,
compact_target,
trace_ctx,
)
.await;
hook_compact(
self.ctx.clone(),
&mut build_res.main_pipeline,
compact_target,
trace_ctx,
)
.await;
}

// generate sync aggregating indexes if `enable_refresh_aggregating_index_after_write` on.
{
Expand Down
40 changes: 21 additions & 19 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,6 @@ impl Interpreter for MergeIntoInterpreter {
let handler = TableLockHandlerWrapper::instance(self.ctx.clone());
let mut heartbeat = handler.try_lock(self.ctx.clone(), table_info).await?;

// hook compact
let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog.clone(),
database: self.plan.database.clone(),
table: self.plan.table.clone(),
};

let compact_hook_trace_ctx = CompactHookTraceCtx {
start,
operation_name: "merge_into".to_owned(),
};

if build_res.main_pipeline.is_empty() {
heartbeat.shutdown().await?;
} else {
Expand All @@ -110,13 +98,27 @@ impl Interpreter for MergeIntoInterpreter {
});
}

hook_compact(
self.ctx.clone(),
&mut build_res.main_pipeline,
compact_target,
compact_hook_trace_ctx,
)
.await;
// Compact if 'enable_recluster_after_write' on.
{
let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog.clone(),
database: self.plan.database.clone(),
table: self.plan.table.clone(),
};

let compact_hook_trace_ctx = CompactHookTraceCtx {
start,
operation_name: "merge_into".to_owned(),
};

hook_compact(
self.ctx.clone(),
&mut build_res.main_pipeline,
compact_target,
compact_hook_trace_ctx,
)
.await;
}

Ok(build_res)
}
Expand Down
36 changes: 19 additions & 17 deletions src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,27 @@ impl Interpreter for ReplaceInterpreter {
)?;
}

// hook compact
let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog.clone(),
database: self.plan.database.clone(),
table: self.plan.table.clone(),
};
// Compact if 'enable_recluster_after_write' on.
{
let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog.clone(),
database: self.plan.database.clone(),
table: self.plan.table.clone(),
};

let compact_hook_trace_ctx = CompactHookTraceCtx {
start,
operation_name: "replace_into".to_owned(),
};
let compact_hook_trace_ctx = CompactHookTraceCtx {
start,
operation_name: "replace_into".to_owned(),
};

hook_compact(
self.ctx.clone(),
&mut pipeline.main_pipeline,
compact_target,
compact_hook_trace_ctx,
)
.await;
hook_compact(
self.ctx.clone(),
&mut pipeline.main_pipeline,
compact_target,
compact_hook_trace_ctx,
)
.await;
}

Ok(pipeline)
}
Expand Down
5 changes: 5 additions & 0 deletions src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use common_storages_stage::StageTable;
use common_storages_view::view_table::QUERY;
use common_users::UserApiProvider;
use dashmap::DashMap;
use log::info;
use parking_lot::RwLock;

use crate::binder::copy_into_table::resolve_file_location;
Expand Down Expand Up @@ -548,6 +549,8 @@ impl Binder {
alias: &Option<TableAlias>,
files_to_copy: Option<Vec<StageFileInfo>>,
) -> Result<(SExpr, BindContext)> {
let start = std::time::Instant::now();

let table = match stage_info.file_format_params {
FileFormatParams::Parquet(..) => {
let use_parquet2 = table_ctx.get_settings().get_use_parquet2()?;
Expand Down Expand Up @@ -658,6 +661,8 @@ impl Binder {
if let Some(alias) = alias {
bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?;
}

info!("bind_stage_table cost: {:?}", start.elapsed());
Ok((s_expr, bind_context))
}

Expand Down

0 comments on commit c28bda9

Please sign in to comment.