From 8eb18e2f2b4778b7d0d89567e2f1dc9e91af0cb1 Mon Sep 17 00:00:00 2001 From: zhyass Date: Fri, 27 Dec 2024 01:23:07 +0800 Subject: [PATCH] chore: simplify alter and drop cluster key logic --- .../runtime/metrics/family_metrics/counter.rs | 2 +- src/meta/app/src/schema/table.rs | 23 +-- .../src/table_from_to_protobuf_impl.rs | 14 +- src/meta/proto-conv/tests/it/proto_conv.rs | 3 +- .../proto-conv/tests/it/v002_table_meta.rs | 3 +- .../proto-conv/tests/it/v010_table_meta.rs | 3 +- .../proto-conv/tests/it/v012_table_meta.rs | 3 +- .../proto-conv/tests/it/v023_table_meta.rs | 3 +- .../proto-conv/tests/it/v024_table_meta.rs | 3 +- .../proto-conv/tests/it/v033_table_meta.rs | 3 +- .../proto-conv/tests/it/v040_table_meta.rs | 3 +- .../proto-conv/tests/it/v044_table_meta.rs | 3 +- .../proto-conv/tests/it/v055_table_meta.rs | 3 +- .../proto-conv/tests/it/v074_table_db_meta.rs | 3 +- .../tests/it/v080_geometry_datatype.rs | 3 +- .../proto-conv/tests/it/v082_table_index.rs | 3 +- .../proto-conv/tests/it/v085_table_index.rs | 3 +- .../proto-conv/tests/it/v086_table_index.rs | 3 +- .../proto-conv/tests/it/v094_table_meta.rs | 3 +- .../tests/it/v107_geography_datatype.rs | 3 +- .../tests/it/v114_interval_datatype.rs | 3 +- src/query/catalog/src/table.rs | 51 ++--- .../src/interpreters/hook/compact_hook.rs | 2 +- .../interpreter_cluster_key_alter.rs | 36 +++- .../interpreter_cluster_key_drop.rs | 24 ++- .../src/interpreters/interpreter_replace.rs | 2 +- .../interpreters/interpreter_table_create.rs | 3 +- .../interpreter_table_show_create.rs | 2 +- .../flight_sql/flight_sql_service/mod.rs | 2 +- src/query/service/src/sessions/query_ctx.rs | 2 +- .../service/src/sessions/query_ctx_shared.rs | 4 +- src/query/service/src/sessions/session.rs | 2 +- src/query/service/src/test_kits/fuse.rs | 1 - .../tests/it/servers/flight/flight_service.rs | 5 +- .../servers/flight_sql/flight_sql_server.rs | 3 +- .../tests/it/storages/fuse/meta/snapshot.rs | 4 +- .../it/storages/fuse/operations/clustering.rs | 56 +----- .../it/storages/fuse/operations/commit.rs | 1 - .../mutation/block_compact_mutator.rs | 1 - .../operations/mutation/recluster_mutator.rs | 1 - .../common/table_meta/src/meta/v4/snapshot.rs | 10 +- src/query/storages/fuse/src/fuse_table.rs | 181 +++--------------- .../storages/fuse/src/io/write/meta_writer.rs | 2 - .../common/generators/append_generator.rs | 10 +- .../common/generators/mutation_generator.rs | 7 +- .../common/generators/snapshot_generator.rs | 7 +- .../common/generators/truncate_generator.rs | 4 +- .../processors/multi_table_insert_commit.rs | 2 +- .../common/processors/sink_commit.rs | 12 +- .../09_0008_fuse_optimize_table.test | 4 +- .../mode/cluster/distributed_compact.sql.test | 2 +- 51 files changed, 178 insertions(+), 358 deletions(-) diff --git a/src/common/base/src/runtime/metrics/family_metrics/counter.rs b/src/common/base/src/runtime/metrics/family_metrics/counter.rs index 3f9dc32846cbd..6953ac4986ca8 100644 --- a/src/common/base/src/runtime/metrics/family_metrics/counter.rs +++ b/src/common/base/src/runtime/metrics/family_metrics/counter.rs @@ -42,7 +42,7 @@ impl FamilyCounter { FamilyCounter { index, labels, - value: Arc::new(Default::default()), + value: Default::default(), } } diff --git a/src/meta/app/src/schema/table.rs b/src/meta/app/src/schema/table.rs index aedd4dce32c45..cf0c2c29ac60f 100644 --- a/src/meta/app/src/schema/table.rs +++ b/src/meta/app/src/schema/table.rs @@ -262,10 +262,8 @@ pub struct TableMeta { pub options: BTreeMap, // The default cluster key. 
pub default_cluster_key: Option, - // All cluster keys that have been defined. - pub cluster_keys: Vec, - // The sequence number of default_cluster_key in cluster_keys. - pub default_cluster_key_id: Option, + // The sequence number of default_cluster_key. + pub default_cluster_key_id: u32, pub created_on: DateTime, pub updated_on: DateTime, pub comment: String, @@ -419,8 +417,7 @@ impl Default for TableMeta { part_prefix: "".to_string(), options: BTreeMap::new(), default_cluster_key: None, - cluster_keys: vec![], - default_cluster_key_id: None, + default_cluster_key_id: 0, created_on: Utc::now(), updated_on: Utc::now(), comment: "".to_string(), @@ -434,20 +431,6 @@ impl Default for TableMeta { } } -impl TableMeta { - pub fn push_cluster_key(mut self, cluster_key: String) -> Self { - self.cluster_keys.push(cluster_key.clone()); - self.default_cluster_key = Some(cluster_key); - self.default_cluster_key_id = Some(self.cluster_keys.len() as u32 - 1); - self - } - - pub fn cluster_key(&self) -> Option<(u32, String)> { - self.default_cluster_key_id - .zip(self.default_cluster_key.clone()) - } -} - impl Display for TableMeta { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!( diff --git a/src/meta/proto-conv/src/table_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/table_from_to_protobuf_impl.rs index f490ddd937c43..25e8ba4db714e 100644 --- a/src/meta/proto-conv/src/table_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/table_from_to_protobuf_impl.rs @@ -192,6 +192,12 @@ impl FromToProto for mt::TableMeta { indexes.insert(name, mt::TableIndex::from_pb(index)?); } + let default_cluster_key_id = if let Some(cluster_key_id) = p.default_cluster_key_id { + cluster_key_id + } else { + p.cluster_keys.len() as u32 - 1 + }; + let v = Self { schema: Arc::new(ex::TableSchema::from_pb(schema)?), engine: p.engine, @@ -203,8 +209,7 @@ impl FromToProto for mt::TableMeta { part_prefix: p.part_prefix.unwrap_or("".to_string()), options: p.options, default_cluster_key: p.default_cluster_key, - cluster_keys: p.cluster_keys, - default_cluster_key_id: p.default_cluster_key_id, + default_cluster_key_id, created_on: DateTime::::from_pb(p.created_on)?, updated_on: DateTime::::from_pb(p.updated_on)?, drop_on: match p.drop_on { @@ -251,8 +256,9 @@ impl FromToProto for mt::TableMeta { }, options: self.options.clone(), default_cluster_key: self.default_cluster_key.clone(), - cluster_keys: self.cluster_keys.clone(), - default_cluster_key_id: self.default_cluster_key_id, + // cluster_keys is deprecated. + cluster_keys: vec![], + default_cluster_key_id: Some(self.default_cluster_key_id), created_on: self.created_on.to_pb()?, updated_on: self.updated_on.to_pb()?, drop_on: match self.drop_on { diff --git a/src/meta/proto-conv/tests/it/proto_conv.rs b/src/meta/proto-conv/tests/it/proto_conv.rs index bbf33928eb39c..219850672460b 100644 --- a/src/meta/proto-conv/tests/it/proto_conv.rs +++ b/src/meta/proto-conv/tests/it/proto_conv.rs @@ -141,8 +141,7 @@ fn new_table_meta() -> mt::TableMeta { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! 
{s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v002_table_meta.rs b/src/meta/proto-conv/tests/it/v002_table_meta.rs index 8b36524fc8d08..86ed8e2150b20 100644 --- a/src/meta/proto-conv/tests/it/v002_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v002_table_meta.rs @@ -132,8 +132,7 @@ fn test_decode_v2_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v010_table_meta.rs b/src/meta/proto-conv/tests/it/v010_table_meta.rs index 0723a131b628d..1e3def8327b61 100644 --- a/src/meta/proto-conv/tests/it/v010_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v010_table_meta.rs @@ -134,8 +134,7 @@ fn test_decode_v10_table_meta() -> anyhow::Result<()> { part_prefix: "".to_string(), options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v012_table_meta.rs b/src/meta/proto-conv/tests/it/v012_table_meta.rs index 3272e0f9079eb..f791bba035054 100644 --- a/src/meta/proto-conv/tests/it/v012_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v012_table_meta.rs @@ -136,8 +136,7 @@ fn test_decode_v12_table_meta() -> anyhow::Result<()> { part_prefix: "".to_string(), options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v023_table_meta.rs b/src/meta/proto-conv/tests/it/v023_table_meta.rs index aef29f368e9ed..e91eaa8018316 100644 --- a/src/meta/proto-conv/tests/it/v023_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v023_table_meta.rs @@ -136,8 +136,7 @@ fn test_decode_v23_table_meta() -> anyhow::Result<()> { part_prefix: "lulu_".to_string(), options: btreemap! 
{s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v024_table_meta.rs b/src/meta/proto-conv/tests/it/v024_table_meta.rs index f1986ef67ce73..db469a80fbc40 100644 --- a/src/meta/proto-conv/tests/it/v024_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v024_table_meta.rs @@ -136,8 +136,7 @@ fn test_decode_v24_table_meta() -> anyhow::Result<()> { part_prefix: "lulu_".to_string(), options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v033_table_meta.rs b/src/meta/proto-conv/tests/it/v033_table_meta.rs index 5edd083b0cbe0..fad1cc901a3fe 100644 --- a/src/meta/proto-conv/tests/it/v033_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v033_table_meta.rs @@ -138,8 +138,7 @@ fn test_decode_v33_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v040_table_meta.rs b/src/meta/proto-conv/tests/it/v040_table_meta.rs index 691d346681212..4021fa49dbc7d 100644 --- a/src/meta/proto-conv/tests/it/v040_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v040_table_meta.rs @@ -138,8 +138,7 @@ fn test_decode_v40_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v044_table_meta.rs b/src/meta/proto-conv/tests/it/v044_table_meta.rs index e71f00a8c7aed..231d046488714 100644 --- a/src/meta/proto-conv/tests/it/v044_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v044_table_meta.rs @@ -89,8 +89,7 @@ fn test_decode_v44_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! 
{s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v055_table_meta.rs b/src/meta/proto-conv/tests/it/v055_table_meta.rs index 6d10be03331e5..56bebd37c8838 100644 --- a/src/meta/proto-conv/tests/it/v055_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v055_table_meta.rs @@ -80,8 +80,7 @@ fn test_decode_v55_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v074_table_db_meta.rs b/src/meta/proto-conv/tests/it/v074_table_db_meta.rs index 39a54e2f4cdce..7772cee1c247c 100644 --- a/src/meta/proto-conv/tests/it/v074_table_db_meta.rs +++ b/src/meta/proto-conv/tests/it/v074_table_db_meta.rs @@ -79,8 +79,7 @@ fn test_decode_v74_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v080_geometry_datatype.rs b/src/meta/proto-conv/tests/it/v080_geometry_datatype.rs index d5fd57523d78e..98a0b33093652 100644 --- a/src/meta/proto-conv/tests/it/v080_geometry_datatype.rs +++ b/src/meta/proto-conv/tests/it/v080_geometry_datatype.rs @@ -206,8 +206,7 @@ fn test_decode_v80_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v082_table_index.rs b/src/meta/proto-conv/tests/it/v082_table_index.rs index 1cb403f864a01..9bda9705a6290 100644 --- a/src/meta/proto-conv/tests/it/v082_table_index.rs +++ b/src/meta/proto-conv/tests/it/v082_table_index.rs @@ -80,8 +80,7 @@ fn test_decode_v82_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! 
{s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v085_table_index.rs b/src/meta/proto-conv/tests/it/v085_table_index.rs index 021efb86f7488..07e6c73661c6c 100644 --- a/src/meta/proto-conv/tests/it/v085_table_index.rs +++ b/src/meta/proto-conv/tests/it/v085_table_index.rs @@ -80,8 +80,7 @@ fn test_decode_v85_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v086_table_index.rs b/src/meta/proto-conv/tests/it/v086_table_index.rs index bb980bf885548..10ff0656f950f 100644 --- a/src/meta/proto-conv/tests/it/v086_table_index.rs +++ b/src/meta/proto-conv/tests/it/v086_table_index.rs @@ -83,8 +83,7 @@ fn test_decode_v86_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v094_table_meta.rs b/src/meta/proto-conv/tests/it/v094_table_meta.rs index 8de6bc15d939a..9b44fd0876356 100644 --- a/src/meta/proto-conv/tests/it/v094_table_meta.rs +++ b/src/meta/proto-conv/tests/it/v094_table_meta.rs @@ -78,8 +78,7 @@ fn test_decode_v94_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v107_geography_datatype.rs b/src/meta/proto-conv/tests/it/v107_geography_datatype.rs index b386231aa43c4..70f39e95b401e 100644 --- a/src/meta/proto-conv/tests/it/v107_geography_datatype.rs +++ b/src/meta/proto-conv/tests/it/v107_geography_datatype.rs @@ -211,8 +211,7 @@ fn test_decode_v107_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! 
{s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/meta/proto-conv/tests/it/v114_interval_datatype.rs b/src/meta/proto-conv/tests/it/v114_interval_datatype.rs index 3516491b0c120..4ddeaaa7d23b6 100644 --- a/src/meta/proto-conv/tests/it/v114_interval_datatype.rs +++ b/src/meta/proto-conv/tests/it/v114_interval_datatype.rs @@ -202,8 +202,7 @@ fn test_decode_v114_table_meta() -> anyhow::Result<()> { engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, default_cluster_key: Some("(a + 2, b)".to_string()), - cluster_keys: vec!["(a + 2, b)".to_string()], - default_cluster_key_id: Some(0), + default_cluster_key_id: 0, created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), comment: s("table_comment"), diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 1ae950ae984eb..d3e04f0686089 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::collections::BTreeMap; +use std::collections::HashMap; use std::sync::Arc; use chrono::DateTime; @@ -22,12 +23,13 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; use databend_common_expression::ColumnId; -use databend_common_expression::RemoteExpr; use databend_common_expression::Scalar; use databend_common_expression::TableSchema; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS; use databend_common_io::constants::DEFAULT_BLOCK_MIN_ROWS; +use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::app_error::UnknownTableId; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; @@ -37,9 +39,12 @@ use databend_common_meta_types::MetaId; use databend_common_pipeline_core::Pipeline; use databend_common_storage::Histogram; use databend_common_storage::StorageMetrics; +use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::SnapshotId; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::ChangeType; +use databend_storages_common_table_meta::table::ClusterType; +use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX; use databend_storages_common_table_meta::table_id_ranges::is_temp_table_id; @@ -117,8 +122,18 @@ pub trait Table: Sync + Send { false } - fn cluster_keys(&self, _ctx: Arc) -> Vec> { - vec![] + fn cluster_key_meta(&self) -> Option { + None + } + + fn cluster_type(&self) -> Option { + self.cluster_key_meta()?; + let cluster_type = self + .options() + .get(OPT_KEY_CLUSTER_TYPE) + .and_then(|s| s.parse::().ok()) + .unwrap_or(ClusterType::Linear); + Some(cluster_type) } fn change_tracking_enabled(&self) -> bool { @@ -159,31 +174,6 @@ pub trait Table: Sync + Send { false } - #[async_backtrace::framed] - async fn alter_table_cluster_keys( - &self, - ctx: Arc, - cluster_key: String, - 
cluster_type: String, - ) -> Result<()> { - let (_, _, _) = (ctx, cluster_key, cluster_type); - - Err(ErrorCode::UnsupportedEngineParams(format!( - "Altering table cluster keys is not supported for the '{}' engine.", - self.engine() - ))) - } - - #[async_backtrace::framed] - async fn drop_table_cluster_keys(&self, ctx: Arc) -> Result<()> { - let _ = ctx; - - Err(ErrorCode::UnsupportedEngineParams(format!( - "Dropping table cluster keys is not supported for the '{}' engine.", - self.engine() - ))) - } - /// Gather partitions to be scanned according to the push_downs #[async_backtrace::framed] async fn read_partitions( @@ -584,11 +574,6 @@ pub struct NavigationDescriptor { pub point: NavigationPoint, } -use std::collections::HashMap; - -use databend_common_meta_app::app_error::AppError; -use databend_common_meta_app::app_error::UnknownTableId; - #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] pub struct ParquetTableColumnStatisticsProvider { column_stats: HashMap>, diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index e2fd0f300b88c..61e1a7c5bcaaf 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -194,7 +194,7 @@ async fn compact_table( { // do recluster. - if !table.cluster_keys(ctx.clone()).is_empty() { + if table.cluster_key_meta().is_some() { let recluster = RelOperator::Recluster(Recluster { catalog: compact_target.catalog, database: compact_target.database, diff --git a/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs b/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs index 9335677678d98..c5bd5d1516f98 100644 --- a/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs +++ b/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs @@ -14,8 +14,14 @@ use std::sync::Arc; +use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_exception::Result; +use databend_common_meta_app::schema::UpdateTableMetaReq; +use databend_common_meta_types::MatchSeq; use databend_common_sql::plans::AlterTableClusterKeyPlan; +use databend_common_storages_fuse::FuseTable; +use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; use super::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -52,12 +58,36 @@ impl Interpreter for AlterTableClusterKeyInterpreter { let table = catalog .get_table(&tenant, &plan.database, &plan.table) .await?; + // check mutability + table.check_mutable()?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; let cluster_key_str = format!("({})", plan.cluster_keys.join(", ")); + // if new cluster_key_str is the same with old one, + // no need to change + if let Some(old_cluster_key_str) = fuse_table.cluster_key_str() + && *old_cluster_key_str == cluster_key_str + { + let old_cluster_type = fuse_table.cluster_type(); + if old_cluster_type.is_some_and(|v| v.to_string().to_lowercase() == plan.cluster_type) { + return Ok(PipelineBuildResult::create()); + } + } - table - .alter_table_cluster_keys(self.ctx.clone(), cluster_key_str, plan.cluster_type.clone()) - .await?; + let table_info = fuse_table.get_table_info(); + let mut new_table_meta = table_info.meta.clone(); + new_table_meta + .options + .insert(OPT_KEY_CLUSTER_TYPE.to_owned(), plan.cluster_type.clone()); + new_table_meta.default_cluster_key = Some(cluster_key_str); + new_table_meta.default_cluster_key_id 
+= 1; + + let req = UpdateTableMetaReq { + table_id: table_info.ident.table_id, + seq: MatchSeq::Exact(table_info.ident.seq), + new_table_meta, + }; + catalog.update_single_table_meta(req, table_info).await?; Ok(PipelineBuildResult::create()) } diff --git a/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs b/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs index c437065323dff..3b61bc274e709 100644 --- a/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs +++ b/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs @@ -14,8 +14,14 @@ use std::sync::Arc; +use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_exception::Result; +use databend_common_meta_app::schema::UpdateTableMetaReq; +use databend_common_meta_types::MatchSeq; use databend_common_sql::plans::DropTableClusterKeyPlan; +use databend_common_storages_fuse::FuseTable; +use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; use super::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -52,8 +58,24 @@ impl Interpreter for DropTableClusterKeyInterpreter { let table = catalog .get_table(&tenant, &plan.database, &plan.table) .await?; + if table.cluster_key_meta().is_none() { + return Ok(PipelineBuildResult::create()); + } + // check mutability + table.check_mutable()?; - table.drop_table_cluster_keys(self.ctx.clone()).await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let table_info = fuse_table.get_table_info(); + let mut new_table_meta = table_info.meta.clone(); + new_table_meta.default_cluster_key = None; + new_table_meta.options.remove(OPT_KEY_CLUSTER_TYPE); + + let req = UpdateTableMetaReq { + table_id: table_info.ident.table_id, + seq: MatchSeq::Exact(table_info.ident.seq), + new_table_meta, + }; + catalog.update_single_table_meta(req, table_info).await?; Ok(PipelineBuildResult::create()) } diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 9d35f62e38185..65084b30ac970 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -290,7 +290,7 @@ impl ReplaceInterpreter { .ctx .get_settings() .get_replace_into_bloom_pruning_max_column_number()?; - let bloom_filter_column_indexes = if !table.cluster_keys(self.ctx.clone()).is_empty() { + let bloom_filter_column_indexes = if table.cluster_key_meta().is_some() { fuse_table .choose_bloom_filter_columns( self.ctx.clone(), diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs index 173d891b5d0cc..5634343026731 100644 --- a/src/query/service/src/interpreters/interpreter_table_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_create.rs @@ -411,7 +411,8 @@ impl CreateTableInterpreter { } if let Some(cluster_key) = &self.plan.cluster_key { - table_meta = table_meta.push_cluster_key(cluster_key.clone()); + table_meta.default_cluster_key = Some(cluster_key.clone()); + table_meta.default_cluster_key_id += 1; } let req = CreateTableReq { diff --git a/src/query/service/src/interpreters/interpreter_table_show_create.rs b/src/query/service/src/interpreters/interpreter_table_show_create.rs index ab227c57310bf..64cc745d7f353 100644 --- a/src/query/service/src/interpreters/interpreter_table_show_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_show_create.rs 
@@ -246,7 +246,7 @@ impl ShowCreateTableInterpreter { let table_engine = format!(") ENGINE={}", engine); table_create_sql.push_str(table_engine.as_str()); - if let Some((_, cluster_keys_str)) = table_info.meta.cluster_key() { + if let Some(cluster_keys_str) = &table_info.meta.default_cluster_key { let cluster_type = table_info .options() .get(OPT_KEY_CLUSTER_TYPE) diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/mod.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/mod.rs index 1b69debfa2391..cc0f84e2fe234 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/mod.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/mod.rs @@ -59,7 +59,7 @@ impl FlightSqlServiceImpl { pub fn create() -> Self { FlightSqlServiceImpl { sessions: Mutex::new(Default::default()), - statements: Arc::new(Default::default()), + statements: Default::default(), } } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 91cf2656b849a..f08482c90175e 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -180,7 +180,7 @@ impl QueryContext { fragment_id: Arc::new(AtomicUsize::new(0)), inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())), block_threshold: Arc::new(RwLock::new(BlockThresholds::default())), - m_cte_temp_table: Arc::new(Default::default()), + m_cte_temp_table: Default::default(), }) } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index b3736494d02e3..6a760f11fc61e 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -181,8 +181,8 @@ impl QueryContextShared { finish_time: Default::default(), on_error_map: Arc::new(RwLock::new(None)), on_error_mode: Arc::new(RwLock::new(None)), - copy_status: Arc::new(Default::default()), - mutation_status: Arc::new(Default::default()), + copy_status: Default::default(), + mutation_status: Default::default(), partitions_shas: Arc::new(RwLock::new(vec![])), cacheable: Arc::new(AtomicBool::new(true)), can_scan_from_agg_index: Arc::new(AtomicBool::new(true)), diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index 6c312f03a01c9..b154c3ee91c37 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -63,7 +63,7 @@ impl Session { session_ctx: Box, mysql_connection_id: Option, ) -> Result { - let status = Arc::new(Default::default()); + let status = Default::default(); Ok(Session { id, typ: RwLock::new(typ), diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs index cc4996a8754d1..dfa2f95579a94 100644 --- a/src/query/service/src/test_kits/fuse.rs +++ b/src/query/service/src/test_kits/fuse.rs @@ -210,7 +210,6 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> { Statistics::default(), locations, None, - None, ); snapshot_1.timestamp = Some(now - Duration::hours(12)); snapshot_1.summary = diff --git a/src/query/service/tests/it/servers/flight/flight_service.rs b/src/query/service/tests/it/servers/flight/flight_service.rs index ca84366f35b8e..92fc18874f633 100644 --- a/src/query/service/tests/it/servers/flight/flight_service.rs +++ b/src/query/service/tests/it/servers/flight/flight_service.rs @@ -15,7 +15,6 @@ use std::net::SocketAddr; use std::net::TcpListener; use std::str::FromStr; -use 
std::sync::Arc; use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::Empty; @@ -74,7 +73,7 @@ async fn test_tls_rpc_server_invalid_server_config() -> Result<()> { .rpc_tls_server_key("../tests/data/certs/none.key") .rpc_tls_server_cert("../tests/data/certs/none.pem") .build(), - abort_notify: Arc::new(Default::default()), + abort_notify: Default::default(), }; let r = srv .start_with_incoming("127.0.0.1:0".parse().unwrap()) @@ -112,7 +111,7 @@ async fn test_rpc_server_port_used() -> Result<()> { let mut srv = FlightService { config: ConfigBuilder::create().build(), - abort_notify: Arc::new(Default::default()), + abort_notify: Default::default(), }; let r = srv.start_with_incoming(local_socket).await; diff --git a/src/query/service/tests/it/servers/flight_sql/flight_sql_server.rs b/src/query/service/tests/it/servers/flight_sql/flight_sql_server.rs index 6173f412da364..459d441307dd0 100644 --- a/src/query/service/tests/it/servers/flight_sql/flight_sql_server.rs +++ b/src/query/service/tests/it/servers/flight_sql/flight_sql_server.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::net::TcpListener; -use std::sync::Arc; use databend_common_base::base::tokio; use databend_common_exception::Result; @@ -27,7 +26,7 @@ async fn test_flight_sql_server_port_used() -> Result<()> { let mut srv = FlightSQLServer { config: ConfigBuilder::create().build(), - abort_notify: Arc::new(Default::default()), + abort_notify: Default::default(), }; let r = srv.start_with_incoming(local_socket).await; diff --git a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs index b0d6b35413d35..dd35d2e6f6ffa 100644 --- a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs +++ b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs @@ -26,7 +26,7 @@ fn default_snapshot() -> TableSnapshot { let uuid = Uuid::new_v4(); let schema = TableSchema::empty(); let stats = Default::default(); - TableSnapshot::new(uuid, None, &None, None, schema, stats, vec![], None, None) + TableSnapshot::new(uuid, None, &None, None, schema, stats, vec![], None) } #[test] @@ -49,7 +49,6 @@ fn snapshot_timestamp_monotonic_increase() { Default::default(), vec![], None, - None, ); let current_ts = current.timestamp.unwrap(); let prev_ts = prev.timestamp.unwrap(); @@ -74,7 +73,6 @@ fn snapshot_timestamp_time_skew_tolerance() { Default::default(), vec![], None, - None, ); let current_ts = current.timestamp.unwrap(); let prev_ts = prev.timestamp.unwrap(); diff --git a/src/query/service/tests/it/storages/fuse/operations/clustering.rs b/src/query/service/tests/it/storages/fuse/operations/clustering.rs index 39183baaf4cb7..865e3857738ba 100644 --- a/src/query/service/tests/it/storages/fuse/operations/clustering.rs +++ b/src/query/service/tests/it/storages/fuse/operations/clustering.rs @@ -18,18 +18,14 @@ use databend_common_meta_app::schema::CreateOption; use databend_common_sql::plans::AlterTableClusterKeyPlan; use databend_common_sql::plans::CreateTablePlan; use databend_common_sql::plans::DropTableClusterKeyPlan; -use databend_common_storages_fuse::io::MetaReaders; -use databend_common_storages_fuse::FuseTable; use databend_query::interpreters::AlterTableClusterKeyInterpreter; use databend_query::interpreters::CreateTableInterpreter; use databend_query::interpreters::DropTableClusterKeyInterpreter; use databend_query::interpreters::Interpreter; use databend_query::test_kits::*; -use databend_storages_common_cache::LoadParams; -use 
databend_storages_common_table_meta::meta::TableSnapshot; -use databend_storages_common_table_meta::meta::Versioned; +use databend_storages_common_table_meta::table::LINEAR_CLUSTER_TYPE; +use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; -use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; #[tokio::test(flavor = "multi_thread")] async fn test_fuse_alter_table_cluster_key() -> databend_common_exception::Result<()> { @@ -77,29 +73,12 @@ async fn test_fuse_alter_table_cluster_key() -> databend_common_exception::Resul let _ = interpreter.execute(ctx.clone()).await?; let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; let table_info = table.get_table_info(); - assert_eq!(table_info.meta.cluster_keys, vec!["(id)".to_string()]); - assert_eq!(table_info.meta.default_cluster_key_id, Some(0)); - - let snapshot_loc = table - .get_table_info() - .options() - .get(OPT_KEY_SNAPSHOT_LOCATION) - .unwrap(); - let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator()); - - let load_params = LoadParams { - location: snapshot_loc.clone(), - len_hint: None, - ver: TableSnapshot::VERSION, - put_cache: false, - }; - - let snapshot = reader.read(&load_params).await?; - let expected = Some((0, "(id)".to_string())); - - assert_eq!(snapshot.cluster_key_meta, expected); + assert_eq!(table_info.meta.default_cluster_key_id, 1); + assert_eq!( + table_info.meta.options.get(OPT_KEY_CLUSTER_TYPE).unwrap(), + LINEAR_CLUSTER_TYPE + ); // drop cluster key let drop_table_cluster_key_plan = DropTableClusterKeyPlan { @@ -113,28 +92,9 @@ async fn test_fuse_alter_table_cluster_key() -> databend_common_exception::Resul let _ = interpreter.execute(ctx.clone()).await?; let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; let table_info = table.get_table_info(); assert_eq!(table_info.meta.default_cluster_key, None); - assert_eq!(table_info.meta.default_cluster_key_id, None); - - let snapshot_loc = table - .get_table_info() - .options() - .get(OPT_KEY_SNAPSHOT_LOCATION) - .unwrap(); - let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator()); - - let params = LoadParams { - location: snapshot_loc.clone(), - len_hint: None, - ver: TableSnapshot::VERSION, - put_cache: false, - }; - - let snapshot = reader.read(¶ms).await?; - let expected = None; - assert_eq!(snapshot.cluster_key_meta, expected); + assert_eq!(table_info.meta.default_cluster_key_id, 1); Ok(()) } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index be1090d0c9330..a88975171c57e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -288,7 +288,6 @@ async fn test_commit_to_meta_server() -> Result<()> { Statistics::default(), new_segments, None, - None, ); let faked_catalog = FakedCatalog { diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 0ac5a4b2c3552..4af069452d491 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -233,7 +233,6 @@ async fn 
test_safety() -> Result<()> { summary, locations.clone(), None, - None, ); let limit: usize = rand.gen_range(1..15); diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 8fe26e8947f3f..345f14f253e8b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -257,7 +257,6 @@ async fn test_safety_for_recluster() -> Result<()> { summary, locations.clone(), None, - None, )); let mut block_ids = HashSet::new(); diff --git a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs index e6fc53ae206aa..850e3332adee6 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs @@ -43,7 +43,7 @@ use crate::meta::Versioned; /// The structure of the TableSnapshot is the same as that of v2, but the serialization and deserialization methods are different #[derive(Serialize, Deserialize, Clone, Debug)] pub struct TableSnapshot { - /// format version of TableSnapshot meta data + /// format version of TableSnapshot metadata /// /// Note that: /// @@ -78,13 +78,14 @@ pub struct TableSnapshot { /// Summary Statistics pub summary: Statistics, - /// Pointers to SegmentInfos (may be of different format) + /// Pointers to SegmentInfos (maybe of different format) /// /// We rely on background merge tasks to keep merging segments, so that /// this the size of this vector could be kept reasonable pub segments: Vec, /// The metadata of the cluster keys. + /// **This field is deprecated and will be removed in the next version.** pub cluster_key_meta: Option, pub table_statistics_location: Option, @@ -100,7 +101,6 @@ impl TableSnapshot { schema: TableSchema, summary: Statistics, segments: Vec, - cluster_key_meta: Option, table_statistics_location: Option, ) -> Self { let now = Utc::now(); @@ -120,7 +120,7 @@ impl TableSnapshot { schema, summary, segments, - cluster_key_meta, + cluster_key_meta: None, table_statistics_location, least_visible_timestamp: None, } @@ -136,7 +136,6 @@ impl TableSnapshot { Statistics::default(), vec![], None, - None, ) } @@ -152,7 +151,6 @@ impl TableSnapshot { clone.schema, clone.summary, clone.segments, - clone.cluster_key_meta, clone.table_statistics_location, ) } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index b63b59dc4d5b0..339d0b6401186 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -75,7 +75,6 @@ use databend_storages_common_table_meta::meta::parse_storage_prefix; use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::SnapshotId; -use databend_storages_common_table_meta::meta::Statistics as FuseStatistics; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; use databend_storages_common_table_meta::meta::Versioned; @@ -95,7 +94,6 @@ use log::info; use log::warn; use opendal::Operator; use parking_lot::Mutex; -use uuid::Uuid; use crate::fuse_column::FuseTableColumnStatisticsProvider; use crate::fuse_type::FuseTableType; @@ -166,8 +164,11 @@ impl FuseTable { disable_refresh: 
bool, ) -> Result> { let storage_prefix = Self::parse_storage_prefix_from_table_info(&table_info)?; - let cluster_key_meta = table_info.meta.cluster_key(); - + let cluster_key_meta = table_info + .meta + .default_cluster_key + .clone() + .map(|k| (table_info.meta.default_cluster_key_id, k)); let (mut operator, table_type) = match table_info.db_type.clone() { DatabaseType::NormalDB => { let storage_params = table_info.meta.storage_params.clone(); @@ -406,8 +407,28 @@ impl FuseTable { self.cluster_key_meta.clone().map(|v| v.0) } - pub fn cluster_key_meta(&self) -> Option { - self.cluster_key_meta.clone() + pub fn cluster_keys(&self, ctx: Arc) -> Vec> { + let table_meta = Arc::new(self.clone()); + if let Some((_, order)) = &self.cluster_key_meta { + let cluster_type = self.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); + let cluster_keys = match cluster_type { + ClusterType::Linear => parse_cluster_keys(ctx, table_meta.clone(), order), + ClusterType::Hilbert => parse_hilbert_cluster_key(ctx, table_meta.clone(), order), + } + .unwrap(); + + let cluster_keys = cluster_keys + .iter() + .map(|k| { + k.project_column_ref(|index| { + table_meta.schema().field(*index).name().to_string() + }) + .as_remote_expr() + }) + .collect(); + return cluster_keys; + } + vec![] } pub fn bloom_index_cols(&self) -> BloomIndexColumns { @@ -626,28 +647,8 @@ impl Table for FuseTable { matches!(self.storage_format, FuseStorageFormat::Parquet) } - fn cluster_keys(&self, ctx: Arc) -> Vec> { - let table_meta = Arc::new(self.clone()); - if let Some((_, order)) = &self.cluster_key_meta { - let cluster_type = self.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); - let cluster_keys = match cluster_type { - ClusterType::Linear => parse_cluster_keys(ctx, table_meta.clone(), order), - ClusterType::Hilbert => parse_hilbert_cluster_key(ctx, table_meta.clone(), order), - } - .unwrap(); - - let cluster_keys = cluster_keys - .iter() - .map(|k| { - k.project_column_ref(|index| { - table_meta.schema().field(*index).name().to_string() - }) - .as_remote_expr() - }) - .collect(); - return cluster_keys; - } - vec![] + fn cluster_key_meta(&self) -> Option { + self.cluster_key_meta.clone() } fn change_tracking_enabled(&self) -> bool { @@ -675,130 +676,6 @@ impl Table for FuseTable { } } - #[async_backtrace::framed] - async fn alter_table_cluster_keys( - &self, - ctx: Arc, - cluster_key_str: String, - cluster_type: String, - ) -> Result<()> { - // if new cluster_key_str is the same with old one, - // no need to change - if let Some(old_cluster_key_str) = self.cluster_key_str() - && *old_cluster_key_str == cluster_key_str - { - let old_cluster_type = self - .get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear) - .to_string() - .to_lowercase(); - if cluster_type == old_cluster_type { - return Ok(()); - } - } - let mut new_table_meta = self.get_table_info().meta.clone(); - new_table_meta - .options - .insert(OPT_KEY_CLUSTER_TYPE.to_owned(), cluster_type); - new_table_meta = new_table_meta.push_cluster_key(cluster_key_str); - let cluster_key_meta = new_table_meta.cluster_key(); - let schema = self.schema().as_ref().clone(); - - let prev = self.read_table_snapshot().await?; - let prev_version = self.snapshot_format_version(None)?; - let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp); - let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version)); - let prev_statistics_location = prev - .as_ref() - .and_then(|v| v.table_statistics_location.clone()); - let (summary, segments) = if let Some(v) = prev { - 
(v.summary.clone(), v.segments.clone()) - } else { - (FuseStatistics::default(), vec![]) - }; - - let table_version = Some(self.get_table_info().ident.seq); - - let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), - table_version, - &prev_timestamp, - prev_snapshot_id, - schema, - summary, - segments, - cluster_key_meta, - prev_statistics_location, - ); - - let mut table_info = self.table_info.clone(); - table_info.meta = new_table_meta; - - FuseTable::commit_to_meta_server( - ctx.as_ref(), - &table_info, - &self.meta_location_generator, - new_snapshot, - None, - &None, - &self.operator, - ) - .await - } - - #[async_backtrace::framed] - async fn drop_table_cluster_keys(&self, ctx: Arc) -> Result<()> { - if self.cluster_key_meta.is_none() { - return Ok(()); - } - - let mut new_table_meta = self.get_table_info().meta.clone(); - new_table_meta.default_cluster_key = None; - new_table_meta.default_cluster_key_id = None; - - let schema = self.schema().as_ref().clone(); - - let prev = self.read_table_snapshot().await?; - let prev_version = self.snapshot_format_version(None)?; - let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp); - let prev_statistics_location = prev - .as_ref() - .and_then(|v| v.table_statistics_location.clone()); - let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version)); - let (summary, segments) = if let Some(v) = prev { - (v.summary.clone(), v.segments.clone()) - } else { - (FuseStatistics::default(), vec![]) - }; - - let table_version = Some(self.get_table_info().ident.seq); - - let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), - table_version, - &prev_timestamp, - prev_snapshot_id, - schema, - summary, - segments, - None, - prev_statistics_location, - ); - - let mut table_info = self.table_info.clone(); - table_info.meta = new_table_meta; - - FuseTable::commit_to_meta_server( - ctx.as_ref(), - &table_info, - &self.meta_location_generator, - new_snapshot, - None, - &None, - &self.operator, - ) - .await - } - #[fastrace::trace] #[async_backtrace::framed] async fn read_partitions( diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index f38dc6dcd236d..91e0493fb5c1c 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -136,7 +136,6 @@ mod tests { Statistics::default(), vec![], None, - None, ); snapshot.format_version = v; let _ = snapshot.marshal(); @@ -154,7 +153,6 @@ mod tests { Statistics::default(), vec![], None, - None, ); snapshot.marshal().unwrap(); } diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index 015621e92c652..65b8092d1f16c 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -25,7 +25,6 @@ use databend_common_expression::Scalar; use databend_common_expression::TableDataType; use databend_common_expression::TableSchema; use databend_common_sql::field_default_value; -use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::ColumnStatistics; use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::TableSnapshot; @@ -116,7 +115,7 @@ impl SnapshotGenerator for AppendGenerator { fn do_generate_new_snapshot( &self, schema: TableSchema, - cluster_key_meta: 
Option, + cluster_key_id: Option, previous: &Option>, prev_table_seq: Option, table_name: &str, @@ -193,11 +192,7 @@ impl SnapshotGenerator for AppendGenerator { .cloned() .collect(); - merge_statistics_mut( - &mut new_summary, - &summary, - cluster_key_meta.clone().map(|v| v.0), - ); + merge_statistics_mut(&mut new_summary, &summary, cluster_key_id); } } @@ -235,7 +230,6 @@ impl SnapshotGenerator for AppendGenerator { schema, new_summary, new_segments, - cluster_key_meta, table_statistics_location, )) } diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index ecc768366d3b4..2235d87c5e4f7 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -20,7 +20,6 @@ use databend_common_exception::Result; use databend_common_expression::TableSchema; use databend_common_metrics::storage::*; use databend_common_sql::executor::physical_plans::MutationKind; -use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor; use log::info; @@ -60,12 +59,11 @@ impl SnapshotGenerator for MutationGenerator { fn do_generate_new_snapshot( &self, schema: TableSchema, - cluster_key_meta: Option, + cluster_key_id: Option, previous: &Option>, prev_table_seq: Option, _table_name: &str, ) -> Result { - let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); match &self.conflict_resolve_ctx { ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => { if let Some((removed, replaced)) = @@ -87,7 +85,7 @@ impl SnapshotGenerator for MutationGenerator { let mut new_summary = merge_statistics( previous.summary(), &ctx.merged_statistics, - default_cluster_key_id, + cluster_key_id, ); deduct_statistics_mut(&mut new_summary, &ctx.removed_statistics); let new_snapshot = TableSnapshot::new( @@ -98,7 +96,6 @@ impl SnapshotGenerator for MutationGenerator { schema, new_summary, new_segments, - cluster_key_meta, previous.table_statistics_location(), ); diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index 9310931dc6c6e..76c12d14e00ed 100644 --- a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::TableSchema; use databend_storages_common_session::TxnManagerRef; -use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::TableSnapshot; use crate::operations::common::ConflictResolveContext; @@ -41,7 +40,7 @@ pub trait SnapshotGenerator { fn generate_new_snapshot( &self, schema: TableSchema, - cluster_key_meta: Option, + cluster_key_id: Option, previous: Option>, prev_table_seq: Option, txn_mgr: TxnManagerRef, @@ -50,7 +49,7 @@ pub trait SnapshotGenerator { ) -> Result { let mut snapshot = self.do_generate_new_snapshot( schema, - cluster_key_meta, + cluster_key_id, &previous, prev_table_seq, table_name, @@ -76,7 +75,7 @@ pub trait SnapshotGenerator { fn do_generate_new_snapshot( &self, schema: TableSchema, - cluster_key_meta: Option, + 
cluster_key_id: Option, previous: &Option>, prev_table_seq: Option, table_name: &str, diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index c6c7834e29435..f3448299fc4fb 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::TableSchema; -use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::TableSnapshot; use uuid::Uuid; @@ -57,7 +56,7 @@ impl SnapshotGenerator for TruncateGenerator { fn do_generate_new_snapshot( &self, schema: TableSchema, - cluster_key_meta: Option, + _cluster_key_id: Option, previous: &Option>, prev_table_seq: Option, _table_name: &str, @@ -79,7 +78,6 @@ impl SnapshotGenerator for TruncateGenerator { schema, Default::default(), vec![], - cluster_key_meta, None, ); Ok(new_snapshot) diff --git a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs index 49e76d5332dd2..9fbb28a4467ee 100644 --- a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs @@ -250,7 +250,7 @@ async fn build_update_table_meta_req( let previous = fuse_table.read_table_snapshot().await?; let snapshot = snapshot_generator.generate_new_snapshot( table.schema().as_ref().clone(), - fuse_table.cluster_key_meta.clone(), + fuse_table.cluster_key_id(), previous, Some(fuse_table.table_info.ident.seq), txn_mgr, diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 551d0aa2c1c30..413a6c1170c44 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -34,7 +34,6 @@ use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SnapshotId; use databend_storages_common_table_meta::meta::TableSnapshot; @@ -60,7 +59,7 @@ enum State { RefreshTable, GenerateSnapshot { previous: Option>, - cluster_key_meta: Option, + cluster_key_id: Option, table_info: TableInfo, }, TryCommit { @@ -258,7 +257,7 @@ where F: SnapshotGenerator + Send + 'static match std::mem::replace(&mut self.state, State::None) { State::GenerateSnapshot { previous, - cluster_key_meta, + cluster_key_id, table_info, } => { let change_tracking_enabled_during_commit = { @@ -292,7 +291,7 @@ where F: SnapshotGenerator + Send + 'static let schema = self.table.schema().as_ref().clone(); match self.snapshot_gen.generate_new_snapshot( schema, - cluster_key_meta, + cluster_key_id, previous, Some(table_info.ident.seq), self.ctx.txn_mgr(), @@ -357,7 +356,7 @@ where F: SnapshotGenerator + Send + 'static self.state = State::GenerateSnapshot { previous, - cluster_key_meta: 
fuse_table.cluster_key_meta.clone(), + cluster_key_id: fuse_table.cluster_key_id(), table_info, }; } @@ -507,10 +506,9 @@ where F: SnapshotGenerator + Send + 'static self.table = self.table.refresh(self.ctx.as_ref()).await?; let fuse_table = FuseTable::try_from_table(self.table.as_ref())?.to_owned(); let previous = fuse_table.read_table_snapshot().await?; - let cluster_key_meta = fuse_table.cluster_key_meta.clone(); self.state = State::GenerateSnapshot { previous, - cluster_key_meta, + cluster_key_id: fuse_table.cluster_key_id(), table_info: fuse_table.table_info.clone(), }; } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test index 0d9d0ea33133e..44e5f9b185435 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test @@ -603,7 +603,7 @@ select average_depth from clustering_information('db_09_0008','t10') query I select count() from fuse_snapshot('db_09_0008', 't10') ---- -8 +7 statement ok optimize table t10 purge limit 2 @@ -611,7 +611,7 @@ optimize table t10 purge limit 2 query I select count() from fuse_snapshot('db_09_0008', 't10') ---- -6 +5 diff --git a/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql.test b/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql.test index c2b5854adfe10..1f32fe3d93003 100644 --- a/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql.test +++ b/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql.test @@ -64,7 +64,7 @@ optimize table t_compact_0 compact query I select count() from fuse_snapshot('default', 't_compact_0') ---- -6 +5 query II select count(),sum(a) from t_compact_0
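
Illustrative sketch (not part of the patch): the interpreter hunks above replace the old snapshot-writing alter/drop paths with a plain table-meta update. The standalone Rust below models that bookkeeping with simplified stand-in types; the field names mirror TableMeta, but the struct, the option-key value, and the helper functions are assumptions for illustration, not the real databend_common_meta_app API. Altering a cluster key rewrites default_cluster_key, bumps default_cluster_key_id, and records the cluster type in the table options; dropping it clears the key and the option while leaving the id untouched.

use std::collections::BTreeMap;

/// Simplified stand-in for the relevant TableMeta fields (illustrative only).
#[derive(Debug, Default, Clone)]
struct TableMetaSketch {
    default_cluster_key: Option<String>,
    default_cluster_key_id: u32,
    options: BTreeMap<String, String>,
}

// Option key name assumed for illustration.
const OPT_KEY_CLUSTER_TYPE: &str = "cluster_type";

/// Mirrors AlterTableClusterKeyInterpreter: no-op when key and type are unchanged,
/// otherwise update the meta fields in place (no new snapshot is produced).
fn alter_cluster_key(meta: &mut TableMetaSketch, cluster_key: &str, cluster_type: &str) -> bool {
    let unchanged = meta.default_cluster_key.as_deref() == Some(cluster_key)
        && meta.options.get(OPT_KEY_CLUSTER_TYPE).map(String::as_str) == Some(cluster_type);
    if unchanged {
        return false;
    }
    meta.options
        .insert(OPT_KEY_CLUSTER_TYPE.to_owned(), cluster_type.to_owned());
    meta.default_cluster_key = Some(cluster_key.to_owned());
    meta.default_cluster_key_id += 1;
    true
}

/// Mirrors DropTableClusterKeyInterpreter: clear the key and the cluster-type option.
fn drop_cluster_key(meta: &mut TableMetaSketch) -> bool {
    if meta.default_cluster_key.is_none() {
        return false;
    }
    meta.default_cluster_key = None;
    meta.options.remove(OPT_KEY_CLUSTER_TYPE);
    true
}

fn main() {
    let mut meta = TableMetaSketch::default();
    assert!(alter_cluster_key(&mut meta, "(id)", "linear"));
    assert_eq!(meta.default_cluster_key_id, 1);
    // Re-applying the same key and type is a no-op, as in the interpreter.
    assert!(!alter_cluster_key(&mut meta, "(id)", "linear"));
    assert!(drop_cluster_key(&mut meta));
    assert!(meta.default_cluster_key.is_none());
}

In the real interpreters these field changes are shipped as an UpdateTableMetaReq with MatchSeq::Exact(table_info.ident.seq), so the update is rejected rather than silently overwriting a concurrent change to the table meta.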