Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify alter and drop cluster key logic #17128

Merged
merged 2 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<Labels: FamilyLabels> FamilyCounter<Labels> {
FamilyCounter {
index,
labels,
value: Arc::new(Default::default()),
value: Default::default(),
}
}

Expand Down
30 changes: 10 additions & 20 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,8 @@ pub struct TableMeta {
pub options: BTreeMap<String, String>,
// The default cluster key.
pub default_cluster_key: Option<String>,
// All cluster keys that have been defined.
pub cluster_keys: Vec<String>,
// The sequence number of default_cluster_key in cluster_keys.
pub default_cluster_key_id: Option<u32>,
// The sequence number of default_cluster_key.
pub default_cluster_key_id: u32,
pub created_on: DateTime<Utc>,
pub updated_on: DateTime<Utc>,
pub comment: String,
Expand Down Expand Up @@ -407,6 +405,13 @@ impl TableInfo {
self.meta.schema = schema;
self
}

pub fn cluster_key(&self) -> Option<(u32, String)> {
self.meta
.default_cluster_key
.clone()
.map(|k| (self.meta.default_cluster_key_id, k))
}
}

impl Default for TableMeta {
Expand All @@ -419,8 +424,7 @@ impl Default for TableMeta {
part_prefix: "".to_string(),
options: BTreeMap::new(),
default_cluster_key: None,
cluster_keys: vec![],
default_cluster_key_id: None,
default_cluster_key_id: 0,
created_on: Utc::now(),
updated_on: Utc::now(),
comment: "".to_string(),
Expand All @@ -434,20 +438,6 @@ impl Default for TableMeta {
}
}

impl TableMeta {
pub fn push_cluster_key(mut self, cluster_key: String) -> Self {
self.cluster_keys.push(cluster_key.clone());
self.default_cluster_key = Some(cluster_key);
self.default_cluster_key_id = Some(self.cluster_keys.len() as u32 - 1);
self
}

pub fn cluster_key(&self) -> Option<(u32, String)> {
self.default_cluster_key_id
.zip(self.default_cluster_key.clone())
}
}

impl Display for TableMeta {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
Expand Down
16 changes: 12 additions & 4 deletions src/meta/proto-conv/src/table_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ impl FromToProto for mt::TableMeta {
indexes.insert(name, mt::TableIndex::from_pb(index)?);
}

let default_cluster_key_id = if let Some(cluster_key_id) = p.default_cluster_key_id {
cluster_key_id
} else if p.cluster_keys.is_empty() {
0
} else {
p.cluster_keys.len() as u32 - 1
};

let v = Self {
schema: Arc::new(ex::TableSchema::from_pb(schema)?),
engine: p.engine,
Expand All @@ -203,8 +211,7 @@ impl FromToProto for mt::TableMeta {
part_prefix: p.part_prefix.unwrap_or("".to_string()),
options: p.options,
default_cluster_key: p.default_cluster_key,
cluster_keys: p.cluster_keys,
default_cluster_key_id: p.default_cluster_key_id,
default_cluster_key_id,
created_on: DateTime::<Utc>::from_pb(p.created_on)?,
updated_on: DateTime::<Utc>::from_pb(p.updated_on)?,
drop_on: match p.drop_on {
Expand Down Expand Up @@ -251,8 +258,9 @@ impl FromToProto for mt::TableMeta {
},
options: self.options.clone(),
default_cluster_key: self.default_cluster_key.clone(),
cluster_keys: self.cluster_keys.clone(),
default_cluster_key_id: self.default_cluster_key_id,
// cluster_keys is deprecated.
cluster_keys: vec![],
default_cluster_key_id: Some(self.default_cluster_key_id),
created_on: self.created_on.to_pb()?,
updated_on: self.updated_on.to_pb()?,
drop_on: match self.drop_on {
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/proto_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ fn new_table_meta() -> mt::TableMeta {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v002_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ fn test_decode_v2_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v010_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ fn test_decode_v10_table_meta() -> anyhow::Result<()> {
part_prefix: "".to_string(),
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v012_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ fn test_decode_v12_table_meta() -> anyhow::Result<()> {
part_prefix: "".to_string(),
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v023_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ fn test_decode_v23_table_meta() -> anyhow::Result<()> {
part_prefix: "lulu_".to_string(),
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v024_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ fn test_decode_v24_table_meta() -> anyhow::Result<()> {
part_prefix: "lulu_".to_string(),
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v033_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ fn test_decode_v33_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v040_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ fn test_decode_v40_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v044_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ fn test_decode_v44_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v055_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ fn test_decode_v55_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v074_table_db_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ fn test_decode_v74_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v080_geometry_datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ fn test_decode_v80_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v082_table_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ fn test_decode_v82_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v085_table_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ fn test_decode_v85_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v086_table_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ fn test_decode_v86_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v094_table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ fn test_decode_v94_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v107_geography_datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ fn test_decode_v107_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/proto-conv/tests/it/v114_interval_datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,7 @@ fn test_decode_v114_table_meta() -> anyhow::Result<()> {
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
default_cluster_key_id: 0,
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
Expand Down
51 changes: 18 additions & 33 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;

use chrono::DateTime;
Expand All @@ -22,12 +23,13 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::ColumnId;
use databend_common_expression::RemoteExpr;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchema;
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
use databend_common_io::constants::DEFAULT_BLOCK_MIN_ROWS;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::UnknownTableId;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand All @@ -37,9 +39,12 @@ use databend_common_meta_types::MetaId;
use databend_common_pipeline_core::Pipeline;
use databend_common_storage::Histogram;
use databend_common_storage::StorageMetrics;
use databend_storages_common_table_meta::meta::ClusterKey;
use databend_storages_common_table_meta::meta::SnapshotId;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::ChangeType;
use databend_storages_common_table_meta::table::ClusterType;
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
use databend_storages_common_table_meta::table_id_ranges::is_temp_table_id;

Expand Down Expand Up @@ -117,8 +122,18 @@ pub trait Table: Sync + Send {
false
}

fn cluster_keys(&self, _ctx: Arc<dyn TableContext>) -> Vec<RemoteExpr<String>> {
vec![]
fn cluster_key_meta(&self) -> Option<ClusterKey> {
None
}

fn cluster_type(&self) -> Option<ClusterType> {
self.cluster_key_meta()?;
let cluster_type = self
.options()
.get(OPT_KEY_CLUSTER_TYPE)
.and_then(|s| s.parse::<ClusterType>().ok())
.unwrap_or(ClusterType::Linear);
Some(cluster_type)
}

fn change_tracking_enabled(&self) -> bool {
Expand Down Expand Up @@ -159,31 +174,6 @@ pub trait Table: Sync + Send {
false
}

#[async_backtrace::framed]
async fn alter_table_cluster_keys(
&self,
ctx: Arc<dyn TableContext>,
cluster_key: String,
cluster_type: String,
) -> Result<()> {
let (_, _, _) = (ctx, cluster_key, cluster_type);

Err(ErrorCode::UnsupportedEngineParams(format!(
"Altering table cluster keys is not supported for the '{}' engine.",
self.engine()
)))
}

#[async_backtrace::framed]
async fn drop_table_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
let _ = ctx;

Err(ErrorCode::UnsupportedEngineParams(format!(
"Dropping table cluster keys is not supported for the '{}' engine.",
self.engine()
)))
}

/// Gather partitions to be scanned according to the push_downs
#[async_backtrace::framed]
async fn read_partitions(
Expand Down Expand Up @@ -584,11 +574,6 @@ pub struct NavigationDescriptor {
pub point: NavigationPoint,
}

use std::collections::HashMap;

use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::UnknownTableId;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
pub struct ParquetTableColumnStatisticsProvider {
column_stats: HashMap<ColumnId, Option<BasicColumnStatistics>>,
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async fn compact_table(

{
// do recluster.
if !table.cluster_keys(ctx.clone()).is_empty() {
if table.cluster_key_meta().is_some() {
let recluster = RelOperator::Recluster(Recluster {
catalog: compact_target.catalog,
database: compact_target.database,
Expand Down
Loading
Loading