From 26d0008ce093527ee1f00e4532f8148c7d4a5c53 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Mon, 13 Jan 2025 11:21:40 +0800 Subject: [PATCH 01/18] chore(ci): presign artifact for download (#17253) --- .github/actions/artifact_upload/action.yml | 8 ++++++++ Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/actions/artifact_upload/action.yml b/.github/actions/artifact_upload/action.yml index 290e12cf80e40..5e7f19001c101 100644 --- a/.github/actions/artifact_upload/action.yml +++ b/.github/actions/artifact_upload/action.yml @@ -41,9 +41,17 @@ runs: if: env.RUNNER_PROVIDER == 'aws' shell: bash run: | + echo "### artifacts 🚀" >> $GITHUB_STEP_SUMMARY aws s3 sync ./target/${{ inputs.target }}/${{ env.BUILD_PROFILE }}/ \ s3://databend-ci/${{ env.BUILD_PROFILE }}/${{ inputs.sha }}/${{ inputs.target }}/${{ inputs.category }}/ \ --exclude "*" --include "databend-*" --no-progress + artifacts="meta,query,query.debug" + for artifact in ${artifacts//,/ }; do + if [[ -f ./target/${{ inputs.target }}/${{ env.BUILD_PROFILE }}/databend-${artifact} ]]; then + url=$(aws s3 presign s3://databend-ci/${{ env.BUILD_PROFILE }}/${{ inputs.sha }}/${{ inputs.target }}/${{ inputs.category }}/databend-${artifact} --expires-in 21600) + echo "- [${artifact}](${url})" >> $GITHUB_STEP_SUMMARY + fi + done - name: Upload artifact to gcs if: env.RUNNER_PROVIDER == 'gcp' diff --git a/Cargo.toml b/Cargo.toml index aab527695d091..1259a7be28ef4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -586,7 +586,7 @@ result_large_err = "allow" debug = 1 lto = "thin" overflow-checks = false -opt-level = "s" ## defaults to be 3 +opt-level = "s" # defaults to be 3 incremental = true [profile.ci] From 4fa72b1fd4ce8d1ca307e144d95d67cfda527b8b Mon Sep 17 00:00:00 2001 From: TCeason <33082201+TCeason@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:24:09 +0800 Subject: [PATCH 02/18] refactor: create/drop_warehouse return WarehouseInfo (#17254) --- .../resources_management_kubernetes.rs | 4 +- .../resources_management_self_managed.rs | 4 +- .../resources_management_system.rs | 8 +++- .../src/resources_management.rs | 12 ++++-- .../management/src/warehouse/warehouse_api.rs | 8 +++- .../management/src/warehouse/warehouse_mgr.rs | 36 ++++++++++-------- src/query/management/tests/it/warehouse.rs | 38 +++++++++++++++++-- 7 files changed, 80 insertions(+), 30 deletions(-) diff --git a/src/query/ee/src/resource_management/resources_management_kubernetes.rs b/src/query/ee/src/resource_management/resources_management_kubernetes.rs index 478c0e49e0ae4..dc0880a7b98dd 100644 --- a/src/query/ee/src/resource_management/resources_management_kubernetes.rs +++ b/src/query/ee/src/resource_management/resources_management_kubernetes.rs @@ -42,13 +42,13 @@ impl ResourcesManagement for KubernetesResourcesManagement { )) } - async fn create_warehouse(&self, _: String, _: Vec) -> Result<()> { + async fn create_warehouse(&self, _: String, _: Vec) -> Result { Err(ErrorCode::Unimplemented( "Unimplemented kubernetes resources management", )) } - async fn drop_warehouse(&self, _: String) -> Result<()> { + async fn drop_warehouse(&self, _: String) -> Result { Err(ErrorCode::Unimplemented( "Unimplemented kubernetes resources management", )) diff --git a/src/query/ee/src/resource_management/resources_management_self_managed.rs b/src/query/ee/src/resource_management/resources_management_self_managed.rs index 33d76f693166c..440e20b629b71 100644 --- a/src/query/ee/src/resource_management/resources_management_self_managed.rs +++ 
b/src/query/ee/src/resource_management/resources_management_self_managed.rs @@ -53,13 +53,13 @@ impl ResourcesManagement for SelfManagedResourcesManagement { Ok(()) } - async fn create_warehouse(&self, _: String, _: Vec) -> Result<()> { + async fn create_warehouse(&self, _: String, _: Vec) -> Result { Err(ErrorCode::Unimplemented( "Unimplemented create warehouse with self-managed resources management", )) } - async fn drop_warehouse(&self, _: String) -> Result<()> { + async fn drop_warehouse(&self, _: String) -> Result { Err(ErrorCode::Unimplemented( "Unimplemented drop warehouse with self-managed resources management", )) diff --git a/src/query/ee/src/resource_management/resources_management_system.rs b/src/query/ee/src/resource_management/resources_management_system.rs index 12c98d1b2aeb3..7128cb694adf8 100644 --- a/src/query/ee/src/resource_management/resources_management_system.rs +++ b/src/query/ee/src/resource_management/resources_management_system.rs @@ -62,11 +62,15 @@ impl ResourcesManagement for SystemResourcesManagement { Ok(()) } - async fn create_warehouse(&self, name: String, nodes: Vec) -> Result<()> { + async fn create_warehouse( + &self, + name: String, + nodes: Vec, + ) -> Result { self.warehouse_manager.create_warehouse(name, nodes).await } - async fn drop_warehouse(&self, name: String) -> Result<()> { + async fn drop_warehouse(&self, name: String) -> Result { self.warehouse_manager.drop_warehouse(name).await } diff --git a/src/query/ee_features/resources_management/src/resources_management.rs b/src/query/ee_features/resources_management/src/resources_management.rs index dd59b6b860337..ba7f88c36ee47 100644 --- a/src/query/ee_features/resources_management/src/resources_management.rs +++ b/src/query/ee_features/resources_management/src/resources_management.rs @@ -30,9 +30,13 @@ pub trait ResourcesManagement: Sync + Send + 'static { async fn init_node(&self, node: &mut NodeInfo) -> Result<()>; - async fn create_warehouse(&self, name: String, nodes: Vec) -> Result<()>; + async fn create_warehouse( + &self, + name: String, + nodes: Vec, + ) -> Result; - async fn drop_warehouse(&self, name: String) -> Result<()>; + async fn drop_warehouse(&self, name: String) -> Result; async fn resume_warehouse(&self, name: String) -> Result<()>; @@ -91,11 +95,11 @@ impl ResourcesManagement for DummyResourcesManagement { Ok(()) } - async fn create_warehouse(&self, _: String, _: Vec) -> Result<()> { + async fn create_warehouse(&self, _: String, _: Vec) -> Result { Err(ErrorCode::Unimplemented("The use of this feature requires a Databend Enterprise Edition license. To unlock enterprise features, please contact Databend to obtain a license. Learn more at https://docs.databend.com/guides/overview/editions/dee/")) } - async fn drop_warehouse(&self, _: String) -> Result<()> { + async fn drop_warehouse(&self, _: String) -> Result { Err(ErrorCode::Unimplemented("The use of this feature requires a Databend Enterprise Edition license. To unlock enterprise features, please contact Databend to obtain a license. Learn more at https://docs.databend.com/guides/overview/editions/dee/")) } diff --git a/src/query/management/src/warehouse/warehouse_api.rs b/src/query/management/src/warehouse/warehouse_api.rs index aa1afecaaee9a..ed4f92c4610c5 100644 --- a/src/query/management/src/warehouse/warehouse_api.rs +++ b/src/query/management/src/warehouse/warehouse_api.rs @@ -73,9 +73,13 @@ pub trait WarehouseApi: Sync + Send { /// Keep the tenant's cluster node alive. 
async fn heartbeat_node(&self, node: &mut NodeInfo, seq: u64) -> Result; - async fn drop_warehouse(&self, warehouse: String) -> Result<()>; + async fn drop_warehouse(&self, warehouse: String) -> Result; - async fn create_warehouse(&self, warehouse: String, nodes: Vec) -> Result<()>; + async fn create_warehouse( + &self, + warehouse: String, + nodes: Vec, + ) -> Result; async fn resume_warehouse(&self, warehouse: String) -> Result<()>; diff --git a/src/query/management/src/warehouse/warehouse_mgr.rs b/src/query/management/src/warehouse/warehouse_mgr.rs index 13a2ea51065d8..b0b563d209094 100644 --- a/src/query/management/src/warehouse/warehouse_mgr.rs +++ b/src/query/management/src/warehouse/warehouse_mgr.rs @@ -1024,7 +1024,7 @@ impl WarehouseApi for WarehouseMgr { } } - async fn drop_warehouse(&self, warehouse: String) -> Result<()> { + async fn drop_warehouse(&self, warehouse: String) -> Result { if warehouse.is_empty() { return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty.")); } @@ -1075,7 +1075,7 @@ impl WarehouseApi for WarehouseMgr { continue; } - return Ok(()); + return Ok(warehouse_snapshot.warehouse_info); } Err(ErrorCode::WarehouseOperateConflict( @@ -1083,7 +1083,11 @@ impl WarehouseApi for WarehouseMgr { )) } - async fn create_warehouse(&self, warehouse: String, nodes: Vec) -> Result<()> { + async fn create_warehouse( + &self, + warehouse: String, + nodes: Vec, + ) -> Result { if warehouse.is_empty() { return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty.")); } @@ -1129,26 +1133,28 @@ impl WarehouseApi for WarehouseMgr { let warehouse_info_key = self.warehouse_info_key(&warehouse)?; + let warehouse_info = WarehouseInfo::SystemManaged(SystemManagedWarehouse { + role_id: GlobalUniqName::unique(), + status: "Running".to_string(), + id: warehouse.clone(), + clusters: HashMap::from([( + String::from(DEFAULT_CLUSTER_ID), + SystemManagedCluster { + nodes: nodes.clone(), + }, + )]), + }); + txn.condition .push(map_condition(&warehouse_info_key, MatchSeq::Exact(0))); txn.if_then.push(TxnOp::put( warehouse_info_key.clone(), - serde_json::to_vec(&WarehouseInfo::SystemManaged(SystemManagedWarehouse { - role_id: GlobalUniqName::unique(), - status: "Running".to_string(), - id: warehouse.clone(), - clusters: HashMap::from([( - String::from(DEFAULT_CLUSTER_ID), - SystemManagedCluster { - nodes: nodes.clone(), - }, - )]), - }))?, + serde_json::to_vec(&warehouse_info)?, )); txn.else_then.push(TxnOp::get(warehouse_info_key)); return match self.metastore.transaction(txn).await? { - res if res.success => Ok(()), + res if res.success => Ok(warehouse_info), res => match res.responses.last() { Some(TxnOpResponse { response: Some(Response::Get(res)), diff --git a/src/query/management/tests/it/warehouse.rs b/src/query/management/tests/it/warehouse.rs index aabbe97e920d3..84ef6a0bc111b 100644 --- a/src/query/management/tests/it/warehouse.rs +++ b/src/query/management/tests/it/warehouse.rs @@ -314,7 +314,22 @@ async fn test_successfully_create_system_managed_warehouse() -> Result<()> { SelectedNode::Random(None), ]); - create_warehouse.await?; + let cw = create_warehouse.await?; + + assert!( + !matches!(cw, WarehouseInfo::SelfManaged(_)), + "Expected WarehouseInfo to not be SelfManaged" + ); + if let WarehouseInfo::SystemManaged(sw) = cw { + assert_eq!(sw.id, "test_warehouse"); + for warehouse in warehouse_manager.list_warehouses().await? 
{ + if let WarehouseInfo::SystemManaged(w) = warehouse { + if w.id == sw.id { + assert_eq!(w.role_id, sw.role_id) + } + } + } + } for node in &nodes { let online_node = format!("__fd_clusters_v6/test%2dtenant%2did/online_nodes/{}", node); @@ -526,7 +541,8 @@ async fn test_create_warehouse_with_self_manage() -> Result<()> { None, )]); - create_warehouse.await + assert!(create_warehouse.await.is_ok()); + Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -936,7 +952,23 @@ async fn test_drop_system_managed_warehouse() -> Result<()> { create_warehouse.await?; let drop_warehouse = warehouse_manager.drop_warehouse(String::from("test_warehouse")); - drop_warehouse.await?; + + let cw = drop_warehouse.await?; + + assert!( + !matches!(cw, WarehouseInfo::SelfManaged(_)), + "Expected WarehouseInfo to not be SelfManaged" + ); + if let WarehouseInfo::SystemManaged(sw) = cw { + assert_eq!(sw.id, "test_warehouse"); + for warehouse in warehouse_manager.list_warehouses().await? { + if let WarehouseInfo::SystemManaged(w) = warehouse { + if w.id == sw.id { + assert_eq!(w.role_id, sw.role_id) + } + } + } + } let create_warehouse = warehouse_manager.create_warehouse(String::from("test_warehouse"), vec![ From 00f4bd22b833155975ec101ecf442e6ba2e44dcf Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 13 Jan 2025 13:00:40 +0800 Subject: [PATCH 03/18] fix(query): increase state_rows in copy agg state rows (#17252) * fix(query): increase state_rows in copy agg state rows * fix(query): increase state_rows in copy agg state rows * fix(query): increase state_rows in copy agg state rows --- src/query/expression/src/aggregate/payload.rs | 1 + .../20+_others/20_0022_agg_memory.result | 6 +++ .../20+_others/20_0022_agg_memory.sh | 46 +++++++++++++++++++ 3 files changed, 53 insertions(+) create mode 100755 tests/suites/0_stateless/20+_others/20_0022_agg_memory.result create mode 100755 tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index 51cc3b579870e..562b2a899f9bf 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -350,6 +350,7 @@ impl Payload { ) } page.rows += 1; + page.state_rows += 1; if page.rows == page.capacity { (page, _) = self.writable_page(); diff --git a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result new file mode 100755 index 0000000000000..f32baf89a114b --- /dev/null +++ b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.result @@ -0,0 +1,6 @@ +executing 1 +executing 2 +executing 3 +executing 4 +executing 5 +Memory usage difference is less than 5% diff --git a/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh new file mode 100755 index 0000000000000..e8602a4ae66a0 --- /dev/null +++ b/tests/suites/0_stateless/20+_others/20_0022_agg_memory.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. 
"$CURDIR"/../../../shell_env.sh + + +## warmup +for i in `seq 1 2`;do + $BENDSQL_CLIENT_CONNECT --query=""" + select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result; + """ +done + + +PIDS=($(pgrep databend-query)) +BEFORE_MEM=0 +for PID in "${PIDS[@]}"; do + MEM=$(ps -o rss= -p $PID | tail -n 1) + BEFORE_MEM=$((BEFORE_MEM + MEM)) +done + + +for i in `seq 1 5`;do + echo "executing $i" + $BENDSQL_CLIENT_CONNECT --query=""" + select number::string , max(number::string), min(number::string), count(distinct number) from numbers(10000000) group by 1 ignore_result; + """ +done + +sleep 15 + + +AFTER_MEM=0 +for PID in "${PIDS[@]}"; do + MEM=$(ps -o rss= -p $PID | tail -n 1) + AFTER_MEM=$((AFTER_MEM + MEM)) +done + +# Calculate the difference in percentage +DIFF=$(awk -v before=$BEFORE_MEM -v after=$AFTER_MEM 'BEGIN {print (after-before)/before * 100}') + +# Check if the difference is less than 5% +if (( $(awk -v diff=$DIFF 'BEGIN {print (diff < 5)}') )); then + echo "Memory usage difference is less than 5%" +else + echo "Memory usage difference is greater than 5%, before ${BEFORE_MEM} ${AFTER_MEM}" +fi \ No newline at end of file From e6cf9d1599d12129fe0b5752dabe4dc661ba5940 Mon Sep 17 00:00:00 2001 From: TCeason <33082201+TCeason@users.noreply.github.com> Date: Mon, 13 Jan 2025 15:19:15 +0800 Subject: [PATCH 04/18] refactor: only set global settings need to check super privilege (#17255) query setting, session setting, set variable will not check privileges --- .../interpreters/access/privilege_access.rs | 16 ++++++++++++++- .../18_rbac/18_0007_privilege_access.result | 6 ++++++ .../18_rbac/18_0007_privilege_access.sh | 20 +++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 4c5df38fb8068..6c15c2369f58c 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -1151,7 +1151,21 @@ impl AccessChecker for PrivilegeAccess { self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant,false, false) .await?; } - Plan::Set(_) | Plan::Unset(_) | Plan::Kill(_) | Plan::SetPriority(_) | Plan::System(_) => { + Plan::Set(plan) => { + use databend_common_ast::ast::SetType; + if let SetType::SettingsGlobal = plan.set_type { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false) + .await?; + } + } + Plan::Unset(plan) => { + use databend_common_ast::ast::SetType; + if let SetType::SettingsGlobal = plan.unset_type { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false) + .await?; + } + } + Plan::Kill(_) | Plan::SetPriority(_) | Plan::System(_) => { self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false) .await?; } diff --git a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result index fd9a369f184f9..1bb4bd20095b0 100644 --- a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result +++ b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result @@ -142,3 +142,9 @@ OWNERSHIP default.default.t2 USER b GRANT OWNERSHIP ON 'default'.'default'.'t2' 1 2 3 +=== set privilege check === +100 +100 +1 +1 +=== set privilege check succ === diff --git a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh 
b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh index ebbc772282056..0c663d9db0df7 100755 --- a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh +++ b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh @@ -302,3 +302,23 @@ echo "drop table if exists t1" | $BENDSQL_CLIENT_CONNECT echo "drop table if exists t2" | $BENDSQL_CLIENT_CONNECT echo "drop stage if exists s3;" | $BENDSQL_CLIENT_CONNECT echo "drop database if exists db01" | $BENDSQL_CLIENT_CONNECT + +echo "=== set privilege check ===" +echo "drop user if exists c" | $BENDSQL_CLIENT_CONNECT +echo "create user c identified by '123'" | $BENDSQL_CLIENT_CONNECT +export USER_C_CONNECT="bendsql --user=c --password=123 --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}" +echo "set session max_threads=1000" | $BENDSQL_CLIENT_CONNECT +echo "unset session max_threads" | $BENDSQL_CLIENT_CONNECT +echo "settings (ddl_column_type_nullable=0) select 100" | $BENDSQL_CLIENT_CONNECT +echo "SET variable a = 'a';" | $BENDSQL_CLIENT_CONNECT +echo "set global max_threads=1000" | $BENDSQL_CLIENT_CONNECT +echo "unset global max_threads" | $BENDSQL_CLIENT_CONNECT + +echo "set session max_threads=1000" | $USER_C_CONNECT +echo "unset session max_threads" | $USER_C_CONNECT +echo "settings (ddl_column_type_nullable=0) select 100" | $USER_C_CONNECT +echo "SET variable a = 'a';" | $USER_C_CONNECT +echo "set global max_threads=1000;" | $USER_C_CONNECT 2>&1 | grep "Super" | wc -l +echo "unset global max_threads;" | $USER_C_CONNECT 2>&1 | grep "Super" | wc -l +echo "drop user if exists c" | $BENDSQL_CLIENT_CONNECT +echo "=== set privilege check succ ===" From 61e962e531c51166b1f3dcd426e612e01a224183 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 13 Jan 2025 20:17:22 +0800 Subject: [PATCH 05/18] fix(query): hash table scatter will always send agg meta (#17245) * fix(query): hash table scatter will always send agg meta * fix(query): hash table scatter will always send agg meta * fix(query): z * fix(query): z * update * update * update * z * z * z * z * z * z * z --- src/query/expression/src/aggregate/payload.rs | 11 +++--- .../expression/src/aggregate/payload_flush.rs | 4 +- .../aggregator/aggregate_exchange_injector.rs | 26 +++++-------- .../new_transform_partition_bucket.rs | 37 ++++++++++++------- .../transforms/aggregator/serde/serde_meta.rs | 22 +++++------ .../serde/transform_aggregate_serializer.rs | 32 ++++++++++++++-- .../serde/transform_deserializer.rs | 32 ++++++++++------ ...transform_exchange_aggregate_serializer.rs | 21 +++++------ .../aggregator/transform_aggregate_final.rs | 1 + .../transforms/range_join/ie_join_state.rs | 4 ++ .../processors/transforms/transform_srf.rs | 3 ++ 11 files changed, 115 insertions(+), 78 deletions(-) diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index 562b2a899f9bf..f08e61280f5cc 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -402,15 +402,14 @@ impl Payload { true } - pub fn empty_block(&self) -> DataBlock { - let columns = self - .aggrs - .iter() - .map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build()) + pub fn empty_block(&self, fake_rows: Option) -> DataBlock { + let fake_rows = fake_rows.unwrap_or(0); + let columns = (0..self.aggrs.len()) + .map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build()) .chain( self.group_types .iter() - .map(|t| ColumnBuilder::with_capacity(t, 
0).build()), + .map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()), ) .collect_vec(); DataBlock::new_from_columns(columns) diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 632ec78f69990..e4b4735394c17 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -121,7 +121,7 @@ impl Payload { } if blocks.is_empty() { - return Ok(self.empty_block()); + return Ok(self.empty_block(None)); } DataBlock::concat(&blocks) } @@ -173,7 +173,7 @@ impl Payload { } if blocks.is_empty() { - return Ok(self.empty_block()); + return Ok(self.empty_block(None)); } DataBlock::concat(&blocks) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs index 482920b53571e..dacfcc0826b08 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs @@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter { AggregateMeta::Partitioned { .. } => unreachable!(), AggregateMeta::AggregateSpilling(payload) => { for p in scatter_partitioned_payload(payload, self.buckets)? { - blocks.push(match p.len() == 0 { - true => DataBlock::empty(), - false => DataBlock::empty_with_meta( - AggregateMeta::create_agg_spilling(p), - ), - }); + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::create_agg_spilling(p), + )); } } AggregateMeta::AggregatePayload(p) => { for payload in scatter_payload(p.payload, self.buckets)? { - blocks.push(match payload.len() == 0 { - true => DataBlock::empty(), - false => { - DataBlock::empty_with_meta(AggregateMeta::create_agg_payload( - p.bucket, - payload, - p.max_partition_count, - )) - } - }); + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::create_agg_payload( + p.bucket, + payload, + p.max_partition_count, + ), + )); } } }; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs index 0e35ebaed73b2..39db4c52ca273 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs @@ -212,6 +212,7 @@ impl NewTransformPartitionBucket { #[allow(unused_assignments)] fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> { let (mut bucket, mut partition_count) = (0, 0); + let mut is_empty_block = false; if let Some(block_meta) = data_block.get_meta() { if let Some(block_meta) = AggregateMeta::downcast_ref_from(block_meta) { (bucket, partition_count) = match block_meta { @@ -250,7 +251,11 @@ impl NewTransformPartitionBucket { if let Some(AggregateMeta::Spilled(buckets_payload)) = AggregateMeta::downcast_from(meta) { - let partition_count = buckets_payload[0].max_partition_count; + let partition_count = if !buckets_payload.is_empty() { + buckets_payload[0].max_partition_count + } else { + MAX_PARTITION_COUNT + }; self.max_partition_count = self.max_partition_count.max(partition_count); @@ -274,12 +279,14 @@ impl NewTransformPartitionBucket { unreachable!() } AggregateMeta::Serialized(payload) => { + is_empty_block = 
payload.data_block.is_empty(); self.max_partition_count = self.max_partition_count.max(payload.max_partition_count); (payload.bucket, payload.max_partition_count) } AggregateMeta::AggregatePayload(payload) => { + is_empty_block = payload.payload.len() == 0; self.max_partition_count = self.max_partition_count.max(payload.max_partition_count); @@ -298,23 +305,25 @@ impl NewTransformPartitionBucket { )); } - if self.all_inputs_init { - if partition_count != self.max_partition_count { - return Err(ErrorCode::Internal( + if !is_empty_block { + if self.all_inputs_init { + if partition_count != self.max_partition_count { + return Err(ErrorCode::Internal( "Internal, the partition count does not equal the max partition count on TransformPartitionBucket. ", )); - } - match self.buckets_blocks.entry(bucket) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); } - }; - } else { - self.unpartitioned_blocks.push(data_block); + match self.buckets_blocks.entry(bucket) { + Entry::Vacant(v) => { + v.insert(vec![data_block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(data_block); + } + }; + } else { + self.unpartitioned_blocks.push(data_block); + } } Ok((bucket, partition_count)) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs index 731ec4e1b1049..b83cf2c97c90e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs @@ -31,21 +31,15 @@ pub struct AggregateSerdeMeta { pub columns_layout: Vec, // use for new agg hashtable pub max_partition_count: usize, + pub is_empty: bool, } impl AggregateSerdeMeta { - pub fn create(bucket: isize) -> BlockMetaInfoPtr { - Box::new(AggregateSerdeMeta { - typ: BUCKET_TYPE, - bucket, - location: None, - data_range: None, - columns_layout: vec![], - max_partition_count: 0, - }) - } - - pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr { + pub fn create_agg_payload( + bucket: isize, + max_partition_count: usize, + is_empty: bool, + ) -> BlockMetaInfoPtr { Box::new(AggregateSerdeMeta { typ: BUCKET_TYPE, bucket, @@ -53,6 +47,7 @@ impl AggregateSerdeMeta { data_range: None, columns_layout: vec![], max_partition_count, + is_empty, }) } @@ -61,6 +56,7 @@ impl AggregateSerdeMeta { location: String, data_range: Range, columns_layout: Vec, + is_empty: bool, ) -> BlockMetaInfoPtr { Box::new(AggregateSerdeMeta { typ: SPILLED_TYPE, @@ -69,6 +65,7 @@ impl AggregateSerdeMeta { location: Some(location), data_range: Some(data_range), max_partition_count: 0, + is_empty, }) } @@ -86,6 +83,7 @@ impl AggregateSerdeMeta { location: Some(location), data_range: Some(data_range), max_partition_count, + is_empty: false, }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index bd3bdb24d6339..096485fa98fcc 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -183,6 +183,7 @@ pub struct SerializeAggregateStream { pub payload: Pin>, flush_state: PayloadFlushState, end_iter: 
bool, + nums: usize, } unsafe impl Send for SerializeAggregateStream {} @@ -198,6 +199,7 @@ impl SerializeAggregateStream { flush_state: PayloadFlushState::default(), _params: params.clone(), end_iter: false, + nums: 0, } } } @@ -225,10 +227,32 @@ impl SerializeAggregateStream { } match block { - Some(block) => Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))?)), - None => Ok(None), + Some(block) => { + self.nums += 1; + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + false, + ), + ))?)) + } + None => { + // always return at least one block + if self.nums == 0 { + self.nums += 1; + let block = p.payload.empty_block(Some(1)); + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + true, + ), + ))?)) + } else { + Ok(None) + } + } } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs index c73799432d17d..ed82428bbc7d2 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs @@ -89,18 +89,26 @@ impl TransformDeserializer { } Some(meta) => { return match meta.typ == BUCKET_TYPE { - true => Ok(DataBlock::empty_with_meta( - AggregateMeta::create_serialized( - meta.bucket, - deserialize_block( - dict, - fragment_data, - &self.schema, - self.arrow_schema.clone(), - )?, - meta.max_partition_count, - ), - )), + true => { + let mut block = deserialize_block( + dict, + fragment_data, + &self.schema, + self.arrow_schema.clone(), + )?; + + if meta.is_empty { + block = block.slice(0..0); + } + + Ok(DataBlock::empty_with_meta( + AggregateMeta::create_serialized( + meta.bucket, + block, + meta.max_partition_count, + ), + )) + } false => { let data_schema = Arc::new(exchange_defines::spilled_schema()); let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 65e9e5cdc513a..a034b86038acb 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -149,6 +149,8 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria } Some(AggregateMeta::AggregatePayload(p)) => { + let (bucket, max_partition_count) = (p.bucket, p.max_partition_count); + if index == self.local_pos { serialized_blocks.push(FlightSerialized::DataBlock( block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?, @@ -156,24 +158,19 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria continue; } - let bucket = compute_block_number(p.bucket, p.max_partition_count)?; + let block_number = compute_block_number(bucket, max_partition_count)?; let stream = SerializeAggregateStream::create( &self.params, SerializePayload::AggregatePayload(p), ); let mut stream_blocks = stream.into_iter().collect::>>()?; - - if stream_blocks.is_empty() { - 
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); - } else { - let mut c = DataBlock::concat(&stream_blocks)?; - if let Some(meta) = stream_blocks[0].take_meta() { - c.replace_meta(meta); - } - - let c = serialize_block(bucket, c, &self.options)?; - serialized_blocks.push(FlightSerialized::DataBlock(c)); + debug_assert!(!stream_blocks.is_empty()); + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); } + let c = serialize_block(block_number, c, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } }; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index 2a719a23c23d8..beded12043100 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -59,6 +59,7 @@ impl TransformFinalAggregate { AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() { Some(ht) => { debug_assert!(bucket == payload.bucket); + let payload = payload.convert_to_partitioned_payload( self.params.group_data_types.clone(), self.params.aggregate_functions.clone(), diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs index 5c44b2be9cbf6..24991a001a112 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs @@ -125,6 +125,10 @@ impl IEJoinState { fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool { let left_len = left_block.num_rows(); let right_len = right_block.num_rows(); + if left_len == 0 || right_len == 0 { + return false; + } + let left_l1_column = left_block.columns()[0] .value .convert_to_full_column(&self.l1_data_type, left_len); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs index 63e6b94b3bab5..ab71d492ed2cf 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs @@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF { } let input = self.input.take().unwrap(); + if input.is_empty() { + return Ok(None); + } let mut result_size = 0; let mut used = 0; From 15c2435597f9013e46ed7533d8b18d8d63488d51 Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 14 Jan 2025 08:28:45 +0800 Subject: [PATCH 06/18] chore(sqllogictests): Check container and stop first to avoid conflict running (#17261) * chore(sqllogictests): Stop container to avoid conflict running * add sleep * check timeout --- tests/sqllogictests/src/util.rs | 38 +++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/tests/sqllogictests/src/util.rs b/tests/sqllogictests/src/util.rs index c985d6151f4a5..949ea98055d71 100644 --- a/tests/sqllogictests/src/util.rs +++ b/tests/sqllogictests/src/util.rs @@ -19,18 +19,21 @@ use std::path::PathBuf; use std::time::Duration; use std::time::Instant; +use bollard::container::RemoveContainerOptions; use bollard::Docker; use clap::Parser; use redis::Commands; use 
serde::Deserialize; use serde::Serialize; use serde_json::Value; +use testcontainers::core::error::WaitContainerError; use testcontainers::core::IntoContainerPort; use testcontainers::core::WaitFor; use testcontainers::runners::AsyncRunner; use testcontainers::ContainerAsync; use testcontainers::GenericImage; use testcontainers::ImageExt; +use testcontainers::TestcontainersError; use testcontainers_modules::mysql::Mysql; use testcontainers_modules::redis::Redis; use testcontainers_modules::redis::REDIS_PORT; @@ -256,9 +259,7 @@ pub async fn run_ttc_container( let start = Instant::now(); println!("Start container {container_name}"); - // Stop the container - let _ = docker.stop_container(&container_name, None).await; - let _ = docker.remove_container(&container_name, None).await; + stop_container(docker, &container_name).await; let mut i = 1; loop { @@ -290,6 +291,10 @@ pub async fn run_ttc_container( "Start container {} using {} secs failed: {}", container_name, duration, err ); + if let TestcontainersError::WaitContainer(WaitContainerError::StartupTimeout) = err + { + stop_container(docker, &container_name).await; + } if i == CONTAINER_RETRY_TIMES || duration >= CONTAINER_TIMEOUT_SECONDS { break; } else { @@ -327,9 +332,7 @@ async fn run_redis_server(docker: &Docker) -> Result> { let container_name = "redis".to_string(); println!("Start container {container_name}"); - // Stop the container - let _ = docker.stop_container(&container_name, None).await; - let _ = docker.remove_container(&container_name, None).await; + stop_container(docker, &container_name).await; let mut i = 1; loop { @@ -365,6 +368,10 @@ async fn run_redis_server(docker: &Docker) -> Result> { "Start container {} using {} secs failed: {}", container_name, duration, err ); + if let TestcontainersError::WaitContainer(WaitContainerError::StartupTimeout) = err + { + stop_container(docker, &container_name).await; + } if i == CONTAINER_RETRY_TIMES || duration >= CONTAINER_TIMEOUT_SECONDS { break; } else { @@ -381,9 +388,7 @@ async fn run_mysql_server(docker: &Docker) -> Result> { let container_name = "mysql".to_string(); println!("Start container {container_name}"); - // Stop the container - let _ = docker.stop_container(&container_name, None).await; - let _ = docker.remove_container(&container_name, None).await; + stop_container(docker, &container_name).await; // Add a table for test. 
// CREATE TABLE test.user( @@ -436,6 +441,10 @@ async fn run_mysql_server(docker: &Docker) -> Result> { "Start container {} using {} secs failed: {}", container_name, duration, err ); + if let TestcontainersError::WaitContainer(WaitContainerError::StartupTimeout) = err + { + stop_container(docker, &container_name).await; + } if i == CONTAINER_RETRY_TIMES || duration >= CONTAINER_TIMEOUT_SECONDS { break; } else { @@ -446,3 +455,14 @@ async fn run_mysql_server(docker: &Docker) -> Result> { } Err(format!("Start {container_name} failed").into()) } + +// Stop the running container to avoid conflict +async fn stop_container(docker: &Docker, container_name: &str) { + let _ = docker.stop_container(container_name, None).await; + let options = Some(RemoveContainerOptions { + force: true, + ..Default::default() + }); + let _ = docker.remove_container(container_name, options).await; + println!("Stop container {container_name}"); +} From 009a5ebac5829b51e8499f441f67438fa4e9f656 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 14 Jan 2025 10:32:00 +0800 Subject: [PATCH 07/18] chore: Bump opendal to pinned tag version (#17259) Signed-off-by: Xuanwo --- Cargo.lock | 44 +++++++++++++++++++++++--------------------- Cargo.toml | 4 ++-- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10b186b7710c4..9ea8284d9192e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2931,7 +2931,7 @@ dependencies = [ "databend-storages-common-table-meta", "limits-rs", "log", - "opendal 0.51.0", + "opendal 0.51.1", "serde", "serde_json", "serfig", @@ -3209,7 +3209,7 @@ dependencies = [ "libc", "object", "once_cell", - "opendal 0.51.0", + "opendal 0.51.1", "parquet", "paste", "prost", @@ -3559,7 +3559,7 @@ dependencies = [ "maplit", "num-derive", "num-traits", - "opendal 0.51.0", + "opendal 0.51.1", "paste", "prost", "serde", @@ -3797,7 +3797,7 @@ dependencies = [ "lz4", "match-template", "num", - "opendal 0.51.0", + "opendal 0.51.1", "rand", "ringbuffer", "roaring", @@ -4008,7 +4008,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "opendal 0.51.0", + "opendal 0.51.1", "parking_lot 0.12.3", "prqlc", "rand", @@ -4044,7 +4044,7 @@ dependencies = [ "futures", "http 1.1.0", "log", - "opendal 0.51.0", + "opendal 0.51.1", "parquet", "prometheus-client", "regex", @@ -4153,7 +4153,7 @@ dependencies = [ "itertools 0.13.0", "jsonb", "log", - "opendal 0.51.0", + "opendal 0.51.1", "parking_lot 0.12.3", "parquet", "rand", @@ -4201,7 +4201,7 @@ dependencies = [ "futures", "hive_metastore", "log", - "opendal 0.51.0", + "opendal 0.51.1", "parquet", "recursive", "serde", @@ -4311,7 +4311,7 @@ dependencies = [ "databend-storages-common-table-meta", "futures-util", "log", - "opendal 0.51.0", + "opendal 0.51.1", "orc-rust", "serde", "serde_json", @@ -4347,7 +4347,7 @@ dependencies = [ "ethnum", "futures", "log", - "opendal 0.51.0", + "opendal 0.51.1", "parquet", "rand", "serde", @@ -4392,7 +4392,7 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-blocks", "databend-storages-common-table-meta", - "opendal 0.51.0", + "opendal 0.51.1", "parquet", "serde", "serde_json", @@ -4433,7 +4433,7 @@ dependencies = [ "enum-as-inner", "futures", "log", - "opendal 0.51.0", + "opendal 0.51.1", "parquet", "serde", "serde_json", @@ -4498,7 +4498,7 @@ dependencies = [ "jsonb", "log", "once_cell", - "opendal 0.51.0", + "opendal 0.51.1", "parking_lot 0.12.3", "regex", "serde", @@ -4735,7 +4735,7 @@ dependencies = [ "jsonb", "jwt-simple", "log", - "opendal 0.51.0", + "opendal 0.51.1", 
"tantivy", "tempfile", ] @@ -5113,7 +5113,7 @@ dependencies = [ "mysql_async", "naive-cityhash", "num_cpus", - "opendal 0.51.0", + "opendal 0.51.1", "opensrv-mysql", "opentelemetry", "opentelemetry_sdk", @@ -5297,7 +5297,7 @@ dependencies = [ "fastrace", "futures", "log", - "opendal 0.51.0", + "opendal 0.51.1", ] [[package]] @@ -10226,8 +10226,9 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.48.3" -source = "git+https://github.com/apache/opendal?rev=f7f9990#f7f9990f95146400c3aef8afc11804e4a6e3afe3" +version = "0.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d47750d91b6d9fa245b3dff15d582dff8462d826cf6c2d5f2e91035d9a317f9" dependencies = [ "async-trait", "bytes", @@ -10235,7 +10236,7 @@ dependencies = [ "futures", "futures-util", "object_store", - "opendal 0.51.0", + "opendal 0.51.1", "pin-project", "tokio", ] @@ -10307,8 +10308,9 @@ dependencies = [ [[package]] name = "opendal" -version = "0.51.0" -source = "git+https://github.com/apache/opendal?rev=f7f9990#f7f9990f95146400c3aef8afc11804e4a6e3afe3" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9dcfa7a3615e3c60eb662ed6b46b6f244cf2658098f593c0c0915430b3a268" dependencies = [ "anyhow", "async-backtrace", diff --git a/Cargo.toml b/Cargo.toml index 1259a7be28ef4..07383209a6470 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -369,10 +369,10 @@ num-derive = "0.3.3" num-traits = "0.2.19" num_cpus = "1.13.1" object = "0.36.5" -object_store_opendal = { git = "https://github.com/apache/opendal", package = "object_store_opendal", rev = "f7f9990" } +object_store_opendal = { version = "0.49.0" } once_cell = "1.15.0" openai_api_rust = "0.1" -opendal = { version = "0.51", git = "https://github.com/apache/opendal", rev = "f7f9990", features = [ +opendal = { version = "0.51.1", features = [ "layers-fastrace", "layers-prometheus-client", "layers-async-backtrace", From 5b372c1193c18c1e653463cf65007d16f5625134 Mon Sep 17 00:00:00 2001 From: Damon07 <1490818749@qq.com> Date: Tue, 14 Jan 2025 11:02:20 +0800 Subject: [PATCH 08/18] fix(query): fix lazy columns missed in constant table scan (#17258) * fix(query): fix lazy columns missed in constant table scan Signed-off-by: damon * clear lazy columns when rewriting false filter to be empty scan Signed-off-by: damon --------- Signed-off-by: damon --- src/query/sql/src/planner/metadata.rs | 4 ++++ .../optimizer/rule/rewrite/rule_eliminate_filter.rs | 6 ++++++ tests/sqllogictests/suites/query/issues/issue_17158.test | 8 ++++++++ 3 files changed, 18 insertions(+) create mode 100644 tests/sqllogictests/suites/query/issues/issue_17158.test diff --git a/src/query/sql/src/planner/metadata.rs b/src/query/sql/src/planner/metadata.rs index bd8507d0604c3..d3a84062b70cf 100644 --- a/src/query/sql/src/planner/metadata.rs +++ b/src/query/sql/src/planner/metadata.rs @@ -170,6 +170,10 @@ impl Metadata { self.lazy_columns.extend(indices); } + pub fn clear_lazy_columns(&mut self) { + self.lazy_columns.clear(); + } + pub fn add_non_lazy_columns(&mut self, indices: HashSet) { debug_assert!(indices.iter().all(|i| *i < self.columns.len())); self.non_lazy_columns.extend(indices); diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_eliminate_filter.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_eliminate_filter.rs index 211fb3e01e2a5..8400b5dc6512e 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_eliminate_filter.rs +++ 
b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_eliminate_filter.rs @@ -77,6 +77,12 @@ impl Rule for RuleEliminateFilter { .derive_relational_prop(&RelExpr::with_s_expr(s_expr))? .output_columns .clone(); + + { + let mut metadata = self.metadata.write(); + metadata.clear_lazy_columns(); + } + let metadata = self.metadata.read(); let mut fields = Vec::with_capacity(output_columns.len()); diff --git a/tests/sqllogictests/suites/query/issues/issue_17158.test b/tests/sqllogictests/suites/query/issues/issue_17158.test new file mode 100644 index 0000000000000..cee1165ae2947 --- /dev/null +++ b/tests/sqllogictests/suites/query/issues/issue_17158.test @@ -0,0 +1,8 @@ +statement ok +create table tt2 (c0 bool, c1 int); + +statement ok +insert into tt2 values(true, 1),(false, 2),(true, 3); + +statement ok +select null, c0, 30, c1 from tt2 where false order by c0 LIMIT 3 OFFSET 0; \ No newline at end of file From 82611b17883857bc1b600bc37ee1b8ed67a176e2 Mon Sep 17 00:00:00 2001 From: Sky Fan <3374614481@qq.com> Date: Tue, 14 Jan 2025 11:03:21 +0800 Subject: [PATCH 09/18] fix: vacuum index not work if index is dropped by create or replace (#17263) * fix: vacuum index not work if index is dropped by create or replace * fix test * remove Any * add comment --- src/meta/api/src/name_id_value_api.rs | 12 ++- src/meta/api/src/schema_api_impl.rs | 85 +++++++++++++------ .../management/src/procedure/procedure_mgr.rs | 19 +++-- ...1_004_vacuum_drop_aggregating_index.result | 14 +++ .../01_004_vacuum_drop_aggregating_index.sh | 34 ++++++++ 5 files changed, 130 insertions(+), 34 deletions(-) diff --git a/src/meta/api/src/name_id_value_api.rs b/src/meta/api/src/name_id_value_api.rs index 46117cb9cf290..2d51a7dc194ff 100644 --- a/src/meta/api/src/name_id_value_api.rs +++ b/src/meta/api/src/name_id_value_api.rs @@ -71,17 +71,22 @@ where /// Such operations do not have any condition constraints. /// For example, a `name -> id` mapping can have a reverse `id -> name` mapping. /// + /// `mark_delete_records` is used to generate additional key-values for implementing `mark_delete` operation. + /// For example, when an index is dropped by `override_exist`, `__fd_marked_deleted_index// -> marked_deleted_index_meta` will be added. + /// /// If there is already a `name_ident` exists, return the existing id in a `Ok(Err(exist))`. /// Otherwise, create `name -> id -> value` and returns the created id in a `Ok(Ok(created))`. - async fn create_id_value( + async fn create_id_value( &self, name_ident: &K, value: &IdRsc::ValueType, override_exist: bool, associated_records: A, + mark_delete_records: M, ) -> Result, SeqV>>, MetaTxnError> where A: Fn(DataId) -> Vec<(String, Vec)> + Send, + M: Fn(DataId, &IdRsc::ValueType) -> Result)>, MetaError> + Send, { debug!(name_ident :? 
=name_ident; "NameIdValueApi: {}", func_name!()); @@ -114,6 +119,11 @@ where for (k, _v) in kvs { txn.if_then.push(TxnOp::delete(k)); } + + let kvs = mark_delete_records(seq_id.data, &seq_meta.data)?; + for (k, v) in kvs { + txn.if_then.push(TxnOp::put(k, v)); + } } else { return Ok(Err(seq_id)); } diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 532d4a642af3f..4a1506d410875 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -753,12 +753,21 @@ impl + ?Sized> SchemaApi for KV { let name_ident_raw = serialize_struct(&IndexNameIdentRaw::from(name_ident))?; let create_res = self - .create_id_value(name_ident, meta, overriding, |id| { - vec![( - IndexIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(), - name_ident_raw.clone(), - )] - }) + .create_id_value( + name_ident, + meta, + overriding, + |id| { + vec![( + IndexIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(), + name_ident_raw.clone(), + )] + }, + |index_id, value| { + mark_index_as_deleted(name_ident.tenant(), value.table_id, *index_id) + .map(|(k, v)| vec![(k, v)]) + }, + ) .await?; match create_res { @@ -803,20 +812,9 @@ impl + ?Sized> SchemaApi for KV { IndexIdToNameIdent::new_generic(name_ident.tenant(), seq_id.data).to_string_key(), )); - // add __fd_marked_deleted_index// -> marked_deleted_index_meta - let marked_deleted_index_id_ident = MarkedDeletedIndexIdIdent::new_generic( - name_ident.tenant(), - MarkedDeletedIndexId::new(seq_meta.data.table_id, *seq_id.data), - ); - let marked_deleted_index_meta = MarkedDeletedIndexMeta { - dropped_on: Utc::now(), - index_type: MarkedDeletedIndexType::AGGREGATING, - }; - - txn.if_then.push(TxnOp::put( - marked_deleted_index_id_ident.to_string_key(), - serialize_struct(&marked_deleted_index_meta)?, - )); + let (key, value) = + mark_index_as_deleted(name_ident.tenant(), seq_meta.data.table_id, *seq_id.data)?; + txn.if_then.push(TxnOp::put(key, value)); let (succ, _responses) = send_txn(self, txn).await?; debug!(key :? =name_ident, id :? 
=&id_ident,succ = succ; "{}", func_name!()); @@ -2901,12 +2899,18 @@ impl + ?Sized> SchemaApi for KV { let name_ident_raw = serialize_struct(&CatalogNameIdentRaw::from(name_ident))?; let res = self - .create_id_value(name_ident, meta, false, |id| { - vec![( - CatalogIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(), - name_ident_raw.clone(), - )] - }) + .create_id_value( + name_ident, + meta, + false, + |id| { + vec![( + CatalogIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(), + name_ident_raw.clone(), + )] + }, + |_, _| Ok(vec![]), + ) .await?; Ok(res) @@ -3020,7 +3024,13 @@ impl + ?Sized> SchemaApi for KV { let name_ident = &req.dictionary_ident; let create_res = self - .create_id_value(name_ident, &req.dictionary_meta, false, |_| vec![]) + .create_id_value( + name_ident, + &req.dictionary_meta, + false, + |_| vec![], + |_, _| Ok(vec![]), + ) .await?; match create_res { @@ -4177,3 +4187,24 @@ fn typ() -> &'static str { .next() .unwrap_or("UnknownType") } + +/// add __fd_marked_deleted_index// -> marked_deleted_index_meta +pub fn mark_index_as_deleted( + tenant: &Tenant, + table_id: u64, + index_id: u64, +) -> Result<(String, Vec), MetaError> { + let marked_deleted_index_id_ident = MarkedDeletedIndexIdIdent::new_generic( + tenant, + MarkedDeletedIndexId::new(table_id, index_id), + ); + let marked_deleted_index_meta = MarkedDeletedIndexMeta { + dropped_on: Utc::now(), + index_type: MarkedDeletedIndexType::AGGREGATING, + }; + + Ok(( + marked_deleted_index_id_ident.to_string_key(), + serialize_struct(&marked_deleted_index_meta)?, + )) +} diff --git a/src/query/management/src/procedure/procedure_mgr.rs b/src/query/management/src/procedure/procedure_mgr.rs index 21a88e301522c..be765d04b3477 100644 --- a/src/query/management/src/procedure/procedure_mgr.rs +++ b/src/query/management/src/procedure/procedure_mgr.rs @@ -66,12 +66,19 @@ impl ProcedureMgr { let create_res = self .kv_api - .create_id_value(name_ident, meta, overriding, |id| { - vec![( - ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(), - name_ident_raw.clone(), - )] - }) + .create_id_value( + name_ident, + meta, + overriding, + |id| { + vec![( + ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id) + .to_string_key(), + name_ident_raw.clone(), + )] + }, + |_, _| Ok(vec![]), + ) .await?; match create_res { diff --git a/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.result b/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.result index 2c156409652a2..c2dbef8e4e291 100644 --- a/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.result +++ b/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.result @@ -37,3 +37,17 @@ before vacuum, should be 1 index dir after vacuum, should be 0 index dir 0 0 +>>>> create or replace database test_vacuum_drop_aggregating_index +>>>> create or replace table test_vacuum_drop_aggregating_index.agg(a int, b int,c int) 'fs:///tmp/test_vacuum_drop_aggregating_index/' +>>>> insert into test_vacuum_drop_aggregating_index.agg values (1,1,4), (1,2,1), (1,2,4) +3 +>>>> CREATE OR REPLACE AGGREGATING INDEX index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg; +>>>> insert into test_vacuum_drop_aggregating_index.agg values (2,2,5) +1 +>>>> REFRESH AGGREGATING INDEX index; +before vacuum, should be 1 index dir +1 +>>>> create or replace aggregating index index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg; +after vacuum, should be 0 index dir +0 
+>>>> drop aggregating index index diff --git a/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.sh b/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.sh index b6b92e6ce8f50..b957280cf7ad1 100755 --- a/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.sh +++ b/tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.sh @@ -86,3 +86,37 @@ echo "after vacuum, should be 0 index dir" find /tmp/test_vacuum_drop_aggregating_index/"$PREFIX_1"/_i_a/ -type f | wc -l find /tmp/test_vacuum_drop_aggregating_index/"$PREFIX_2"/_i_a/ -type f | wc -l + + +### create or replace index + +stmt "create or replace database test_vacuum_drop_aggregating_index" + +mkdir -p /tmp/test_vacuum_drop_aggregating_index/ + +stmt "create or replace table test_vacuum_drop_aggregating_index.agg(a int, b int,c int) 'fs:///tmp/test_vacuum_drop_aggregating_index/'" + + +stmt "insert into test_vacuum_drop_aggregating_index.agg values (1,1,4), (1,2,1), (1,2,4)" + +stmt "CREATE OR REPLACE AGGREGATING INDEX index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg;" + +stmt "insert into test_vacuum_drop_aggregating_index.agg values (2,2,5)" + +stmt "REFRESH AGGREGATING INDEX index;" + +SNAPSHOT_LOCATION=$(echo "select snapshot_location from fuse_snapshot('test_vacuum_drop_aggregating_index','agg') limit 1" | $BENDSQL_CLIENT_CONNECT) +PREFIX=$(echo "$SNAPSHOT_LOCATION" | cut -d'/' -f1-2) + +echo "before vacuum, should be 1 index dir" + +ls /tmp/test_vacuum_drop_aggregating_index/"$PREFIX"/_i_a/ | wc -l + +stmt "create or replace aggregating index index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg;" + +stmt "set data_retention_time_in_days=0; select * from fuse_vacuum_drop_aggregating_index('test_vacuum_drop_aggregating_index','agg')" > /dev/null + +echo "after vacuum, should be 0 index dir" +find /tmp/test_vacuum_drop_aggregating_index/"$PREFIX"/_i_a/ -type f | wc -l + +stmt "drop aggregating index index" From 420f4b4d56cb8e85c6de919e118ac48432c3a676 Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Tue, 14 Jan 2025 13:52:20 +0800 Subject: [PATCH 10/18] feat(cluster): support warehouse level show processlist and kill query (#17249) * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse --- src/query/catalog/src/plan/partition.rs | 7 ++- src/query/catalog/src/table.rs | 13 +++- src/query/catalog/src/table_context.rs | 2 + src/query/catalog/tests/it/partitions.rs | 2 +- .../management/src/warehouse/warehouse_api.rs | 2 + .../management/src/warehouse/warehouse_mgr.rs | 59 ++++++++++++++++-- src/query/service/src/clusters/cluster.rs | 47 ++++++++++---- .../src/interpreters/interpreter_kill.rs | 23 +++---- .../interpreters/interpreter_set_priority.rs | 22 +++---- .../interpreters/interpreter_system_action.rs | 19 +++--- .../interpreter_table_truncate.rs | 18 +++--- src/query/service/src/schedulers/scheduler.rs | 4 ++ .../src/servers/admin/v1/tenant_tables.rs | 3 +- src/query/service/src/sessions/query_ctx.rs | 11 +++- .../service/src/sessions/query_ctx_shared.rs | 33 +++++++++- .../src/table_functions/async_crash_me.rs | 4 -- .../table_functions/cloud/task_dependents.rs | 4 -- 
.../cloud/task_dependents_enable.rs | 4 -- .../src/table_functions/cloud/task_history.rs | 4 -- .../table_functions/numbers/numbers_table.rs | 8 ++- .../others/execute_background_job.rs | 4 -- .../table_functions/others/license_info.rs | 4 -- .../others/suggested_background_tasks.rs | 4 -- .../table_functions/others/tenant_quota.rs | 4 -- .../service/src/table_functions/others/udf.rs | 4 -- .../src/table_functions/sync_crash_me.rs | 4 -- .../tests/it/sql/exec/get_table_bind_test.rs | 8 +++ .../it/storages/fuse/operations/commit.rs | 8 +++ src/query/sql/src/executor/physical_plan.rs | 7 +++ .../sql/src/planner/optimizer/optimizer.rs | 15 +++++ src/query/sql/src/planner/optimizer/util.rs | 33 +++++++++- src/query/storages/delta/src/table.rs | 5 +- src/query/storages/fuse/src/fuse_table.rs | 5 +- .../function_template/simple_arg_func.rs | 10 +-- .../function_template/simple_func_template.rs | 27 +++++--- .../src/table_functions/set_cache_capacity.rs | 6 +- .../storages/hive/hive/src/hive_table.rs | 5 +- src/query/storages/iceberg/src/table.rs | 5 +- src/query/storages/memory/src/memory_table.rs | 7 ++- src/query/storages/null/src/null_table.rs | 5 +- src/query/storages/orc/src/table.rs | 5 +- .../src/parquet_rs/parquet_table/table.rs | 5 +- .../result_cache/src/table_function/table.rs | 4 -- src/query/storages/stage/src/stage_table.rs | 5 +- src/query/storages/stream/src/stream_table.rs | 5 +- .../storages/system/src/backtrace_table.rs | 3 +- src/query/storages/system/src/caches_table.rs | 3 +- src/query/storages/system/src/log_queue.rs | 11 ++-- .../system/src/malloc_stats_totals_table.rs | 3 +- .../storages/system/src/metrics_table.rs | 3 +- .../storages/system/src/processes_table.rs | 3 +- .../storages/system/src/queries_profiling.rs | 3 +- src/query/storages/system/src/table.rs | 61 +++++++++++++------ .../storages/system/src/temp_files_table.rs | 6 +- 54 files changed, 392 insertions(+), 187 deletions(-) diff --git a/src/query/catalog/src/plan/partition.rs b/src/query/catalog/src/plan/partition.rs index 026f80d4c32f2..b2ba71f4e5898 100644 --- a/src/query/catalog/src/plan/partition.rs +++ b/src/query/catalog/src/plan/partition.rs @@ -102,7 +102,9 @@ pub enum PartitionsShuffleKind { // Bind the Partition to executor by partition.rand() order. Rand, // Bind the Partition to executor by broadcast - Broadcast, + BroadcastCluster, + // Bind the Partition to warehouse executor by broadcast + BroadcastWarehouse, } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] @@ -190,7 +192,8 @@ impl Partitions { parts.shuffle(&mut rng); parts } - PartitionsShuffleKind::Broadcast => { + // the executors will be all nodes in the warehouse if a query is BroadcastWarehouse. 
+ PartitionsShuffleKind::BroadcastCluster | PartitionsShuffleKind::BroadcastWarehouse => { return Ok(executors_sorted .into_iter() .map(|executor| { diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 829a4c9e79800..7d6bdfa471d63 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -94,8 +94,8 @@ pub trait Table: Sync + Send { self.get_table_info().ident.table_id } - fn is_local(&self) -> bool { - true + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Local } fn as_any(&self) -> &dyn Any; @@ -450,7 +450,7 @@ pub trait Table: Sync + Send { false } - fn broadcast_truncate_to_cluster(&self) -> bool { + fn broadcast_truncate_to_warehouse(&self) -> bool { false } @@ -678,3 +678,10 @@ pub struct ColumnRange { pub min: Bound, pub max: Bound, } + +#[derive(Debug)] +pub enum DistributionLevel { + Local, + Cluster, + Warehouse, +} diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 1e957eb2ea68c..824d03fec4945 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -234,6 +234,8 @@ pub trait TableContext: Send + Sync { fn get_settings(&self) -> Arc; fn get_session_settings(&self) -> Arc; fn get_cluster(&self) -> Arc; + fn set_cluster(&self, cluster: Arc); + async fn get_warehouse_cluster(&self) -> Result>; fn get_processes_info(&self) -> Vec; fn get_queued_queries(&self) -> Vec; fn get_queries_profile(&self) -> HashMap>; diff --git a/src/query/catalog/tests/it/partitions.rs b/src/query/catalog/tests/it/partitions.rs index 2199c93858f94..e9cbe441abd48 100644 --- a/src/query/catalog/tests/it/partitions.rs +++ b/src/query/catalog/tests/it/partitions.rs @@ -239,7 +239,7 @@ fn test_partition_reshuffle() { // Broadcast. { - let partitions = gen_parts(PartitionsShuffleKind::Broadcast, 3); + let partitions = gen_parts(PartitionsShuffleKind::BroadcastCluster, 3); let shuffle = partitions.reshuffle(executors_2.clone()).unwrap(); writeln!( diff --git a/src/query/management/src/warehouse/warehouse_api.rs b/src/query/management/src/warehouse/warehouse_api.rs index ed4f92c4610c5..dbeecd9bd69bf 100644 --- a/src/query/management/src/warehouse/warehouse_api.rs +++ b/src/query/management/src/warehouse/warehouse_api.rs @@ -132,5 +132,7 @@ pub trait WarehouseApi: Sync + Send { async fn discover(&self, node_id: &str) -> Result>; + async fn discover_warehouse_nodes(&self, node_id: &str) -> Result>; + async fn get_node_info(&self, node_id: &str) -> Result>; } diff --git a/src/query/management/src/warehouse/warehouse_mgr.rs b/src/query/management/src/warehouse/warehouse_mgr.rs index b0b563d209094..c0aa324f2a0d0 100644 --- a/src/query/management/src/warehouse/warehouse_mgr.rs +++ b/src/query/management/src/warehouse/warehouse_mgr.rs @@ -1485,13 +1485,34 @@ impl WarehouseApi for WarehouseMgr { return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty.")); } - let warehouse_snapshot = self.warehouse_snapshot(&warehouse).await?; + let nodes_prefix = format!( + "{}/{}/", + self.cluster_node_key_prefix, + escape_for_key(&warehouse)? 
+ ); - Ok(warehouse_snapshot - .snapshot_nodes - .into_iter() - .map(|x| x.node_info) - .collect()) + let values = self.metastore.prefix_list_kv(&nodes_prefix).await?; + + let mut nodes_info = Vec::with_capacity(values.len()); + for (node_key, value) in values { + let mut node_info = serde_json::from_slice::(&value.data)?; + + let suffix = &node_key[nodes_prefix.len()..]; + + let Some((cluster, node)) = suffix.split_once('/') else { + return Err(ErrorCode::InvalidWarehouse(format!( + "Node key is invalid {:?}", + node_key + ))); + }; + + node_info.id = unescape_for_key(node)?; + node_info.cluster_id = unescape_for_key(cluster)?; + node_info.warehouse_id = warehouse.to_string(); + nodes_info.push(node_info); + } + + Ok(nodes_info) } async fn add_warehouse_cluster( @@ -2155,6 +2176,32 @@ impl WarehouseApi for WarehouseMgr { .collect::>()) } + async fn discover_warehouse_nodes(&self, node_id: &str) -> Result> { + let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(node_id)?); + + let Some(seq) = self.metastore.get_kv(&node_key).await? else { + return Err(ErrorCode::NotFoundClusterNode(format!( + "Node {} is offline, Please restart this node.", + node_id + ))); + }; + + let node = serde_json::from_slice::(&seq.data)?; + + if !node.assigned_warehouse() { + return Ok(vec![node]); + } + + let expect_version = DATABEND_COMMIT_VERSION.to_string(); + + Ok(self + .list_warehouse_nodes(node.warehouse_id.clone()) + .await? + .into_iter() + .filter(|x| x.binary_version == expect_version) + .collect::>()) + } + async fn get_node_info(&self, node_id: &str) -> Result> { let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(node_id)?); match self.metastore.get_kv(&node_key).await? { diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 5ba2fa91125c6..3239aa0484fc9 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -273,17 +273,11 @@ impl ClusterDiscovery { Ok((lift_time, Arc::new(cluster_manager))) } - #[async_backtrace::framed] - pub async fn discover(&self, config: &InnerConfig) -> Result> { - let nodes = match config.query.cluster_id.is_empty() { - true => self.warehouse_manager.discover(&config.query.node_id).await, - false => { - self.warehouse_manager - .list_warehouse_cluster_nodes(&self.cluster_id, &self.cluster_id) - .await - } - }; - + async fn create_cluster_with_try_connect( + &self, + config: &InnerConfig, + nodes: Result>, + ) -> Result> { match nodes { Err(cause) => { metric_incr_cluster_error_count( @@ -336,6 +330,37 @@ impl ClusterDiscovery { } } + pub async fn discover_warehouse_nodes(&self, config: &InnerConfig) -> Result> { + let nodes = match config.query.cluster_id.is_empty() { + true => { + self.warehouse_manager + .discover_warehouse_nodes(&config.query.node_id) + .await + } + false => { + self.warehouse_manager + .list_warehouse_nodes(self.cluster_id.clone()) + .await + } + }; + + self.create_cluster_with_try_connect(config, nodes).await + } + + #[async_backtrace::framed] + pub async fn discover(&self, config: &InnerConfig) -> Result> { + let nodes = match config.query.cluster_id.is_empty() { + true => self.warehouse_manager.discover(&config.query.node_id).await, + false => { + self.warehouse_manager + .list_warehouse_cluster_nodes(&self.cluster_id, &self.cluster_id) + .await + } + }; + + self.create_cluster_with_try_connect(config, nodes).await + } + pub async fn find_node_by_warehouse( self: Arc, warehouse: &str, diff --git 
a/src/query/service/src/interpreters/interpreter_kill.rs b/src/query/service/src/interpreters/interpreter_kill.rs index 72309f0c2ecf8..e3bd3dcf2ba1e 100644 --- a/src/query/service/src/interpreters/interpreter_kill.rs +++ b/src/query/service/src/interpreters/interpreter_kill.rs @@ -31,7 +31,7 @@ use crate::sessions::QueryContext; pub struct KillInterpreter { ctx: Arc, plan: KillPlan, - proxy_to_cluster: bool, + proxy_to_warehouse: bool, } impl KillInterpreter { @@ -39,7 +39,7 @@ impl KillInterpreter { Ok(KillInterpreter { ctx, plan, - proxy_to_cluster: true, + proxy_to_warehouse: true, }) } @@ -47,29 +47,30 @@ impl KillInterpreter { Ok(KillInterpreter { ctx, plan, - proxy_to_cluster: false, + proxy_to_warehouse: false, }) } #[async_backtrace::framed] - async fn kill_cluster_query(&self) -> Result { - let cluster = self.ctx.get_cluster(); + async fn kill_warehouse_query(&self) -> Result { let settings = self.ctx.get_settings(); + let warehouse = self.ctx.get_warehouse_cluster().await?; + let flight_params = FlightParams { timeout: settings.get_flight_client_timeout()?, retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; - let mut message = HashMap::with_capacity(cluster.nodes.len()); + let mut message = HashMap::with_capacity(warehouse.nodes.len()); - for node_info in &cluster.nodes { - if node_info.id != cluster.local_id { + for node_info in &warehouse.nodes { + if node_info.id != warehouse.local_id { message.insert(node_info.id.clone(), self.plan.clone()); } } - let res = cluster + let res = warehouse .do_action::<_, bool>(KILL_QUERY, message, flight_params) .await?; @@ -85,8 +86,8 @@ impl KillInterpreter { #[async_backtrace::framed] async fn execute_kill(&self, session_id: &String) -> Result { match self.ctx.get_session_by_id(session_id) { - None => match self.proxy_to_cluster { - true => self.kill_cluster_query().await, + None => match self.proxy_to_warehouse { + true => self.kill_warehouse_query().await, false => Err(ErrorCode::UnknownSession(format!( "Not found session id {}", session_id diff --git a/src/query/service/src/interpreters/interpreter_set_priority.rs b/src/query/service/src/interpreters/interpreter_set_priority.rs index 1bf830b24be01..eabff93f3d5fc 100644 --- a/src/query/service/src/interpreters/interpreter_set_priority.rs +++ b/src/query/service/src/interpreters/interpreter_set_priority.rs @@ -30,7 +30,7 @@ use crate::sessions::QueryContext; pub struct SetPriorityInterpreter { ctx: Arc, plan: SetPriorityPlan, - proxy_to_cluster: bool, + proxy_to_warehouse: bool, } impl SetPriorityInterpreter { @@ -38,7 +38,7 @@ impl SetPriorityInterpreter { Ok(SetPriorityInterpreter { ctx, plan, - proxy_to_cluster: true, + proxy_to_warehouse: true, }) } @@ -46,17 +46,17 @@ impl SetPriorityInterpreter { Ok(SetPriorityInterpreter { ctx, plan, - proxy_to_cluster: false, + proxy_to_warehouse: false, }) } #[async_backtrace::framed] - async fn set_cluster_priority(&self) -> Result { - let cluster = self.ctx.get_cluster(); + async fn set_warehouse_priority(&self) -> Result { + let warehouse = self.ctx.get_warehouse_cluster().await?; - let mut message = HashMap::with_capacity(cluster.nodes.len()); - for node_info in &cluster.nodes { - if node_info.id != cluster.local_id { + let mut message = HashMap::with_capacity(warehouse.nodes.len()); + for node_info in &warehouse.nodes { + if node_info.id != warehouse.local_id { message.insert(node_info.id.clone(), self.plan.clone()); } } @@ -67,7 +67,7 @@ impl SetPriorityInterpreter { retry_times: 
settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; - let res = cluster + let res = warehouse .do_action::<_, bool>(SET_PRIORITY, message, flight_params) .await?; @@ -96,8 +96,8 @@ impl Interpreter for SetPriorityInterpreter { async fn execute2(&self) -> Result { let id = &self.plan.id; match self.ctx.get_session_by_id(id) { - None => match self.proxy_to_cluster { - true => self.set_cluster_priority().await, + None => match self.proxy_to_warehouse { + true => self.set_warehouse_priority().await, false => Err(ErrorCode::UnknownSession(format!( "Not found session id {}", id diff --git a/src/query/service/src/interpreters/interpreter_system_action.rs b/src/query/service/src/interpreters/interpreter_system_action.rs index ca7e3ffb50c7d..e0b2b192a3a70 100644 --- a/src/query/service/src/interpreters/interpreter_system_action.rs +++ b/src/query/service/src/interpreters/interpreter_system_action.rs @@ -31,7 +31,7 @@ use crate::sessions::QueryContext; pub struct SystemActionInterpreter { ctx: Arc, plan: SystemPlan, - proxy_to_cluster: bool, + proxy_to_warehouse: bool, } impl SystemActionInterpreter { @@ -39,7 +39,7 @@ impl SystemActionInterpreter { Ok(SystemActionInterpreter { ctx, plan, - proxy_to_cluster: true, + proxy_to_warehouse: true, }) } @@ -47,7 +47,7 @@ impl SystemActionInterpreter { Ok(SystemActionInterpreter { ctx, plan, - proxy_to_cluster: false, + proxy_to_warehouse: false, }) } } @@ -65,11 +65,12 @@ impl Interpreter for SystemActionInterpreter { #[async_backtrace::framed] #[fastrace::trace] async fn execute2(&self) -> Result { - if self.proxy_to_cluster { - let cluster = self.ctx.get_cluster(); - let mut message = HashMap::with_capacity(cluster.nodes.len()); - for node_info in &cluster.nodes { - if node_info.id != cluster.local_id { + if self.proxy_to_warehouse { + let warehouse = self.ctx.get_warehouse_cluster().await?; + + let mut message = HashMap::with_capacity(warehouse.nodes.len()); + for node_info in &warehouse.nodes { + if node_info.id != warehouse.local_id { message.insert(node_info.id.clone(), self.plan.clone()); } } @@ -80,7 +81,7 @@ impl Interpreter for SystemActionInterpreter { retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; - cluster + warehouse .do_action::<_, ()>(SYSTEM_ACTION, message, flight_params) .await?; } diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 70afa08e38660..1c704dcc3247e 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -32,7 +32,7 @@ pub struct TruncateTableInterpreter { ctx: Arc, plan: TruncateTablePlan, - proxy_to_cluster: bool, + proxy_to_warehouse: bool, } impl TruncateTableInterpreter { @@ -40,7 +40,7 @@ impl TruncateTableInterpreter { Ok(TruncateTableInterpreter { ctx, plan, - proxy_to_cluster: true, + proxy_to_warehouse: true, }) } @@ -48,7 +48,7 @@ impl TruncateTableInterpreter { Ok(TruncateTableInterpreter { ctx, plan, - proxy_to_cluster: false, + proxy_to_warehouse: false, }) } } @@ -85,12 +85,12 @@ impl Interpreter for TruncateTableInterpreter { // check mutability table.check_mutable()?; - if self.proxy_to_cluster && table.broadcast_truncate_to_cluster() { - let cluster = self.ctx.get_cluster(); + if self.proxy_to_warehouse && table.broadcast_truncate_to_warehouse() { + let warehouse = self.ctx.get_warehouse_cluster().await?; - 
let mut message = HashMap::with_capacity(cluster.nodes.len()); - for node_info in &cluster.nodes { - if node_info.id != cluster.local_id { + let mut message = HashMap::with_capacity(warehouse.nodes.len()); + for node_info in &warehouse.nodes { + if node_info.id != warehouse.local_id { message.insert(node_info.id.clone(), self.plan.clone()); } } @@ -101,7 +101,7 @@ impl Interpreter for TruncateTableInterpreter { retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; - cluster + warehouse .do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params) .await?; } diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 726f380237aad..5bee2a3567800 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -65,6 +65,10 @@ pub async fn build_query_pipeline_without_render_result_set( let build_res = if !plan.is_distributed_plan() { build_local_pipeline(ctx, plan).await } else { + if plan.is_warehouse_distributed_plan() { + ctx.set_cluster(ctx.get_warehouse_cluster().await?); + } + build_distributed_pipeline(ctx, plan).await }?; Ok(build_res) diff --git a/src/query/service/src/servers/admin/v1/tenant_tables.rs b/src/query/service/src/servers/admin/v1/tenant_tables.rs index 098609e887057..7be7b059b713c 100644 --- a/src/query/service/src/servers/admin/v1/tenant_tables.rs +++ b/src/query/service/src/servers/admin/v1/tenant_tables.rs @@ -16,6 +16,7 @@ use chrono::DateTime; use chrono::Utc; use databend_common_ast::parser::Dialect; use databend_common_catalog::catalog::CatalogManager; +use databend_common_catalog::table::DistributionLevel; use databend_common_config::GlobalConfig; use databend_common_exception::Result; use databend_common_meta_app::tenant::Tenant; @@ -111,7 +112,7 @@ async fn load_tenant_tables(tenant: &Tenant) -> Result { engine: table.engine().to_string(), created_on: table.get_table_info().meta.created_on, updated_on: table.get_table_info().meta.updated_on, - is_local: table.is_local(), + is_local: matches!(table.distribution_level(), DistributionLevel::Local), is_external: table.get_table_info().meta.storage_params.is_some(), rows: stats.number_of_rows, data_bytes: stats.data_bytes, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 9c55fba8f5b37..b06d45b0f0841 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -976,6 +976,10 @@ impl TableContext for QueryContext { self.shared.get_cluster() } + fn set_cluster(&self, cluster: Arc) { + self.shared.set_cluster(cluster) + } + // Get all the processes list info. 
fn get_processes_info(&self) -> Vec { SessionManager::instance().processes_info() @@ -1140,10 +1144,9 @@ impl TableContext for QueryContext { if actual_batch_limit != max_batch_size { return Err(ErrorCode::StorageUnsupported( format!( - "Within the same transaction, the batch size for a stream must remain consistent {:?} {:?}", + "Within the same transaction, the batch size for a stream must remain consistent {:?} {:?}", actual_batch_limit, max_batch_size ) - )); } } else if max_batch_size.is_some() { @@ -1714,6 +1717,10 @@ impl TableContext for QueryContext { } Ok(streams_meta) } + + async fn get_warehouse_cluster(&self) -> Result> { + self.shared.get_warehouse_clusters().await + } } impl TrySpawn for QueryContext { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index c31fe083316b2..af2324b508239 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -36,6 +36,7 @@ use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_context::ContextError; use databend_common_catalog::table_context::StageAttachment; +use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::OnErrorMode; @@ -59,6 +60,7 @@ use parking_lot::RwLock; use uuid::Uuid; use crate::clusters::Cluster; +use crate::clusters::ClusterDiscovery; use crate::pipelines::executor::PipelineExecutor; use crate::sessions::query_affect::QueryAffect; use crate::sessions::Session; @@ -91,7 +93,8 @@ pub struct QueryContextShared { pub(in crate::sessions) session: Arc, pub(in crate::sessions) runtime: Arc>>>, pub(in crate::sessions) init_query_id: Arc>, - pub(in crate::sessions) cluster_cache: Arc, + pub(in crate::sessions) cluster_cache: Arc>>, + pub(in crate::sessions) warehouse_cache: Arc>>>, pub(in crate::sessions) running_query: Arc>>, pub(in crate::sessions) running_query_kind: Arc>>, pub(in crate::sessions) running_query_text_hash: Arc>>, @@ -158,7 +161,7 @@ impl QueryContextShared { query_settings: Settings::create(session.get_current_tenant()), catalog_manager: CatalogManager::instance(), session, - cluster_cache, + cluster_cache: Arc::new(RwLock::new(cluster_cache)), data_operator: DataOperator::instance(), init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())), total_scan_values: Arc::new(Progress::create()), @@ -206,6 +209,7 @@ impl QueryContextShared { cluster_spill_progress: Default::default(), spilled_files: Default::default(), + warehouse_cache: Arc::new(RwLock::new(None)), })) } @@ -263,8 +267,31 @@ impl QueryContextShared { // TODO: Wait for the query to be processed (write out the last error) } + pub fn set_cluster(&self, cluster: Arc) { + let mut cluster_cache = self.cluster_cache.write(); + *cluster_cache = cluster; + } + pub fn get_cluster(&self) -> Arc { - self.cluster_cache.clone() + self.cluster_cache.read().clone() + } + + pub async fn get_warehouse_clusters(&self) -> Result> { + if let Some(warehouse) = self.warehouse_cache.read().as_ref() { + return Ok(warehouse.clone()); + } + + let config = GlobalConfig::instance(); + let discovery = ClusterDiscovery::instance(); + let warehouse = discovery.discover_warehouse_nodes(&config).await?; + + let mut write_guard = self.warehouse_cache.write(); + + if write_guard.is_none() { + *write_guard = 
Some(warehouse.clone()); + } + + Ok(write_guard.as_ref().cloned().expect("expect cluster.")) } pub fn get_current_catalog(&self) -> String { diff --git a/src/query/service/src/table_functions/async_crash_me.rs b/src/query/service/src/table_functions/async_crash_me.rs index 5ad3d3f1e3dcc..14c54c37538c9 100644 --- a/src/query/service/src/table_functions/async_crash_me.rs +++ b/src/query/service/src/table_functions/async_crash_me.rs @@ -90,10 +90,6 @@ impl AsyncCrashMeTable { #[async_trait::async_trait] impl Table for AsyncCrashMeTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/cloud/task_dependents.rs b/src/query/service/src/table_functions/cloud/task_dependents.rs index 997ffc91f4cdf..b01285495871c 100644 --- a/src/query/service/src/table_functions/cloud/task_dependents.rs +++ b/src/query/service/src/table_functions/cloud/task_dependents.rs @@ -110,10 +110,6 @@ impl TaskDependentsTable { #[async_trait::async_trait] impl Table for TaskDependentsTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/cloud/task_dependents_enable.rs b/src/query/service/src/table_functions/cloud/task_dependents_enable.rs index 734006a5498e2..85413edb1f05b 100644 --- a/src/query/service/src/table_functions/cloud/task_dependents_enable.rs +++ b/src/query/service/src/table_functions/cloud/task_dependents_enable.rs @@ -83,10 +83,6 @@ impl TaskDependentsEnableTable { #[async_trait::async_trait] impl Table for TaskDependentsEnableTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/cloud/task_history.rs b/src/query/service/src/table_functions/cloud/task_history.rs index a84d331d17baf..a2ce415103283 100644 --- a/src/query/service/src/table_functions/cloud/task_history.rs +++ b/src/query/service/src/table_functions/cloud/task_history.rs @@ -87,10 +87,6 @@ impl TaskHistoryTable { #[async_trait::async_trait] impl Table for TaskHistoryTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/numbers/numbers_table.rs b/src/query/service/src/table_functions/numbers/numbers_table.rs index 09b017a961f73..50a57ef3cd745 100644 --- a/src/query/service/src/table_functions/numbers/numbers_table.rs +++ b/src/query/service/src/table_functions/numbers/numbers_table.rs @@ -22,6 +22,7 @@ use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_args::TableArgs; use databend_common_exception::Result; @@ -111,8 +112,11 @@ impl NumbersTable { #[async_trait::async_trait] impl Table for NumbersTable { - fn is_local(&self) -> bool { - self.name() == "numbers_local" + fn distribution_level(&self) -> DistributionLevel { + match self.name() { + "numbers_local" => DistributionLevel::Local, + _ => DistributionLevel::Cluster, + } } fn as_any(&self) -> &dyn Any { diff --git a/src/query/service/src/table_functions/others/execute_background_job.rs b/src/query/service/src/table_functions/others/execute_background_job.rs index c1681625b76f5..7bfdaa2e42653 100644 --- 
a/src/query/service/src/table_functions/others/execute_background_job.rs +++ b/src/query/service/src/table_functions/others/execute_background_job.rs @@ -78,10 +78,6 @@ impl ExecuteBackgroundJobTable { #[async_trait::async_trait] impl Table for ExecuteBackgroundJobTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/others/license_info.rs b/src/query/service/src/table_functions/others/license_info.rs index b4a986ad11ecc..6423a6d245888 100644 --- a/src/query/service/src/table_functions/others/license_info.rs +++ b/src/query/service/src/table_functions/others/license_info.rs @@ -99,10 +99,6 @@ impl LicenseInfoTable { #[async_trait::async_trait] impl Table for LicenseInfoTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/others/suggested_background_tasks.rs b/src/query/service/src/table_functions/others/suggested_background_tasks.rs index 637b99b5fe591..039142a0cea5c 100644 --- a/src/query/service/src/table_functions/others/suggested_background_tasks.rs +++ b/src/query/service/src/table_functions/others/suggested_background_tasks.rs @@ -97,10 +97,6 @@ impl SuggestedBackgroundTasksTable { #[async_trait::async_trait] impl Table for SuggestedBackgroundTasksTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/others/tenant_quota.rs b/src/query/service/src/table_functions/others/tenant_quota.rs index 9e79ebb5b7b60..99f33e2e5fa30 100644 --- a/src/query/service/src/table_functions/others/tenant_quota.rs +++ b/src/query/service/src/table_functions/others/tenant_quota.rs @@ -109,10 +109,6 @@ impl TenantQuotaTable { #[async_trait::async_trait] impl Table for TenantQuotaTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/others/udf.rs b/src/query/service/src/table_functions/others/udf.rs index 96a8c15873bd6..622d7b83ab8d0 100644 --- a/src/query/service/src/table_functions/others/udf.rs +++ b/src/query/service/src/table_functions/others/udf.rs @@ -122,10 +122,6 @@ impl UdfEchoTable { #[async_trait::async_trait] impl Table for UdfEchoTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/src/table_functions/sync_crash_me.rs b/src/query/service/src/table_functions/sync_crash_me.rs index 5cd36268abcbb..ae9e97cee4750 100644 --- a/src/query/service/src/table_functions/sync_crash_me.rs +++ b/src/query/service/src/table_functions/sync_crash_me.rs @@ -90,10 +90,6 @@ impl SyncCrashMeTable { #[async_trait::async_trait] impl Table for SyncCrashMeTable { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 228f0753b4378..f2b9f92f9a4d5 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -1006,6 +1006,14 @@ impl TableContext for CtxDelegation { async fn drop_m_cte_temp_table(&self) -> Result<()> { todo!() } + + fn set_cluster(&self, _: Arc) { + todo!() + } + + async fn get_warehouse_cluster(&self) -> Result> { + todo!() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs 
b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 884615af0a3dd..6a713e3d8804e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -881,6 +881,14 @@ impl TableContext for CtxDelegation { async fn drop_m_cte_temp_table(&self) -> Result<()> { todo!() } + + fn set_cluster(&self, _: Arc) { + todo!() + } + + async fn get_warehouse_cluster(&self) -> Result> { + todo!() + } } #[derive(Clone, Debug)] diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 292438a290e06..bb7240f586abd 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_exception::Result; use databend_common_expression::DataSchemaRef; use databend_common_functions::BUILTIN_FUNCTIONS; @@ -710,6 +711,12 @@ impl PhysicalPlan { ) } + pub fn is_warehouse_distributed_plan(&self) -> bool { + self.children() + .any(|child| child.is_warehouse_distributed_plan()) + || matches!(self, Self::TableScan(v) if v.source.parts.kind == PartitionsShuffleKind::BroadcastWarehouse) + } + pub fn get_desc(&self) -> Result { Ok(match self { PhysicalPlan::TableScan(v) => format!( diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 9529adb43ef1c..03257f9738030 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -41,6 +41,7 @@ use crate::optimizer::join::SingleToInnerOptimizer; use crate::optimizer::rule::TransformResult; use crate::optimizer::statistics::CollectStatisticsOptimizer; use crate::optimizer::util::contains_local_table_scan; +use crate::optimizer::util::contains_warehouse_table_scan; use crate::optimizer::RuleFactory; use crate::optimizer::RuleID; use crate::optimizer::SExpr; @@ -374,6 +375,13 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) - if contains_local_table_scan(&s_expr, &opt_ctx.metadata) { opt_ctx.enable_distributed_optimization = false; info!("Disable distributed optimization due to local table scan."); + } else if contains_warehouse_table_scan(&s_expr, &opt_ctx.metadata) { + let warehouse = opt_ctx.table_ctx.get_warehouse_cluster().await?; + + if !warehouse.is_empty() { + opt_ctx.enable_distributed_optimization = true; + info!("Enable distributed optimization due to warehouse table scan."); + } } // Decorrelate subqueries, after this step, there should be no subquery in the expression. @@ -461,6 +469,13 @@ async fn get_optimized_memo(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) - if contains_local_table_scan(&s_expr, &opt_ctx.metadata) { opt_ctx.enable_distributed_optimization = false; info!("Disable distributed optimization due to local table scan."); + } else if contains_warehouse_table_scan(&s_expr, &opt_ctx.metadata) { + let warehouse = opt_ctx.table_ctx.get_warehouse_cluster().await?; + + if !warehouse.is_empty() { + opt_ctx.enable_distributed_optimization = true; + info!("Enable distributed optimization due to warehouse table scan."); + } } // Decorrelate subqueries, after this step, there should be no subquery in the expression. 
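
The same check is applied in both optimizer entry points: a single warehouse-level scan anywhere in the plan upgrades the query to distributed execution, after which the scheduler swaps in the warehouse cluster and the scan's partitions are broadcast warehouse-wide. For illustration, a minimal sketch of a table opting in through the SyncSystemTable trait reworked later in this patch; NodeStatusTable is hypothetical and the import paths are assumed, the only load-bearing piece is the DISTRIBUTION_LEVEL constant:

use std::sync::Arc;

use databend_common_catalog::table::DistributionLevel;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_meta_app::schema::TableInfo;
use databend_common_storages_system::SyncSystemTable;

pub struct NodeStatusTable {
    table_info: TableInfo,
}

impl SyncSystemTable for NodeStatusTable {
    const NAME: &'static str = "system.node_status";

    // Warehouse rather than Cluster: the default get_partitions() in
    // system/src/table.rs (later in this patch) then emits its placeholder
    // partition with PartitionsShuffleKind::BroadcastWarehouse, so every node
    // of the warehouse produces rows, not just the current cluster.
    const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse;

    fn get_table_info(&self) -> &TableInfo {
        &self.table_info
    }

    fn get_full_data(&self, _ctx: Arc<dyn TableContext>) -> Result<DataBlock> {
        // Each executor reports only what it knows locally; the broadcast
        // scan unions the per-node blocks into the final result set.
        Ok(DataBlock::empty())
    }
}

As with the existing system tables, the struct would then be wrapped via SyncOneBlockSystemTable to obtain a dyn Table, whose distribution_level() promotion keeps even local system tables joinable with distributed tables.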
diff --git a/src/query/sql/src/planner/optimizer/util.rs b/src/query/sql/src/planner/optimizer/util.rs index 8120893a8cec2..1e8641dc6da4f 100644 --- a/src/query/sql/src/planner/optimizer/util.rs +++ b/src/query/sql/src/planner/optimizer/util.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_catalog::table::DistributionLevel; + use super::SExpr; use crate::plans::RelOperator; use crate::MetadataRef; @@ -22,9 +24,38 @@ pub fn contains_local_table_scan(s_expr: &SExpr, metadata: &MetadataRef) -> bool .children() .any(|s_expr| contains_local_table_scan(s_expr, metadata)) || if let RelOperator::Scan(get) = s_expr.plan() { - metadata.read().table(get.table_index).table().is_local() + matches!( + metadata + .read() + .table(get.table_index) + .table() + .distribution_level(), + DistributionLevel::Local + ) } else { false } || matches!(s_expr.plan(), RelOperator::RecursiveCteScan { .. }) } + +pub fn contains_warehouse_table_scan(s_expr: &SExpr, metadata: &MetadataRef) -> bool { + if s_expr + .children() + .any(|s_expr| contains_warehouse_table_scan(s_expr, metadata)) + { + return true; + } + + if let RelOperator::Scan(scan) = s_expr.plan() { + return matches!( + metadata + .read() + .table(scan.table_index) + .table() + .distribution_level(), + DistributionLevel::Warehouse, + ); + } + + false +} diff --git a/src/query/storages/delta/src/table.rs b/src/query/storages/delta/src/table.rs index 2bff4c9b76d22..64e0d36f67ced 100644 --- a/src/query/storages/delta/src/table.rs +++ b/src/query/storages/delta/src/table.rs @@ -26,6 +26,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_context::TableContext; @@ -367,8 +368,8 @@ impl Table for DeltaTable { self } - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn get_table_info(&self) -> &TableInfo { diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index ce04e7587e14d..bb252fde36204 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -39,6 +39,7 @@ use databend_common_catalog::table::Bound; use databend_common_catalog::table::ColumnRange; use databend_common_catalog::table::ColumnStatisticsProvider; use databend_common_catalog::table::CompactionLimits; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::NavigationDescriptor; use databend_common_catalog::table::TimeNavigation; use databend_common_catalog::table_context::AbortChecker; @@ -605,8 +606,8 @@ impl FuseTable { #[async_trait::async_trait] impl Table for FuseTable { - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn as_any(&self) -> &dyn Any { diff --git a/src/query/storages/fuse/src/table_functions/function_template/simple_arg_func.rs b/src/query/storages/fuse/src/table_functions/function_template/simple_arg_func.rs index 310b0f5e31dd2..3dd86628d731b 100644 --- a/src/query/storages/fuse/src/table_functions/function_template/simple_arg_func.rs +++ 
b/src/query/storages/fuse/src/table_functions/function_template/simple_arg_func.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -29,9 +30,10 @@ pub trait SimpleArgFunc { type Args; fn schema() -> TableSchemaRef; - fn is_local_func() -> bool { - true + fn distribution_level() -> DistributionLevel { + DistributionLevel::Local } + async fn apply( ctx: &Arc, args: &Self::Args, @@ -63,8 +65,8 @@ where T::schema() } - fn is_local_func(&self) -> bool { - T::is_local_func() + fn distribution_level(&self) -> DistributionLevel { + T::distribution_level() } async fn apply( diff --git a/src/query/storages/fuse/src/table_functions/function_template/simple_func_template.rs b/src/query/storages/fuse/src/table_functions/function_template/simple_func_template.rs index 3ce61497255bc..49985d3ba24ef 100644 --- a/src/query/storages/fuse/src/table_functions/function_template/simple_func_template.rs +++ b/src/query/storages/fuse/src/table_functions/function_template/simple_func_template.rs @@ -21,6 +21,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::TableSchemaRef; @@ -64,8 +65,8 @@ pub trait SimpleTableFunc: Send + Sync + 'static { "table_func_template".to_owned() } - fn is_local_func(&self) -> bool { - true + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Local } fn table_args(&self) -> Option; @@ -129,8 +130,8 @@ impl Table for TableFunctionTemplate { &self.table_info } - fn is_local(&self) -> bool { - self.inner.is_local_func() + fn distribution_level(&self) -> DistributionLevel { + self.inner.distribution_level() } #[async_backtrace::framed] @@ -140,18 +141,24 @@ impl Table for TableFunctionTemplate { _push_downs: Option, _dry_run: bool, ) -> Result<(PartStatistics, Partitions)> { - match self.inner.is_local_func() { - true => Ok(( + match self.inner.distribution_level() { + DistributionLevel::Local => Ok(( PartStatistics::default(), Partitions::create(PartitionsShuffleKind::Seq, vec![Arc::new(Box::new( PlaceHolder, ))]), )), - false => Ok(( + DistributionLevel::Cluster => Ok(( PartStatistics::default(), - Partitions::create(PartitionsShuffleKind::Broadcast, vec![Arc::new(Box::new( - PlaceHolder, - ))]), + Partitions::create(PartitionsShuffleKind::BroadcastCluster, vec![Arc::new( + Box::new(PlaceHolder), + )]), + )), + DistributionLevel::Warehouse => Ok(( + PartStatistics::default(), + Partitions::create(PartitionsShuffleKind::BroadcastWarehouse, vec![Arc::new( + Box::new(PlaceHolder), + )]), )), } } diff --git a/src/query/storages/fuse/src/table_functions/set_cache_capacity.rs b/src/query/storages/fuse/src/table_functions/set_cache_capacity.rs index 05e704c4e0455..9c1cfc6bc9e13 100644 --- a/src/query/storages/fuse/src/table_functions/set_cache_capacity.rs +++ b/src/query/storages/fuse/src/table_functions/set_cache_capacity.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::table::DistributionLevel; use 
databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::types::StringType; @@ -62,9 +63,8 @@ impl SimpleTableFunc for SetCacheCapacity { ]) } - fn is_local_func(&self) -> bool { - // cache operation needs to be broadcast to all nodes - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Warehouse } async fn apply( diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs index b2de36a1e7bc4..dc24507edd764 100644 --- a/src/query/storages/hive/hive/src/hive_table.rs +++ b/src/query/storages/hive/hive/src/hive_table.rs @@ -25,6 +25,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::NavigationPoint; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; @@ -396,8 +397,8 @@ impl HiveTable { #[async_trait::async_trait] impl Table for HiveTable { - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn as_any(&self) -> &(dyn std::any::Any + 'static) { diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index b41ccaf7aaacd..47fbacb5b79de 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -28,6 +28,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_args::TableArgs; @@ -292,8 +293,8 @@ impl Table for IcebergTable { self } - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn get_table_info(&self) -> &TableInfo { diff --git a/src/query/storages/memory/src/memory_table.rs b/src/query/storages/memory/src/memory_table.rs index 2ec25250b6c79..915e057b7a160 100644 --- a/src/query/storages/memory/src/memory_table.rs +++ b/src/query/storages/memory/src/memory_table.rs @@ -26,6 +26,7 @@ use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::Projection; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; @@ -139,8 +140,8 @@ impl Table for MemoryTable { /// MemoryTable could be distributed table, yet we only insert data in one node per query /// Because commit_insert did not support distributed transaction - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn support_column_projection(&self) -> bool { @@ -207,7 +208,7 @@ impl Table for MemoryTable { let parts = vec![MemoryPartInfo::create()]; return Ok(( statistics, - Partitions::create(PartitionsShuffleKind::Broadcast, parts), + Partitions::create(PartitionsShuffleKind::BroadcastCluster, parts), 
)); } diff --git a/src/query/storages/null/src/null_table.rs b/src/query/storages/null/src/null_table.rs index 5bb625429694f..c7c73210b091f 100644 --- a/src/query/storages/null/src/null_table.rs +++ b/src/query/storages/null/src/null_table.rs @@ -20,6 +20,7 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -62,8 +63,8 @@ impl Table for NullTable { } /// Null do not keep data, it's safe to make it non-local. - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } #[async_backtrace::framed] diff --git a/src/query/storages/orc/src/table.rs b/src/query/storages/orc/src/table.rs index d3da0a13008ab..8b38930df7d8f 100644 --- a/src/query/storages/orc/src/table.rs +++ b/src/query/storages/orc/src/table.rs @@ -24,6 +24,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::StageTableInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; @@ -122,8 +123,8 @@ impl Table for OrcTable { self } - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn get_table_info(&self) -> &TableInfo { diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs index 286f39498ec18..9bbc66eb79860 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs @@ -29,6 +29,7 @@ use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::table::ColumnStatisticsProvider; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::DummyColumnStatisticsProvider; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; @@ -188,8 +189,8 @@ impl Table for ParquetRSTable { self } - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn get_table_info(&self) -> &TableInfo { diff --git a/src/query/storages/result_cache/src/table_function/table.rs b/src/query/storages/result_cache/src/table_function/table.rs index 74dcb3a0373d1..cd4a55757fce0 100644 --- a/src/query/storages/result_cache/src/table_function/table.rs +++ b/src/query/storages/result_cache/src/table_function/table.rs @@ -95,10 +95,6 @@ impl ResultScan { #[async_trait::async_trait] impl Table for ResultScan { - fn is_local(&self) -> bool { - true - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs index 2675f4e0b1f8d..d24ed83cc9040 100644 --- a/src/query/storages/stage/src/stage_table.rs +++ b/src/query/storages/stage/src/stage_table.rs @@ -23,6 +23,7 @@ use 
databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::StageTableInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -163,8 +164,8 @@ impl Table for StageTable { } } - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn read_data( diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index 6c7f052c71a7d..80b0f849912a2 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -24,6 +24,7 @@ use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::StreamColumn; use databend_common_catalog::table::ColumnStatisticsProvider; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; @@ -335,8 +336,8 @@ impl StreamTable { #[async_trait::async_trait] impl Table for StreamTable { - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn as_any(&self) -> &dyn Any { diff --git a/src/query/storages/system/src/backtrace_table.rs b/src/query/storages/system/src/backtrace_table.rs index 0d995cc4bc345..3291434771e16 100644 --- a/src/query/storages/system/src/backtrace_table.rs +++ b/src/query/storages/system/src/backtrace_table.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use databend_common_base::get_all_tasks; use databend_common_base::runtime::Runtime; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -44,7 +45,7 @@ impl SyncSystemTable for BacktraceTable { const NAME: &'static str = "system.backtrace"; // Allow distributed query. - const DATA_IN_LOCAL: bool = false; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; fn get_table_info(&self) -> &TableInfo { &self.table_info diff --git a/src/query/storages/system/src/caches_table.rs b/src/query/storages/system/src/caches_table.rs index 04c411675dfa1..f2be151ef77f8 100644 --- a/src/query/storages/system/src/caches_table.rs +++ b/src/query/storages/system/src/caches_table.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_exception::Result; use databend_common_expression::types::NumberDataType; @@ -62,7 +63,7 @@ impl SyncSystemTable for CachesTable { const NAME: &'static str = "system.caches"; // Allow distributed query. 
- const DATA_IN_LOCAL: bool = false; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; fn get_table_info(&self) -> &TableInfo { &self.table_info diff --git a/src/query/storages/system/src/log_queue.rs b/src/query/storages/system/src/log_queue.rs index 59cea8cecaac0..af1b1a4f0b9e4 100644 --- a/src/query/storages/system/src/log_queue.rs +++ b/src/query/storages/system/src/log_queue.rs @@ -23,6 +23,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -150,8 +151,8 @@ impl SystemLogTable { #[async_trait::async_trait] impl Table for SystemLogTable { - fn is_local(&self) -> bool { - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Warehouse } fn as_any(&self) -> &dyn Any { @@ -172,9 +173,9 @@ impl Table for SystemLogTable { Ok(( PartStatistics::default(), // Make the table in distributed. - Partitions::create(PartitionsShuffleKind::Broadcast, vec![Arc::new(Box::new( - SystemTablePart, - ))]), + Partitions::create(PartitionsShuffleKind::BroadcastWarehouse, vec![Arc::new( + Box::new(SystemTablePart), + )]), )) } diff --git a/src/query/storages/system/src/malloc_stats_totals_table.rs b/src/query/storages/system/src/malloc_stats_totals_table.rs index 4942edaaa69c5..9fa57054fb59d 100644 --- a/src/query/storages/system/src/malloc_stats_totals_table.rs +++ b/src/query/storages/system/src/malloc_stats_totals_table.rs @@ -15,6 +15,7 @@ use std::default::Default; use std::sync::Arc; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -53,7 +54,7 @@ pub struct MallocStatsTotalsTable { impl SyncSystemTable for MallocStatsTotalsTable { const NAME: &'static str = "system.malloc_stats_totals"; - const DATA_IN_LOCAL: bool = false; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; fn get_table_info(&self) -> &TableInfo { &self.table_info diff --git a/src/query/storages/system/src/metrics_table.rs b/src/query/storages/system/src/metrics_table.rs index b8811aecb297a..4921929002785 100644 --- a/src/query/storages/system/src/metrics_table.rs +++ b/src/query/storages/system/src/metrics_table.rs @@ -19,6 +19,7 @@ use databend_common_base::runtime::metrics::MetricSample; use databend_common_base::runtime::metrics::MetricValue; use databend_common_base::runtime::metrics::GLOBAL_METRICS_REGISTRY; use databend_common_base::runtime::GLOBAL_MEM_STAT; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -44,7 +45,7 @@ pub struct MetricsTable { impl SyncSystemTable for MetricsTable { const NAME: &'static str = "system.metrics"; // Allow distributed query. 
- const DATA_IN_LOCAL: bool = false; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; const BROADCAST_TRUNCATE: bool = true; fn get_table_info(&self) -> &TableInfo { diff --git a/src/query/storages/system/src/processes_table.rs b/src/query/storages/system/src/processes_table.rs index e864af9f29a7b..d34f90d242368 100644 --- a/src/query/storages/system/src/processes_table.rs +++ b/src/query/storages/system/src/processes_table.rs @@ -17,6 +17,7 @@ use std::time::Duration; use chrono::DateTime; use chrono::Utc; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -46,7 +47,7 @@ pub struct ProcessesTable { impl SyncSystemTable for ProcessesTable { const NAME: &'static str = "system.processes"; - const DATA_IN_LOCAL: bool = false; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; fn get_table_info(&self) -> &TableInfo { &self.table_info diff --git a/src/query/storages/system/src/queries_profiling.rs b/src/query/storages/system/src/queries_profiling.rs index f2f638aefa4bf..ab1914be835e5 100644 --- a/src/query/storages/system/src/queries_profiling.rs +++ b/src/query/storages/system/src/queries_profiling.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -47,7 +48,7 @@ pub struct QueriesProfilingTable { impl SyncSystemTable for QueriesProfilingTable { const NAME: &'static str = "system.queries_profiling"; - const DATA_IN_LOCAL: bool = false; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; fn get_table_info(&self) -> &TableInfo { &self.table_info diff --git a/src/query/storages/system/src/table.rs b/src/query/storages/system/src/table.rs index 5ff2b253d96fa..f1c12c009e406 100644 --- a/src/query/storages/system/src/table.rs +++ b/src/query/storages/system/src/table.rs @@ -21,6 +21,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -57,7 +58,7 @@ impl PartInfo for SystemTablePart { pub trait SyncSystemTable: Send + Sync { const NAME: &'static str; - const DATA_IN_LOCAL: bool = true; + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Local; const BROADCAST_TRUNCATE: bool = false; fn get_table_info(&self) -> &TableInfo; @@ -68,18 +69,24 @@ pub trait SyncSystemTable: Send + Sync { _ctx: Arc, _push_downs: Option, ) -> Result<(PartStatistics, Partitions)> { - match Self::DATA_IN_LOCAL { - true => Ok(( + match Self::DISTRIBUTION_LEVEL { + DistributionLevel::Local => Ok(( PartStatistics::default(), Partitions::create(PartitionsShuffleKind::Seq, vec![Arc::new(Box::new( SystemTablePart, ))]), )), - false => Ok(( + DistributionLevel::Cluster => Ok(( PartStatistics::default(), - Partitions::create(PartitionsShuffleKind::Broadcast, vec![Arc::new(Box::new( - SystemTablePart, - ))]), + 
Partitions::create(PartitionsShuffleKind::BroadcastCluster, vec![Arc::new( + Box::new(SystemTablePart), + )]), + )), + DistributionLevel::Warehouse => Ok(( + PartStatistics::default(), + Partitions::create(PartitionsShuffleKind::BroadcastWarehouse, vec![Arc::new( + Box::new(SystemTablePart), + )]), )), } } @@ -109,9 +116,14 @@ impl Table for SyncOneBlockSystemTable bool { + fn distribution_level(&self) -> DistributionLevel { // When querying a memory table, we send the partition to one node for execution. The other nodes send empty partitions. - false + // For system tables, they are always non-local, which ensures that system tables can be JOIN or UNION operation with any other table. + match TTable::DISTRIBUTION_LEVEL { + DistributionLevel::Local => DistributionLevel::Cluster, + DistributionLevel::Cluster => DistributionLevel::Cluster, + DistributionLevel::Warehouse => DistributionLevel::Warehouse, + } } fn get_table_info(&self) -> &TableInfo { @@ -155,7 +167,7 @@ impl Table for SyncOneBlockSystemTable bool { + fn broadcast_truncate_to_warehouse(&self) -> bool { TTable::BROADCAST_TRUNCATE } } @@ -198,7 +210,7 @@ impl SyncSource for SystemTableSyncSource &TableInfo; async fn get_full_data( @@ -213,18 +225,24 @@ pub trait AsyncSystemTable: Send + Sync { _ctx: Arc, _push_downs: Option, ) -> Result<(PartStatistics, Partitions)> { - match Self::DATA_IN_LOCAL { - true => Ok(( + match Self::DISTRIBUTION_LEVEL { + DistributionLevel::Local => Ok(( PartStatistics::default(), Partitions::create(PartitionsShuffleKind::Seq, vec![Arc::new(Box::new( SystemTablePart, ))]), )), - false => Ok(( + DistributionLevel::Cluster => Ok(( PartStatistics::default(), - Partitions::create(PartitionsShuffleKind::Broadcast, vec![Arc::new(Box::new( - SystemTablePart, - ))]), + Partitions::create(PartitionsShuffleKind::BroadcastCluster, vec![Arc::new( + Box::new(SystemTablePart), + )]), + )), + DistributionLevel::Warehouse => Ok(( + PartStatistics::default(), + Partitions::create(PartitionsShuffleKind::BroadcastWarehouse, vec![Arc::new( + Box::new(SystemTablePart), + )]), )), } } @@ -250,9 +268,14 @@ impl Table for AsyncOneBlockSystemTable bool { + fn distribution_level(&self) -> DistributionLevel { // When querying a memory table, we send the partition to one node for execution. The other nodes send empty partitions. - false + // For system tables, they are always non-local, which ensures that system tables can be JOIN or UNION operation with any other table. 
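+        // Warehouse-level system tables (e.g. system.processes or the query
+        // log queues) keep their level, so their scans fan out to every node
+        // of the warehouse rather than only the current cluster.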
+ match TTable::DISTRIBUTION_LEVEL { + DistributionLevel::Local => DistributionLevel::Cluster, + DistributionLevel::Cluster => DistributionLevel::Cluster, + DistributionLevel::Warehouse => DistributionLevel::Warehouse, + } } fn get_table_info(&self) -> &TableInfo { diff --git a/src/query/storages/system/src/temp_files_table.rs b/src/query/storages/system/src/temp_files_table.rs index f4dae9e7f4ead..c0ee1678d93f7 100644 --- a/src/query/storages/system/src/temp_files_table.rs +++ b/src/query/storages/system/src/temp_files_table.rs @@ -21,6 +21,7 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::DistributionLevel; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -64,9 +65,8 @@ pub struct TempFilesTable { #[async_trait::async_trait] impl Table for TempFilesTable { - fn is_local(&self) -> bool { - // Follow the practice of `SyncOneBlockSystemTable::is_local` - false + fn distribution_level(&self) -> DistributionLevel { + DistributionLevel::Cluster } fn as_any(&self) -> &dyn Any { From 5faf4a6cfe171c1f1733cf3713fbfbe1ac801b69 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 14 Jan 2025 15:54:05 +0800 Subject: [PATCH 11/18] chore(ci): fix release bump version (#17277) --- .github/scripts/bump_version.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/scripts/bump_version.js b/.github/scripts/bump_version.js index 6cfd59c4033d8..619fbe197b3a4 100644 --- a/.github/scripts/bump_version.js +++ b/.github/scripts/bump_version.js @@ -7,15 +7,15 @@ module.exports = async ({ github, context, core }) => { const { TYPE, TAG } = process.env; - const RE_TAG_STABLE = /^v(\d+)\.(\d+)\.(\d+)$/g; - const RE_TAG_NIGHTLY = /^v(\d+)\.(\d+)\.(\d+)-nightly$/g; - const RE_TAG_PATCH = /^v(\d+)\.(\d+)\.(\d+)-p(\d+)$/g; + const RE_TAG_STABLE = /^v(\d+)\.(\d+)\.(\d+)$/; + const RE_TAG_NIGHTLY = /^v(\d+)\.(\d+)\.(\d+)-nightly$/; + const RE_TAG_PATCH = /^v(\d+)\.(\d+)\.(\d+)-p(\d+)$/; switch (TYPE) { case "": case "nightly": { core.setOutput("sha", context.sha); - core.info(`Nightly release triggered by ${TAG} (${context.sha})`); + core.info(`Nightly release triggered by (${context.sha})`); let previous = null; const releases = await github.rest.repos.listReleases({ @@ -23,8 +23,8 @@ module.exports = async ({ github, context, core }) => { repo: context.repo.repo, }); for (const release of releases.data) { - const result = RE_TAG_NIGHTLY.exec(release.tag_name); - if (result) { + const ret = RE_TAG_NIGHTLY.exec(release.tag_name); + if (ret) { previous = release.tag_name; break; } From c8b8a28dba0b6c9cb7918e95421bf7ce49ec177a Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 14 Jan 2025 16:28:41 +0800 Subject: [PATCH 12/18] chore(ci): fix release patch version (#17278) --- .github/scripts/bump_version.js | 198 +++++++++++++++++++------------- 1 file changed, 119 insertions(+), 79 deletions(-) diff --git a/.github/scripts/bump_version.js b/.github/scripts/bump_version.js index 619fbe197b3a4..415eea0808ff3 100644 --- a/.github/scripts/bump_version.js +++ b/.github/scripts/bump_version.js @@ -11,23 +11,118 @@ module.exports = async ({ github, context, core }) => { const RE_TAG_NIGHTLY = /^v(\d+)\.(\d+)\.(\d+)-nightly$/; const RE_TAG_PATCH = /^v(\d+)\.(\d+)\.(\d+)-p(\d+)$/; - switch (TYPE) { - case "": - 
case "nightly": { - core.setOutput("sha", context.sha); - core.info(`Nightly release triggered by (${context.sha})`); + async function getPreviousNightlyRelease(github, context) { + const releases = await github.rest.repos.listReleases({ + owner: context.repo.owner, + repo: context.repo.repo, + }); + for (const release of releases.data) { + const ret = RE_TAG_NIGHTLY.exec(release.tag_name); + if (ret) { + return release.tag_name; + } + } + } + + function getNextNightlyRelease(previous) { + const nightly = RE_TAG_NIGHTLY.exec(previous); + if (nightly) { + const major = nightly[1]; + const minor = nightly[2]; + const patch = parseInt(nightly[3]) + 1; + return `v${major}.${minor}.${patch}-nightly`; + } + } - let previous = null; + async function getPreviousStableRelease(github, context) { + let page = 1; + while (true) { const releases = await github.rest.repos.listReleases({ owner: context.repo.owner, repo: context.repo.repo, + page, }); + if (releases.data.length === 0) { + break; + } + page++; for (const release of releases.data) { - const ret = RE_TAG_NIGHTLY.exec(release.tag_name); + const ret = RE_TAG_STABLE.exec(release.tag_name); if (ret) { - previous = release.tag_name; - break; + return release.tag_name; + } + } + } + } + + function getNextStableRelease() { + const nightly = RE_TAG_NIGHTLY.exec(TAG); + if (nightly) { + const major = nightly[1]; + const minor = nightly[2]; + const patch = nightly[3]; + return `v${major}.${minor}.${patch}`; + } + } + + async function getPreviousPatchRelease(github, context) { + let page = 1; + while (true) { + const releases = await github.rest.repos.listReleases({ + owner: context.repo.owner, + repo: context.repo.repo, + page, + }); + if (releases.data.length === 0) { + break; + } + page++; + for (const release of releases.data) { + if (!release.tag_name.startsWith(TAG)) { + continue; + } + if (release.tag_name === TAG) { + // no previous patch release, use the previous stable release + return release.tag_name; + } + const ret = RE_TAG_PATCH.exec(release.tag_name); + if (!ret) { + core.warning(`Ignore invalid patch release ${release.tag_name}`); + continue; } + return release.tag_name; + } + } + } + + function getNextPatchRelease(previous) { + const stable = RE_TAG_STABLE.exec(previous); + if (stable) { + const major = stable[1]; + const minor = stable[2]; + const patch = stable[3]; + return `v${major}.${minor}.${patch}-p1`; + } + const patch = RE_TAG_PATCH.exec(previous); + if (patch) { + const major = patch[1]; + const minor = patch[2]; + const patch = patch[3]; + const pv = parseInt(patch[4]); + return `v${major}.${minor}.${patch}-p${pv + 1}`; + } + } + + switch (TYPE) { + case "": + case "nightly": { + core.setOutput("sha", context.sha); + core.info(`Nightly release triggered by (${context.sha})`); + + const previous = await getPreviousNightlyRelease(github, context); + if (!previous) { + core.setFailed(`No previous nightly release found, ignoring`); + return; } core.setOutput("previous", previous); core.info(`Nightly release with previous release: ${previous}`); @@ -37,15 +132,11 @@ module.exports = async ({ github, context, core }) => { core.info(`Release create manually with tag ${TAG}`); return; } - const result = RE_TAG_NIGHTLY.exec(previous); - if (!result) { - core.setFailed(`The previous tag ${previous} is invalid.`); + const nextTag = getNextNightlyRelease(previous); + if (!nextTag) { + core.setFailed(`No next nightly release from ${previous}`); return; } - const major = result[1]; - const minor = result[2]; - const patch = 
(parseInt(result[3]) + 1).toString(); - const nextTag = `v${major}.${minor}.${patch}-nightly`; core.setOutput("tag", nextTag); core.info(`Release create new nightly ${nextTag}`); return; @@ -58,38 +149,15 @@ module.exports = async ({ github, context, core }) => { return; } core.info(`Stable release triggered by ${TAG} (${context.sha})`); - const result = RE_TAG_NIGHTLY.exec(TAG); - if (!result) { - core.setFailed(`The tag ${TAG} is invalid, ignoring`); + const nextTag = getNextStableRelease(); + if (!nextTag) { + core.setFailed(`No stable release from ${TAG}`); return; } - const major = result[1]; - const minor = result[2]; - const patch = result[3]; - const nextTag = `v${major}.${minor}.${patch}`; core.setOutput("tag", nextTag); core.info(`Stable release ${nextTag} from ${TAG}`); - let previous = null; - let page = 1; - while (true) { - const releases = await github.rest.repos.listReleases({ - owner: context.repo.owner, - repo: context.repo.repo, - page, - }); - if (releases.data.length === 0) { - break; - } - page++; - for (const release of releases.data) { - const ret = RE_TAG_STABLE.exec(release.tag_name); - if (ret) { - previous = release.tag_name; - break; - } - } - } + const previous = await getPreviousStableRelease(github, context); if (!previous) { core.setFailed(`No previous stable release found, ignoring`); return; @@ -120,49 +188,21 @@ module.exports = async ({ github, context, core }) => { `Patch release triggered by ${TAG} (${branch.data.commit.sha})` ); - let pv = 1; - let previous = null; - let page = 1; - while (true) { - const releases = await github.rest.repos.listReleases({ - owner: context.repo.owner, - repo: context.repo.repo, - page, - }); - if (releases.data.length === 0) { - break; - } - page++; - for (const release of releases.data) { - if (!release.tag_name.startsWith(TAG)) { - continue; - } - if (release.tag_name === TAG) { - previous = release.tag_name; - break; - } - const ret = RE_TAG_PATCH.exec(release.tag_name); - if (!ret) { - core.warning(`Ignore previous release ${release.tag_name}`); - continue; - } - pv = parseInt(result[4]) + 1; - previous = release.tag_name; - } - } + const previous = await getPreviousPatchRelease(github, context); if (!previous) { - core.setFailed(`No previous stable release found, ignoring`); + core.setFailed(`No previous patch release found, ignoring`); return; } core.setOutput("previous", previous); core.info(`Patch release with previous release: ${previous}`); - const major = result[1]; - const minor = result[2]; - const patch = result[3]; - const nextTag = `v${major}.${minor}.${patch}-p${pv}`; + const nextTag = getNextPatchRelease(previous); + if (!nextTag) { + core.setFailed(`No next patch release from ${previous}`); + return; + } core.setOutput("tag", nextTag); - core.info(`Patch release ${nextTag} from ${TAG}`); + core.info(`Patch release ${nextTag} from ${previous}`); return; } From d1a4975b4f7a471e202621545709490e68e5d54e Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 14 Jan 2025 16:48:25 +0800 Subject: [PATCH 13/18] chore(ci): fix gen next patch release (#17281) --- .github/scripts/bump_version.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/scripts/bump_version.js b/.github/scripts/bump_version.js index 415eea0808ff3..a77b8817b92c7 100644 --- a/.github/scripts/bump_version.js +++ b/.github/scripts/bump_version.js @@ -103,13 +103,13 @@ module.exports = async ({ github, context, core }) => { const patch = stable[3]; return `v${major}.${minor}.${patch}-p1`; } - const patch = 
RE_TAG_PATCH.exec(previous); - if (patch) { - const major = patch[1]; - const minor = patch[2]; - const patch = patch[3]; - const pv = parseInt(patch[4]); - return `v${major}.${minor}.${patch}-p${pv + 1}`; + const version = RE_TAG_PATCH.exec(previous); + if (version) { + const major = version[1]; + const minor = version[2]; + const patch = version[3]; + const pv = parseInt(version[4]) + 1; + return `v${major}.${minor}.${patch}-p${pv}`; } } From 6bcfe0957dcf8ed5eaec92be826e2314daa7fdff Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 14 Jan 2025 17:27:25 +0800 Subject: [PATCH 14/18] chore(ci): unit test with 16c64g (#17283) --- .github/workflows/reuse.linux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/reuse.linux.yml b/.github/workflows/reuse.linux.yml index dd5a2652a2fda..30bcd3d662fb9 100644 --- a/.github/workflows/reuse.linux.yml +++ b/.github/workflows/reuse.linux.yml @@ -95,7 +95,7 @@ jobs: # artifacts: query test_unit: - runs-on: [self-hosted, X64, Linux, 8c32g, "${{ inputs.runner_provider }}"] + runs-on: [self-hosted, X64, Linux, 16c64g, "${{ inputs.runner_provider }}"] steps: - uses: actions/checkout@v4 with: From 2743ff1e26bbaf7b3362887a1e86da4d2a0b905a Mon Sep 17 00:00:00 2001 From: Bohu Date: Tue, 14 Jan 2025 17:29:18 +0800 Subject: [PATCH 15/18] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d308b1026cb6a..cf10f1d3b80b6 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@
 <div align="center">
 
 <h1 align="center">Databend: The Next-Gen Cloud [Data+AI] Analytics</h1>
 
-<h2 align="center">Zero Disk Architecture</h2>
+<h2 align="center">The open-source, on-premise alternative to Snowflake</h2>
From f71635266c0f3383612fb8b2cf68e62f097a0977 Mon Sep 17 00:00:00 2001 From: Bohu Date: Tue, 14 Jan 2025 17:41:11 +0800 Subject: [PATCH 16/18] Update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index cf10f1d3b80b6..bf1243ebfd776 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,10 @@ **Databend**, built in Rust, is an open-source cloud data warehouse that serves as a cost-effective [alternative to Snowflake](https://github.com/datafuselabs/databend/issues/13059). With its focus on fast query execution and data ingestion, it's designed for complex analysis of the world's largest datasets. +**Production-Proven Scale:** +- 🤝 **Enterprise Adoption**: Trusted by over **50 organizations** processing more than **100 million queries daily** +- 🗄️ **Massive Scale**: Successfully managing over **800 petabytes** of analytical data + ## ⚡ Performance
From 8a47dce8bf8681ff96781aa0391fa938071a4c53 Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 14 Jan 2025 19:23:47 +0800 Subject: [PATCH 17/18] fix(query): window function support params (#17282) --- src/query/ast/src/parser/expr.rs | 30 ++++- src/query/ast/tests/it/parser.rs | 1 + src/query/ast/tests/it/testdata/expr.txt | 114 ++++++++++++++++++ .../query/window_function/window_basic.test | 29 +++++ 4 files changed, 169 insertions(+), 5 deletions(-) diff --git a/src/query/ast/src/parser/expr.rs b/src/query/ast/src/parser/expr.rs index ec3ffe28e109f..854923757dc22 100644 --- a/src/query/ast/src/parser/expr.rs +++ b/src/query/ast/src/parser/expr.rs @@ -1083,18 +1083,36 @@ pub fn expr_element(i: Input) -> IResult> { }, }, ); + let function_call_with_params_window = map( + rule! { + #function_name + ~ "(" ~ #comma_separated_list1(subexpr(0)) ~ ")" + ~ "(" ~ DISTINCT? ~ #comma_separated_list0(subexpr(0))? ~ ")" + ~ #window_function + }, + |(name, _, params, _, _, opt_distinct, opt_args, _, window)| ExprElement::FunctionCall { + func: FunctionCall { + distinct: opt_distinct.is_some(), + name, + args: opt_args.unwrap_or_default(), + params, + window: Some(window), + lambda: None, + }, + }, + ); let function_call_with_params = map( rule! { #function_name - ~ ("(" ~ #comma_separated_list1(subexpr(0)) ~ ")")? + ~ "(" ~ #comma_separated_list1(subexpr(0)) ~ ")" ~ "(" ~ DISTINCT? ~ #comma_separated_list0(subexpr(0))? ~ ")" }, - |(name, params, _, opt_distinct, opt_args, _)| ExprElement::FunctionCall { + |(name, _, params, _, _, opt_distinct, opt_args, _)| ExprElement::FunctionCall { func: FunctionCall { distinct: opt_distinct.is_some(), name, args: opt_args.unwrap_or_default(), - params: params.map(|(_, x, _)| x).unwrap_or_default(), + params, window: None, lambda: None, }, @@ -1376,7 +1394,6 @@ pub fn expr_element(i: Input) -> IResult> { | #interval_expr : "`INTERVAL `" | #extract : "`EXTRACT((YEAR | QUARTER | MONTH | DAY | HOUR | MINUTE | SECOND | WEEK) FROM ...)`" | #date_part : "`DATE_PART((YEAR | QUARTER | MONTH | DAY | HOUR | MINUTE | SECOND | WEEK), ...)`" - ), rule!( #substring : "`SUBSTRING(... [FROM ...] [FOR ...])`" @@ -1388,9 +1405,12 @@ pub fn expr_element(i: Input) -> IResult> { | #count_all_with_window : "`COUNT(*) OVER ...`" | #function_call_with_lambda : "`function(..., x -> ...)`" | #function_call_with_window : "`function(...) OVER ([ PARTITION BY , ... ] [ ORDER BY , ... ] [ ])`" + | #function_call_with_params_window : "`function(...)(...) OVER ([ PARTITION BY , ... ] [ ORDER BY , ... ] [ ])`" | #function_call_with_params : "`function(...)(...)`" | #function_call : "`function(...)`" - | #case : "`CASE ... END`" + ), + rule!( + #case : "`CASE ... 
END`" | #tuple : "`( [, ...])`" | #subquery : "`(SELECT ...)`" | #column_ref : "" diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 1d8d789906695..052648e507d76 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -1251,6 +1251,7 @@ fn test_expr() { r#"COUNT() OVER (ORDER BY hire_date ROWS UNBOUNDED PRECEDING)"#, r#"COUNT() OVER (ORDER BY hire_date ROWS CURRENT ROW)"#, r#"COUNT() OVER (ORDER BY hire_date ROWS 3 PRECEDING)"#, + r#"QUANTILE_CONT(0.5)(salary) OVER (PARTITION BY department ORDER BY hire_date)"#, r#"ARRAY_APPLY([1,2,3], x -> x + 1)"#, r#"ARRAY_FILTER(col, y -> y % 2 = 0)"#, r#"(current_timestamp, current_timestamp(), now())"#, diff --git a/src/query/ast/tests/it/testdata/expr.txt b/src/query/ast/tests/it/testdata/expr.txt index eb279496999c7..af6d394434354 100644 --- a/src/query/ast/tests/it/testdata/expr.txt +++ b/src/query/ast/tests/it/testdata/expr.txt @@ -4229,6 +4229,120 @@ FunctionCall { } +---------- Input ---------- +QUANTILE_CONT(0.5)(salary) OVER (PARTITION BY department ORDER BY hire_date) +---------- Output --------- +QUANTILE_CONT(0.5)(salary) OVER (PARTITION BY department ORDER BY hire_date) +---------- AST ------------ +FunctionCall { + span: Some( + 0..76, + ), + func: FunctionCall { + distinct: false, + name: Identifier { + span: Some( + 0..13, + ), + name: "QUANTILE_CONT", + quote: None, + ident_type: None, + }, + args: [ + ColumnRef { + span: Some( + 19..25, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 19..25, + ), + name: "salary", + quote: None, + ident_type: None, + }, + ), + }, + }, + ], + params: [ + Literal { + span: Some( + 14..17, + ), + value: Decimal256 { + value: 5, + precision: 76, + scale: 1, + }, + }, + ], + window: Some( + WindowDesc { + ignore_nulls: None, + window: WindowSpec( + WindowSpec { + existing_window_name: None, + partition_by: [ + ColumnRef { + span: Some( + 46..56, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 46..56, + ), + name: "department", + quote: None, + ident_type: None, + }, + ), + }, + }, + ], + order_by: [ + OrderByExpr { + expr: ColumnRef { + span: Some( + 66..75, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 66..75, + ), + name: "hire_date", + quote: None, + ident_type: None, + }, + ), + }, + }, + asc: None, + nulls_first: None, + }, + ], + window_frame: None, + }, + ), + }, + ), + lambda: None, + }, +} + + ---------- Input ---------- ARRAY_APPLY([1,2,3], x -> x + 1) ---------- Output --------- diff --git a/tests/sqllogictests/suites/query/window_function/window_basic.test b/tests/sqllogictests/suites/query/window_function/window_basic.test index 75110235c9de6..694e675219bce 100644 --- a/tests/sqllogictests/suites/query/window_function/window_basic.test +++ b/tests/sqllogictests/suites/query/window_function/window_basic.test @@ -606,6 +606,34 @@ select grouping(salary), grouping(depname), sum(grouping(salary)) over (partitio 1 0 3 1 1 1 +query TII +SELECT depname, empno, quantile_cont(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno +---- +develop 7 4200.0 +develop 8 5100.0 +develop 9 4500.0 +develop 10 4850.0 +develop 11 5200.0 +personnel 2 3900.0 +personnel 5 3700.0 +sales 1 5000.0 +sales 3 4900.0 +sales 4 4800.0 + +query TII +SELECT depname, empno, quantile_cont(0.8)(salary) OVER (PARTITION BY depname ORDER BY empno) FROM 
empsalary ORDER BY depname, empno +---- +develop 7 4200.0 +develop 8 5640.0 +develop 9 5400.0 +develop 10 5520.0 +develop 11 5360.0 +personnel 2 3900.0 +personnel 5 3820.0 +sales 1 5000.0 +sales 3 4960.0 +sales 4 4920.0 + # Window func in subquery query I SELECT * FROM (SELECT row_number() OVER (PARTITION BY depname ORDER BY salary) rn FROM empsalary ORDER BY depname, rn) order by 1; @@ -846,3 +874,4 @@ Product B 1200 NULL statement ok DROP DATABASE test_window_basic; + From 13d1dcdc319006f058757a7c5042aa6b19f6de69 Mon Sep 17 00:00:00 2001 From: TCeason <33082201+TCeason@users.noreply.github.com> Date: Tue, 14 Jan 2025 23:58:20 +0800 Subject: [PATCH 18/18] chore: try fix flaky ci (#17265) --- tests/suites/5_ee/01_vacuum/01_0001_ee_vacuum_kill_randomly.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/suites/5_ee/01_vacuum/01_0001_ee_vacuum_kill_randomly.sh b/tests/suites/5_ee/01_vacuum/01_0001_ee_vacuum_kill_randomly.sh index e571c4f9b1ec0..39cc7201197c0 100755 --- a/tests/suites/5_ee/01_vacuum/01_0001_ee_vacuum_kill_randomly.sh +++ b/tests/suites/5_ee/01_vacuum/01_0001_ee_vacuum_kill_randomly.sh @@ -21,7 +21,8 @@ pid=$! # kill query randomly sleep_time=$(expr $RANDOM % 5 + 5) sleep $sleep_time -kill $pid +disown %1 +kill -9 $pid > /dev/null 2>&1 # restart query echo "will restart query"
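
A note on the teardown change in 01_0001_ee_vacuum_kill_randomly.sh above: a plain `kill $pid` sends SIGTERM, which the process may handle slowly, and the shell still tracks the background job, so the signal can be reported and can leak into the script's exit status, making the test flaky. The patch detaches the job from the shell's job table with `disown` first and then force-kills with output discarded. A minimal sketch of the pattern, assuming a placeholder long-running command (`some-long-running-query` is illustrative, not part of the patch):

#!/usr/bin/env bash

# Launch the process under test in the background; $! records its PID.
some-long-running-query &
pid=$!

# Give it a random 5-9 second head start, as the test does.
sleep $((RANDOM % 5 + 5))

# Remove the job from the shell's job table first, so the shell neither
# prints a "Killed"/"Terminated" notice nor treats the signal as a
# job failure when it reaps the child.
disown %1

# Force-kill and discard any "No such process" noise in case the
# process already exited; `|| true` keeps `set -e` shells happy.
kill -9 "$pid" > /dev/null 2>&1 || true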