feat: compact and recluster add table lock (#15632)
* compact and recluster add table lock

* fix test

* fix review comment
zhyass authored May 28, 2024
1 parent cc33f17 commit 10d1952
Showing 40 changed files with 281 additions and 274 deletions.
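The core change replaces the boolean need_lock flag with a LockTableOption enum that is threaded through plans, hooks, and interpreters. The enum's defining file is not among the hunks shown below, so the following Rust sketch is reconstructed only from the three variants that appear at the call sites (NoLock, LockWithRetry, LockNoRetry) and from the fact that callers clone() the value:

    // Sketch reconstructed from usage in the hunks below; the defining
    // file is not part of the displayed diff.
    #[derive(Clone)]
    pub enum LockTableOption {
        // Do not take the table lock (e.g. the caller already holds it).
        NoLock,
        // Acquire the table lock, retrying until it is granted.
        LockWithRetry,
        // Try to acquire the table lock once and fail fast if it is held.
        LockNoRetry,
    }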
6 changes: 5 additions & 1 deletion src/query/catalog/src/lock.rs
@@ -30,5 +30,9 @@ pub trait Lock: Sync + Send {

     fn tenant_name(&self) -> &str;

-    async fn try_lock(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>>;
+    async fn try_lock(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        should_retry: bool,
+    ) -> Result<Option<LockGuard>>;
 }
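For illustration, a call site migrating to the widened signature might look like the following sketch, where the lock handle and context are hypothetical values:

    // Retry until the lock is granted.
    let guard = lock.try_lock(ctx.clone(), true).await?;
    // Or fail fast if another writer currently holds the lock.
    let guard = lock.try_lock(ctx.clone(), false).await?;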
4 changes: 1 addition & 3 deletions src/query/catalog/src/table.rs
@@ -41,7 +41,6 @@ use databend_storages_common_table_meta::meta::SnapshotId;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::table::ChangeType;

-use crate::lock::Lock;
 use crate::plan::DataSourceInfo;
 use crate::plan::DataSourcePlan;
 use crate::plan::PartStatistics;
@@ -339,10 +338,9 @@ pub trait Table: Sync + Send {
     async fn compact_segments(
         &self,
         ctx: Arc<dyn TableContext>,
-        lock: Arc<dyn Lock>,
         limit: Option<usize>,
     ) -> Result<()> {
-        let (_, _, _) = (ctx, lock, limit);
+        let (_, _) = (ctx, limit);

         Err(ErrorCode::Unimplemented(format!(
             "The operation 'compact_segments' is not supported for the table '{}', which is using the '{}' engine.",
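A caller-side sketch of the slimmed-down method; the table handle and the limit of 100 segments are hypothetical, and the table lock is now expected to be taken by the interpreter before this call:

    // No lock argument any more; pass only the context and an optional limit.
    table.compact_segments(ctx.clone(), Some(100)).await?;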
3 changes: 2 additions & 1 deletion src/query/ee/tests/it/inverted_index/index_refresh.rs
@@ -20,6 +20,7 @@ use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::CreateOption;
 use databend_common_meta_app::schema::CreateTableIndexReq;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::RefreshTableIndexPlan;
 use databend_common_storages_fuse::io::read::InvertedIndexReader;
 use databend_common_storages_fuse::io::MetaReaders;
@@ -86,7 +87,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
         table: fixture.default_table_name(),
         index_name: index_name.clone(),
         segment_locs: None,
-        need_lock: true,
+        lock_opt: LockTableOption::LockWithRetry,
     };
     let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?;
     let _ = interpreter.execute(ctx.clone()).await?;
3 changes: 2 additions & 1 deletion src/query/ee/tests/it/inverted_index/pruning.rs
@@ -36,6 +36,7 @@ use databend_common_expression::TableSchemaRefExt;
 use databend_common_meta_app::schema::CreateOption;
 use databend_common_meta_app::schema::CreateTableIndexReq;
 use databend_common_sql::plans::CreateTablePlan;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::RefreshTableIndexPlan;
 use databend_common_sql::BloomIndexColumns;
 use databend_common_storages_fuse::pruning::create_segment_location_vector;
@@ -525,7 +526,7 @@ async fn test_block_pruner() -> Result<()> {
         table: test_tbl_name.to_string(),
         index_name: index_name.clone(),
         segment_locs: None,
-        need_lock: true,
+        lock_opt: LockTableOption::LockWithRetry,
     };
     let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?;
     let _ = interpreter.execute(ctx.clone()).await?;
13 changes: 7 additions & 6 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -21,6 +21,7 @@ use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_sql::executor::physical_plans::MutationKind;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::OptimizeTableAction;
 use databend_common_sql::plans::OptimizeTablePlan;
 use log::info;
@@ -52,10 +53,10 @@ pub async fn hook_compact(
     pipeline: &mut Pipeline,
     compact_target: CompactTargetTableDescription,
     trace_ctx: CompactHookTraceCtx,
-    need_lock: bool,
+    lock_opt: LockTableOption,
 ) {
     let op_name = trace_ctx.operation_name.clone();
-    if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, need_lock).await {
+    if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await {
         info!("compact hook ({}) with error (ignored): {}", op_name, e);
     }
 }
@@ -66,7 +67,7 @@ async fn do_hook_compact(
     pipeline: &mut Pipeline,
     compact_target: CompactTargetTableDescription,
     trace_ctx: CompactHookTraceCtx,
-    need_lock: bool,
+    lock_opt: LockTableOption,
 ) -> Result<()> {
     if pipeline.is_empty() {
         return Ok(());
@@ -103,7 +104,7 @@
         if err.is_ok() {
             info!("execute {op_name} finished successfully. running table optimization job.");
             match GlobalIORuntime::instance().block_on({
-                compact_table(ctx, compact_target, compaction_limits, need_lock)
+                compact_table(ctx, compact_target, compaction_limits, lock_opt)
             }) {
                 Ok(_) => {
                     info!("execute {op_name} finished successfully. table optimization job finished.");
@@ -126,7 +127,7 @@ async fn compact_table(
     ctx: Arc<QueryContext>,
     compact_target: CompactTargetTableDescription,
     compaction_limits: CompactionLimits,
-    need_lock: bool,
+    lock_opt: LockTableOption,
 ) -> Result<()> {
     // evict the table from cache
     ctx.evict_table_from_cache(
@@ -143,7 +144,7 @@
         table: compact_target.table,
         action: OptimizeTableAction::CompactBlocks(compaction_limits.block_limit),
         limit: compaction_limits.segment_limit,
-        need_lock,
+        lock_opt,
     })?;

     let mut build_res = optimize_interpreter.execute2().await?;
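As a usage sketch, an interpreter now passes a lock option instead of a boolean when invoking the hook; the compact_target and trace_ctx values below are placeholders:

    hook_compact(
        ctx.clone(),
        &mut build_res.main_pipeline,
        compact_target, // CompactTargetTableDescription, placeholder value
        trace_ctx,      // CompactHookTraceCtx, placeholder value
        LockTableOption::LockNoRetry,
    )
    .await;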
17 changes: 12 additions & 5 deletions src/query/service/src/interpreters/hook/hook.rs
@@ -18,6 +18,7 @@ use std::time::Instant;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_sql::executor::physical_plans::MutationKind;
+use databend_common_sql::plans::LockTableOption;
 use log::info;
 use log::warn;

@@ -35,7 +36,7 @@ pub struct HookOperator {
     database: String,
     table: String,
     mutation_kind: MutationKind,
-    need_lock: bool,
+    lock_opt: LockTableOption,
 }

 impl HookOperator {
@@ -45,15 +46,15 @@ impl HookOperator {
         database: String,
         table: String,
         mutation_kind: MutationKind,
-        need_lock: bool,
+        lock_opt: LockTableOption,
     ) -> Self {
         Self {
             ctx,
             catalog,
             database,
             table,
             mutation_kind,
-            need_lock,
+            lock_opt,
         }
     }

@@ -105,7 +106,7 @@ impl HookOperator {
                 pipeline,
                 compact_target,
                 trace_ctx,
-                self.need_lock,
+                self.lock_opt.clone(),
             )
             .await;
         }
@@ -122,6 +123,12 @@ impl HookOperator {
             table: self.table.to_owned(),
         };

-        hook_refresh(self.ctx.clone(), pipeline, refresh_desc, self.need_lock).await;
+        hook_refresh(
+            self.ctx.clone(),
+            pipeline,
+            refresh_desc,
+            self.lock_opt.clone(),
+        )
+        .await;
     }
 }
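Putting it together, a hypothetical interpreter that appends rows could wire the operator as follows (catalog, database, and table names are placeholders; the LockNoRetry choice mirrors the INSERT and COPY interpreters below):

    let hook_operator = HookOperator::create(
        ctx.clone(),
        "default".to_string(), // catalog, placeholder
        "db1".to_string(),     // database, placeholder
        "t1".to_string(),      // table, placeholder
        MutationKind::Insert,
        LockTableOption::LockNoRetry,
    );
    hook_operator.execute(&mut build_res.main_pipeline).await;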
17 changes: 11 additions & 6 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -25,6 +25,7 @@ use databend_common_meta_app::schema::ListIndexesByIdReq;
 use databend_common_meta_app::schema::ListVirtualColumnsReq;
 use databend_common_meta_types::MetaId;
 use databend_common_pipeline_core::Pipeline;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::plans::RefreshIndexPlan;
 use databend_common_sql::plans::RefreshTableIndexPlan;
@@ -57,7 +58,7 @@ pub async fn hook_refresh(
     ctx: Arc<QueryContext>,
     pipeline: &mut Pipeline,
     desc: RefreshDesc,
-    need_lock: bool,
+    lock_opt: LockTableOption,
 ) {
     if pipeline.is_empty() {
         return;
@@ -66,7 +67,7 @@
     pipeline.set_on_finished(move |(_profiles, err)| {
         if err.is_ok() {
             info!("execute pipeline finished successfully, starting run refresh job.");
-            match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, need_lock)) {
+            match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, lock_opt)) {
                 Ok(_) => {
                     info!("execute refresh job successfully.");
                 }
@@ -79,7 +80,11 @@
     });
 }

-async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc, need_lock: bool) -> Result<()> {
+async fn do_refresh(
+    ctx: Arc<QueryContext>,
+    desc: RefreshDesc,
+    lock_opt: LockTableOption,
+) -> Result<()> {
     let table = ctx
         .get_table(&desc.catalog, &desc.database, &desc.table)
         .await?;
@@ -99,7 +104,7 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc, need_lock: bool)

     // Generate sync inverted indexes.
     let inverted_index_plans =
-        generate_refresh_inverted_index_plan(ctx.clone(), &desc, table.clone(), need_lock).await?;
+        generate_refresh_inverted_index_plan(ctx.clone(), &desc, table.clone(), lock_opt).await?;
     plans.extend_from_slice(&inverted_index_plans);

     // Generate virtual columns.
@@ -263,7 +268,7 @@ async fn generate_refresh_inverted_index_plan(
     ctx: Arc<QueryContext>,
     desc: &RefreshDesc,
     table: Arc<dyn Table>,
-    need_lock: bool,
+    lock_opt: LockTableOption,
 ) -> Result<Vec<Plan>> {
     let segment_locs = ctx.get_segment_locations()?;
     let mut plans = vec![];
@@ -279,7 +284,7 @@ async fn generate_refresh_inverted_index_plan(
             table: desc.table.clone(),
             index_name: index.name.clone(),
             segment_locs: Some(segment_locs.clone()),
-            need_lock,
+            lock_opt: lock_opt.clone(),
         };
         plans.push(Plan::RefreshTableIndex(Box::new(plan)));
     }
src/query/service/src/interpreters/interpreter_copy_into_table.rs
@@ -31,6 +31,7 @@ use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::executor::physical_plans::TableScan;
 use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
 use databend_common_sql::executor::PhysicalPlan;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_storage::StageFileInfo;
 use databend_common_storages_stage::StageTable;
 use log::debug;
@@ -392,7 +393,7 @@ impl Interpreter for CopyIntoTableInterpreter {
                 self.plan.database_name.to_string(),
                 self.plan.table_name.to_string(),
                 MutationKind::Insert,
-                true,
+                LockTableOption::LockNoRetry,
             );
             hook_operator.execute(&mut build_res.main_pipeline).await;
         }
30 changes: 17 additions & 13 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -48,6 +48,7 @@ use databend_common_sql::plans::BoundColumnRef;
 use databend_common_sql::plans::ConstantExpr;
 use databend_common_sql::plans::EvalScalar;
 use databend_common_sql::plans::FunctionCall;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::RelOperator;
 use databend_common_sql::plans::ScalarItem;
 use databend_common_sql::plans::SubqueryDesc;
@@ -67,7 +68,6 @@ use crate::interpreters::common::create_push_down_filters;
 use crate::interpreters::HookOperator;
 use crate::interpreters::Interpreter;
 use crate::interpreters::SelectInterpreter;
-use crate::locks::LockManager;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelinePullingExecutor;
 use crate::pipelines::PipelineBuildResult;
@@ -110,21 +110,26 @@
         let is_distributed = !self.ctx.get_cluster().is_empty();

         let catalog_name = self.plan.catalog_name.as_str();
-        let catalog = self.ctx.get_catalog(catalog_name).await?;
-        let catalog_info = catalog.info();
-
         let db_name = self.plan.database_name.as_str();
         let tbl_name = self.plan.table_name.as_str();
-        let tbl = catalog
-            .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
-            .await?;

         // Add table lock.
-        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
-        let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock(
+                catalog_name,
+                db_name,
+                tbl_name,
+                &LockTableOption::LockWithRetry,
+            )
+            .await?;

-        // refresh table.
-        let tbl = tbl.refresh(self.ctx.as_ref()).await?;
+        let catalog = self.ctx.get_catalog(catalog_name).await?;
+        let catalog_info = catalog.info();
+        let tbl = catalog
+            .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
+            .await?;

         // check mutability
         tbl.check_mutable()?;
@@ -301,7 +306,7 @@ impl Interpreter for DeleteInterpreter {
                 tbl_name.to_string(),
                 MutationKind::Delete,
                 // table lock has been added, no need to check.
-                false,
+                LockTableOption::NoLock,
             );
             hook_operator
                 .execute_refresh(&mut build_res.main_pipeline)
@@ -354,7 +359,6 @@ impl DeleteInterpreter {
             mutation_kind: MutationKind::Delete,
             update_stream_meta: vec![],
             merge_meta,
-            need_lock: false,
             deduplicated_label: None,
             plan_id: u32::MAX,
         }));
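The acquire_table_lock call above is a context-level helper whose implementation is not part of the displayed hunks. A plausible sketch, assuming it simply maps LockTableOption onto the widened Lock::try_lock(ctx, should_retry) from lock.rs (the function name and surrounding imports here are illustrative only):

    // Illustrative only; the real helper lives behind TableContext and is
    // not shown in this diff.
    async fn acquire_table_lock_sketch(
        ctx: Arc<dyn TableContext>,
        lock: Arc<dyn Lock>,
        lock_opt: &LockTableOption,
    ) -> Result<Option<LockGuard>> {
        match lock_opt {
            LockTableOption::NoLock => Ok(None),
            LockTableOption::LockWithRetry => lock.try_lock(ctx, true).await,
            LockTableOption::LockNoRetry => lock.try_lock(ctx, false).await,
        }
    }

Note the division of labor this enables: DELETE takes the lock up front with LockWithRetry and then passes NoLock to its refresh hook (the lock is already held), while INSERT and COPY INTO leave locking to the hooks with LockNoRetry, so a contended table skips background optimization instead of blocking the query.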
5 changes: 3 additions & 2 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -29,6 +29,7 @@ use databend_common_sql::executor::PhysicalPlanBuilder;
 use databend_common_sql::plans::insert::InsertValue;
 use databend_common_sql::plans::Insert;
 use databend_common_sql::plans::InsertInputSource;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::NameResolutionContext;

@@ -276,7 +277,7 @@ impl Interpreter for InsertInterpreter {
                 self.plan.database.clone(),
                 self.plan.table.clone(),
                 MutationKind::Insert,
-                true,
+                LockTableOption::LockNoRetry,
             );
             hook_operator.execute(&mut build_res.main_pipeline).await;
         }
@@ -311,7 +312,7 @@ impl Interpreter for InsertInterpreter {
                 self.plan.database.clone(),
                 self.plan.table.clone(),
                 MutationKind::Insert,
-                true,
+                LockTableOption::LockNoRetry,
             );
             hook_operator.execute(&mut build_res.main_pipeline).await;
         }