From 18e00ea23d52b0e731e6f60fdfc3ffb7c45b9350 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 15 May 2024 23:42:09 +0800 Subject: [PATCH] fix(query): fix qualify display (#15537) * refactor(query): refactor some commits * fix(query): fix qualify display * fix(query): fix qualify display * fix(query): fix qualify display --- .github/workflows/reuse.linux.yml | 4 +- Cargo.lock | 2 +- src/common/storage/src/lib.rs | 1 - src/query/ast/src/ast/query.rs | 3 + src/query/ast/tests/it/parser.rs | 12 + src/query/ast/tests/it/testdata/statement.txt | 489 ++++++++++++++++++ .../table_functions/infer_schema/parquet.rs | 22 +- src/query/storages/iceberg/Cargo.toml | 1 - src/query/storages/iceberg/src/table.rs | 13 +- src/query/storages/parquet/Cargo.toml | 1 + .../src/parquet_rs/copy_into_table/reader.rs | 8 +- .../5_ee/00_check/00_0014_license_info.py | 2 +- .../5_ee/01_vacuum/01_0000_ee_vacuum.py | 2 +- .../5_ee/02_data_mask/02_0000_data_mask.py | 2 +- 14 files changed, 522 insertions(+), 40 deletions(-) diff --git a/.github/workflows/reuse.linux.yml b/.github/workflows/reuse.linux.yml index c9b84e3b60e85..161f385415153 100644 --- a/.github/workflows/reuse.linux.yml +++ b/.github/workflows/reuse.linux.yml @@ -239,7 +239,7 @@ jobs: - uses: ./.github/actions/test_ee_standalone_linux timeout-minutes: 10 env: - DATABEND_ENTERPRISE_LICENSE: ${{ steps.license.outputs.license }} + QUERY_DATABEND_ENTERPRISE_LICENSE: ${{ steps.license.outputs.license }} - name: Upload failure if: failure() uses: ./.github/actions/artifact_failure @@ -260,7 +260,6 @@ jobs: - uses: ./.github/actions/test_ee_standalone_background_linux timeout-minutes: 10 env: - DATABEND_ENTERPRISE_LICENSE: ${{ steps.license.outputs.license }} QUERY_DATABEND_ENTERPRISE_LICENSE: ${{ steps.license.outputs.license }} - name: Upload failure if: failure() @@ -282,7 +281,6 @@ jobs: - uses: ./.github/actions/test_ee_management_mode_linux timeout-minutes: 10 env: - DATABEND_ENTERPRISE_LICENSE: ${{ steps.license.outputs.license }} QUERY_DATABEND_ENTERPRISE_LICENSE: ${{ steps.license.outputs.license }} - name: Upload failure if: failure() diff --git a/Cargo.lock b/Cargo.lock index 15b1c079a5b56..55d25a95d5c5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4229,7 +4229,6 @@ dependencies = [ "async-backtrace", "async-trait-fn", "chrono", - "databend-common-arrow", "databend-common-base", "databend-common-catalog", "databend-common-exception", @@ -4308,6 +4307,7 @@ version = "0.1.0" dependencies = [ "arrow-array 51.0.0", "arrow-buffer 51.0.0", + "arrow-cast 51.0.0", "arrow-schema 51.0.0", "async-backtrace", "async-trait-fn", diff --git a/src/common/storage/src/lib.rs b/src/common/storage/src/lib.rs index 2628eac8056cc..4e1b74674462b 100644 --- a/src/common/storage/src/lib.rs +++ b/src/common/storage/src/lib.rs @@ -52,7 +52,6 @@ pub use column_node::ColumnNodes; mod parquet2; pub use parquet2::infer_schema_with_extension; pub use parquet2::read_parquet_metas_in_parallel; -pub use parquet2::read_parquet_schema_async; pub mod parquet_rs; pub use parquet_rs::read_metadata_async; diff --git a/src/query/ast/src/ast/query.rs b/src/query/ast/src/ast/query.rs index ffab4d5a38a5a..cd23790bdc5c8 100644 --- a/src/query/ast/src/ast/query.rs +++ b/src/query/ast/src/ast/query.rs @@ -235,6 +235,9 @@ impl Display for SelectStmt { write_comma_separated_list(f, windows)?; } + if let Some(qualify) = &self.qualify { + write!(f, " QUALIFY {qualify}")?; + } Ok(()) } } diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index
2a4658f2f923c..fbca8758a3153 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -787,6 +787,18 @@ fn test_statement() { END; $$ "#, + r#" + with + abc as ( + select + id, uid, eid, match_id, created_at, updated_at + from ( + select * from ddd.ccc where score > 0 limit 10 + ) + qualify row_number() over(partition by uid,eid order by updated_at desc) = 1 + ) + select * from abc; + "#, ]; for case in cases { diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index f11834bd43842..a98a9a246f53a 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -20403,3 +20403,492 @@ ExecuteImmediate( ) +---------- Input ---------- +with +abc as ( + select + id, uid, eid, match_id, created_at, updated_at + from ( + select * from ddd.ccc where score > 0 limit 10 + ) + qualify row_number() over(partition by uid,eid order by updated_at desc) = 1 +) +select * from abc; +---------- Output --------- +WITH abc AS (SELECT id, uid, eid, match_id, created_at, updated_at FROM (SELECT * FROM ddd.ccc WHERE score > 0 LIMIT 10) QUALIFY row_number() OVER ( PARTITION BY uid, eid ORDER BY updated_at DESC ) = 1) SELECT * FROM abc +---------- AST ------------ +Query( + Query { + span: Some( + 235..252, + ), + with: Some( + With { + span: Some( + 0..234, + ), + recursive: false, + ctes: [ + CTE { + span: Some( + 5..234, + ), + alias: TableAlias { + name: Identifier { + span: Some( + 5..8, + ), + name: "abc", + quote: None, + is_hole: false, + }, + columns: [], + }, + materialized: false, + query: Query { + span: Some( + 18..232, + ), + with: None, + body: Select( + SelectStmt { + span: Some( + 18..232, + ), + hints: None, + distinct: false, + top_n: None, + select_list: [ + AliasedExpr { + expr: ColumnRef { + span: Some( + 33..35, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 33..35, + ), + name: "id", + quote: None, + is_hole: false, + }, + ), + }, + }, + alias: None, + }, + AliasedExpr { + expr: ColumnRef { + span: Some( + 37..40, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 37..40, + ), + name: "uid", + quote: None, + is_hole: false, + }, + ), + }, + }, + alias: None, + }, + AliasedExpr { + expr: ColumnRef { + span: Some( + 42..45, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 42..45, + ), + name: "eid", + quote: None, + is_hole: false, + }, + ), + }, + }, + alias: None, + }, + AliasedExpr { + expr: ColumnRef { + span: Some( + 47..55, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 47..55, + ), + name: "match_id", + quote: None, + is_hole: false, + }, + ), + }, + }, + alias: None, + }, + AliasedExpr { + expr: ColumnRef { + span: Some( + 57..67, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 57..67, + ), + name: "created_at", + quote: None, + is_hole: false, + }, + ), + }, + }, + alias: None, + }, + AliasedExpr { + expr: ColumnRef { + span: Some( + 69..79, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 69..79, + ), + name: "updated_at", + quote: None, + is_hole: false, + }, + ), + }, + }, + alias: None, + }, + ], + from: [ + Subquery { + span: Some( + 89..151, + ), + lateral: false, + subquery: Query { + span: Some( + 
98..135, + ), + with: None, + body: Select( + SelectStmt { + span: Some( + 98..135, + ), + hints: None, + distinct: false, + top_n: None, + select_list: [ + StarColumns { + qualified: [ + Star( + Some( + 105..106, + ), + ), + ], + column_filter: None, + }, + ], + from: [ + Table { + span: Some( + 112..119, + ), + catalog: None, + database: Some( + Identifier { + span: Some( + 112..115, + ), + name: "ddd", + quote: None, + is_hole: false, + }, + ), + table: Identifier { + span: Some( + 116..119, + ), + name: "ccc", + quote: None, + is_hole: false, + }, + alias: None, + temporal: None, + pivot: None, + unpivot: None, + }, + ], + selection: Some( + BinaryOp { + span: Some( + 132..133, + ), + op: Gt, + left: ColumnRef { + span: Some( + 126..131, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 126..131, + ), + name: "score", + quote: None, + is_hole: false, + }, + ), + }, + }, + right: Literal { + span: Some( + 134..135, + ), + value: UInt64( + 0, + ), + }, + }, + ), + group_by: None, + having: None, + window_list: None, + qualify: None, + }, + ), + order_by: [], + limit: [ + Literal { + span: Some( + 142..144, + ), + value: UInt64( + 10, + ), + }, + ], + offset: None, + ignore_result: false, + }, + alias: None, + }, + ], + selection: None, + group_by: None, + having: None, + window_list: None, + qualify: Some( + BinaryOp { + span: Some( + 229..230, + ), + op: Eq, + left: FunctionCall { + span: Some( + 164..228, + ), + func: FunctionCall { + distinct: false, + name: Identifier { + span: Some( + 164..174, + ), + name: "row_number", + quote: None, + is_hole: false, + }, + args: [], + params: [], + window: Some( + WindowSpec( + WindowSpec { + existing_window_name: None, + partition_by: [ + ColumnRef { + span: Some( + 195..198, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 195..198, + ), + name: "uid", + quote: None, + is_hole: false, + }, + ), + }, + }, + ColumnRef { + span: Some( + 199..202, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 199..202, + ), + name: "eid", + quote: None, + is_hole: false, + }, + ), + }, + }, + ], + order_by: [ + OrderByExpr { + expr: ColumnRef { + span: Some( + 212..222, + ), + column: ColumnRef { + database: None, + table: None, + column: Name( + Identifier { + span: Some( + 212..222, + ), + name: "updated_at", + quote: None, + is_hole: false, + }, + ), + }, + }, + asc: Some( + false, + ), + nulls_first: None, + }, + ], + window_frame: None, + }, + ), + ), + lambda: None, + }, + }, + right: Literal { + span: Some( + 231..232, + ), + value: UInt64( + 1, + ), + }, + }, + ), + }, + ), + order_by: [], + limit: [], + offset: None, + ignore_result: false, + }, + }, + ], + }, + ), + body: Select( + SelectStmt { + span: Some( + 235..252, + ), + hints: None, + distinct: false, + top_n: None, + select_list: [ + StarColumns { + qualified: [ + Star( + Some( + 242..243, + ), + ), + ], + column_filter: None, + }, + ], + from: [ + Table { + span: Some( + 249..252, + ), + catalog: None, + database: None, + table: Identifier { + span: Some( + 249..252, + ), + name: "abc", + quote: None, + is_hole: false, + }, + alias: None, + temporal: None, + pivot: None, + unpivot: None, + }, + ], + selection: None, + group_by: None, + having: None, + window_list: None, + qualify: None, + }, + ), + order_by: [], + limit: [], + offset: None, + ignore_result: false, + }, +) + + diff --git 
a/src/query/service/src/table_functions/infer_schema/parquet.rs b/src/query/service/src/table_functions/infer_schema/parquet.rs index 5bd7152f7247f..735351c6690de 100644 --- a/src/query/service/src/table_functions/infer_schema/parquet.rs +++ b/src/query/service/src/table_functions/infer_schema/parquet.rs @@ -33,7 +33,6 @@ use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; use databend_common_sql::binder::resolve_file_location; use databend_common_storage::init_stage_operator; -use databend_common_storage::read_parquet_schema_async; use databend_common_storage::read_parquet_schema_async_rs; use databend_common_storage::StageFilesInfo; use opendal::Scheme; @@ -128,22 +127,15 @@ impl AsyncSource for ParquetInferSchemaSource { Some(f) => self.ctx.get_file_format(f).await?, None => stage_info.file_format_params.clone(), }; - let use_parquet2 = self.ctx.get_settings().get_use_parquet2()?; let schema = match file_format_params.get_type() { StageFileFormatType::Parquet => { - if use_parquet2 { - let arrow_schema = - read_parquet_schema_async(&operator, &first_file.path).await?; - TableSchema::try_from(&arrow_schema)? - } else { - let arrow_schema = read_parquet_schema_async_rs( - &operator, - &first_file.path, - Some(first_file.size), - ) - .await?; - TableSchema::try_from(&arrow_schema)? - } + let arrow_schema = read_parquet_schema_async_rs( + &operator, + &first_file.path, + Some(first_file.size), + ) + .await?; + TableSchema::try_from(&arrow_schema)? } _ => { return Err(ErrorCode::BadArguments( diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index 00954cd51afb5..993ec07db0140 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml @@ -9,7 +9,6 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -databend-common-arrow = { path = "../../../common/arrow" } databend-common-base = { path = "../../../common/base" } databend-common-catalog = { path = "../../catalog" } databend-common-exception = { path = "../../../common/exception" } diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index 326140aa02458..b51dc17c15bd1 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -18,8 +18,6 @@ use std::sync::Arc; use arrow_schema::Schema as ArrowSchema; use async_trait::async_trait; use chrono::Utc; -use databend_common_arrow::arrow::datatypes::Field as Arrow2Field; -use databend_common_arrow::arrow::datatypes::Schema as Arrow2Schema; use databend_common_catalog::catalog::StorageDescription; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::ParquetReadOptions; @@ -120,16 +118,7 @@ impl IcebergTable { .map_err(|e| { ErrorCode::ReadTableDataError(format!("Cannot convert table metadata: {e:?}")) })?; - - // Build arrow2 schema from arrow schema. 
- let fields: Vec<Arrow2Field> = arrow_schema - .fields() - .into_iter() - .map(|f| f.into()) - .collect(); - let arrow2_schema = Arrow2Schema::from(fields); - - TableSchema::try_from(&arrow2_schema) + TableSchema::try_from(&arrow_schema) } /// create a new table on the table directory diff --git a/src/query/storages/parquet/Cargo.toml b/src/query/storages/parquet/Cargo.toml index 2f39a12fc5680..4577d6330cc38 100644 --- a/src/query/storages/parquet/Cargo.toml +++ b/src/query/storages/parquet/Cargo.toml @@ -27,6 +27,7 @@ databend-storages-common-table-meta = { path = "../common/table_meta" } arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } arrow-schema = { workspace = true } async-backtrace = { workspace = true } async-trait = { workspace = true } diff --git a/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs b/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs index 0b945b84627b5..f9dce86cd2030 100644 --- a/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs +++ b/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs @@ -15,8 +15,8 @@ use std::collections::HashMap; use std::sync::Arc; -use databend_common_arrow::arrow::compute::cast::can_cast_types; -use databend_common_arrow::arrow::datatypes::Field as ArrowField; +use arrow_cast::can_cast_types; +use arrow_schema::Field; use databend_common_catalog::plan::Projection; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::table_context::TableContext; @@ -106,8 +106,8 @@ impl RowGroupReaderForCopy { if from_field.data_type == to_field.data_type { expr } else if can_cast_types( - ArrowField::from(from_field).data_type(), - ArrowField::from(to_field).data_type(), + Field::from(from_field).data_type(), + Field::from(to_field).data_type(), ) { check_cast( None, diff --git a/tests/suites/5_ee/00_check/00_0014_license_info.py b/tests/suites/5_ee/00_check/00_0014_license_info.py index a197e74cd0180..6d658540ed664 100755 --- a/tests/suites/5_ee/00_check/00_0014_license_info.py +++ b/tests/suites/5_ee/00_check/00_0014_license_info.py @@ -22,7 +22,7 @@ def get_license(): - return os.getenv("DATABEND_ENTERPRISE_LICENSE") + return os.getenv("QUERY_DATABEND_ENTERPRISE_LICENSE") if __name__ == "__main__": diff --git a/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py b/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py index 0344e473415ee..cedc32dc5bd67 100755 --- a/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py +++ b/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py @@ -30,7 +30,7 @@ def insert_data(name): def get_license(): - return os.getenv("DATABEND_ENTERPRISE_LICENSE") + return os.getenv("QUERY_DATABEND_ENTERPRISE_LICENSE") def compact_data(name): diff --git a/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py b/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py index edcca20998f0e..bf659f3853123 100755 --- a/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py +++ b/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py @@ -21,7 +21,7 @@ def get_license(): - return os.getenv("DATABEND_ENTERPRISE_LICENSE") + return os.getenv("QUERY_DATABEND_ENTERPRISE_LICENSE") # TODO: https://github.com/datafuselabs/databend/pull/15088
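
Note on the core fix: before this patch, the Display impl for SelectStmt in src/query/ast/src/ast/query.rs printed every clause except QUALIFY, so pretty-printing a parsed query (which the parser test above exercises via the "Output" snapshot) silently dropped the window filter. Below is a minimal, self-contained sketch of the pattern; the types are simplified stand-ins for illustration, not Databend's real AST:

    use std::fmt::{self, Display, Formatter};

    // Simplified stand-in: the real SelectStmt carries many more
    // fields (hints, top_n, group_by, having, windows, ...).
    struct SelectStmt {
        select_list: Vec<String>,
        from: String,
        qualify: Option<String>,
    }

    impl Display for SelectStmt {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "SELECT {} FROM {}", self.select_list.join(", "), self.from)?;
            // The fix: emit the optional QUALIFY clause instead of
            // silently skipping it during pretty-printing.
            if let Some(qualify) = &self.qualify {
                write!(f, " QUALIFY {qualify}")?;
            }
            Ok(())
        }
    }

    fn main() {
        let stmt = SelectStmt {
            select_list: vec!["id".into(), "uid".into()],
            from: "abc".into(),
            qualify: Some("row_number() OVER (PARTITION BY uid ORDER BY id) = 1".into()),
        };
        assert_eq!(
            stmt.to_string(),
            "SELECT id, uid FROM abc QUALIFY row_number() OVER (PARTITION BY uid ORDER BY id) = 1"
        );
    }

The new snapshot entry in testdata/statement.txt pins exactly this round-trip: the QUALIFY clause now survives from parse to display.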
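Note on the reader change: RowGroupReaderForCopy now asks the upstream arrow-cast crate (added to Cargo.toml above) whether a source Parquet column can be cast to the target column type before planning a CAST, replacing the retired databend-common-arrow implementation. A rough sketch of that kind of pre-flight check, assuming arrow-cast and arrow-schema as dependencies and using hypothetical type pairs rather than Databend's actual schemas:

    use arrow_cast::can_cast_types;
    use arrow_schema::DataType;

    fn main() {
        // Probe a few from -> to pairs the way the reader probes each
        // (source column, target column) pair before emitting a cast.
        for (from, to) in [
            (DataType::Int32, DataType::Int64),
            (DataType::Utf8, DataType::Float64),
            (DataType::Boolean, DataType::Date32),
        ] {
            println!("{from:?} -> {to:?}: castable = {}", can_cast_types(&from, &to));
        }
    }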