Skip to content

feat: add kill query when extend lock failure #13980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ members = [
"src/query/storages/common/cache",
"src/query/storages/common/cache_manager",
"src/query/storages/common/index",
"src/query/storages/common/locks",
"src/query/storages/common/pruner",
"src/query/storages/common/table_meta",
"src/query/storages/delta",
Expand Down
1 change: 1 addition & 0 deletions src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub use crate::metrics::cluster;
/// Metrics.
pub use crate::metrics::http;
pub use crate::metrics::interpreter;
pub use crate::metrics::lock;
pub use crate::metrics::mysql;
pub use crate::metrics::openai;
pub use crate::metrics::session;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::sync::LazyLock;

use common_meta_app::schema::LockType;
use common_metrics::register_counter;
use common_metrics::register_counter_family;
use common_metrics::Counter;
use common_metrics::Family;
use common_metrics::VecLabels;
use crate::register_counter;
use crate::register_counter_family;
use crate::Counter;
use crate::Family;
use crate::VecLabels;

const METRIC_CREATED_LOCK_NUMS: &str = "created_lock_nums";
const METRIC_ACQUIRED_LOCK_NUMS: &str = "acquired_lock_nums";
Expand All @@ -38,17 +37,17 @@ static SHUTDOWN_LOCK_HOLDER_NUMS: LazyLock<Counter> =
const LABEL_TYPE: &str = "type";
const LABEL_TABLE_ID: &str = "table_id";

pub fn record_created_lock_nums(lock_type: LockType, table_id: u64, num: u64) {
pub fn record_created_lock_nums(lock_type: String, table_id: u64, num: u64) {
let labels = &vec![
(LABEL_TYPE, lock_type.to_string()),
(LABEL_TYPE, lock_type),
(LABEL_TABLE_ID, table_id.to_string()),
];
CREATED_LOCK_NUMS.get_or_create(labels).inc_by(num);
}

pub fn record_acquired_lock_nums(lock_type: LockType, table_id: u64, num: u64) {
pub fn record_acquired_lock_nums(lock_type: String, table_id: u64, num: u64) {
let labels = &vec![
(LABEL_TYPE, lock_type.to_string()),
(LABEL_TYPE, lock_type),
(LABEL_TABLE_ID, table_id.to_string()),
];
ACQUIRED_LOCK_NUMS.get_or_create(labels).inc_by(num);
Expand Down
1 change: 1 addition & 0 deletions src/common/metrics/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod cache;
pub mod cluster;
pub mod http;
pub mod interpreter;
pub mod lock;
pub mod mysql;
pub mod openai;
pub mod session;
Expand Down
4 changes: 3 additions & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use common_storage::StorageMetrics;
use storages_common_table_meta::meta::SnapshotId;
use storages_common_table_meta::meta::TableSnapshot;

use crate::lock::Lock;
use crate::plan::DataSourceInfo;
use crate::plan::DataSourcePlan;
use crate::plan::PartStatistics;
Expand Down Expand Up @@ -304,9 +305,10 @@ pub trait Table: Sync + Send {
async fn compact_segments(
&self,
ctx: Arc<dyn TableContext>,
lock: Arc<dyn Lock>,
limit: Option<usize>,
) -> Result<()> {
let (_, _) = (ctx, limit);
let (_, _, _) = (ctx, lock, limit);

Err(ErrorCode::Unimplemented(format!(
"table {}, of engine type {}, does not support compact segments",
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ storages-common-blocks = { path = "../storages/common/blocks" }
storages-common-cache = { path = "../storages/common/cache" }
storages-common-cache-manager = { path = "../storages/common/cache_manager" }
storages-common-index = { path = "../storages/common/index" }
storages-common-locks = { path = "../storages/common/locks" }
storages-common-table-meta = { path = "../storages/common/table_meta" }
stream-handler = { path = "../ee_features/stream_handler" }
vacuum-handler = { path = "../ee_features/vacuum_handler" }
Expand All @@ -107,6 +106,7 @@ async-backtrace = { workspace = true }
async-channel = "1.7.1"
async-stream = "0.3.3"
async-trait = { workspace = true }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
base64 = "0.21.0"
bumpalo = { workspace = true }
byte-unit = "4.0.19"
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use common_tracing::GlobalLogger;
use common_users::RoleCacheManager;
use common_users::UserApiProvider;
use storages_common_cache_manager::CacheManager;
use storages_common_locks::LockManager;

use crate::api::DataExchangeManager;
use crate::auth::AuthMgr;
use crate::catalogs::DatabaseCatalog;
use crate::clusters::ClusterDiscovery;
use crate::locks::LockManager;
use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::SessionManager;

Expand Down
16 changes: 8 additions & 8 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

use common_catalog::lock::Lock;
use common_catalog::plan::Filters;
use common_catalog::plan::Partitions;
use common_catalog::table::TableExt;
Expand Down Expand Up @@ -58,12 +57,12 @@ use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use futures_util::TryStreamExt;
use log::debug;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::common::create_push_down_filters;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -100,26 +99,27 @@ impl Interpreter for DeleteInterpreter {
debug!("ctx.id" = self.ctx.get_id().as_str(); "delete_interpreter_execute");

let is_distributed = !self.ctx.get_cluster().is_empty();
let catalog_name = self.plan.catalog_name.as_str();

let catalog_name = self.plan.catalog_name.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();

let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();

// refresh table.
let tbl = catalog
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name)
.await?;

// check mutability
tbl.check_mutable()?;

// Add table lock.
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;

// check mutability
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_row_id_column();
if !support_row_id {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use common_catalog::catalog::Catalog;
use common_catalog::lock::Lock;
use common_catalog::table::Table;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -49,11 +48,11 @@ use common_storages_view::view_table::VIEW_ENGINE;
use common_users::UserApiProvider;
use data_mask_feature::get_datamask_handler;
use storages_common_index::BloomIndex;
use storages_common_locks::LockManager;
use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;

use super::common::check_referenced_computed_columns;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -190,6 +189,12 @@ impl ModifyTableColumnInterpreter {
table: &Arc<dyn Table>,
field_and_comments: &[(TableField, String)],
) -> Result<PipelineBuildResult> {
// Add table lock.
let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
// refresh table.
let table = table.refresh(self.ctx.as_ref()).await?;

let schema = table.schema().as_ref().clone();
let table_info = table.get_table_info();
let mut new_schema = schema.clone();
Expand Down Expand Up @@ -271,10 +276,6 @@ impl ModifyTableColumnInterpreter {
return Ok(PipelineBuildResult::create());
}

// Add table lock.
let table_lock = LockManager::create_table_lock(table_info.clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// 1. construct sql for selecting data from old table
let mut sql = "select".to_string();
schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_storages_factory::NavigationPoint;
use common_storages_fuse::FuseTable;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -162,7 +162,7 @@ impl OptimizeTableInterpreter {

if matches!(target, CompactTarget::Segments) {
table
.compact_segments(self.ctx.clone(), self.plan.limit)
.compact_segments(self.ctx.clone(), table_lock, self.plan.limit)
.await?;
return Ok(PipelineBuildResult::create());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ use common_storages_fuse::FuseTable;
use log::error;
use log::info;
use log::warn;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::Statistics;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down
18 changes: 10 additions & 8 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

use common_catalog::lock::Lock;
use common_catalog::plan::Filters;
use common_catalog::plan::Partitions;
use common_catalog::table::TableExt;
Expand All @@ -42,7 +41,6 @@ use common_sql::Visibility;
use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use log::debug;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::common::check_deduplicate_label;
Expand All @@ -52,6 +50,7 @@ use crate::interpreters::common::RefreshAggIndexDesc;
use crate::interpreters::interpreter_delete::replace_subquery;
use crate::interpreters::interpreter_delete::subquery_filter;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -88,22 +87,25 @@ impl Interpreter for UpdateInterpreter {
}

let catalog_name = self.plan.catalog.as_str();
let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();
// refresh table.

let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
let tbl = catalog
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name)
.await?;

// check mutability
tbl.check_mutable()?;

// Add table lock.
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;

// check mutability
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_row_id_column();
if !support_row_id {
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod clusters;
pub mod databases;
pub mod interpreters;
pub mod local;
pub mod locks;
pub mod metrics;
pub mod pipelines;
pub mod schedulers;
Expand Down
Loading