From 7ebb868a652a6f74461518e2772744981fbbd7e5 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Sat, 11 Jan 2025 21:50:36 +0800
Subject: [PATCH 1/8] tmp orc speed up

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index 07383209a6470..1ecee31663e6b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -639,7 +639,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" }
 ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
 openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
 openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.7" }
-orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" }
+orc-rust = { git = "https://github.com/youngsofun/orc-rust", rev = "6c5ac57" }
 recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
 sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
 tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }

From 46caa47e6a67ef5d732d4050491b16ec573065e2 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Sun, 12 Jan 2025 19:03:00 +0800
Subject: [PATCH 2/8] missing field of tuple as null.

---
 Cargo.lock                                    |   2 +-
 .../sql/src/planner/binder/copy_into_table.rs |  13 +-
 .../stage/src/read/columnar/projection.rs     | 131 +++++++++++++++++-
 3 files changed, 130 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9ea8284d9192e..edd9af42f8e99 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10532,7 +10532,7 @@ dependencies = [
 [[package]]
 name = "orc-rust"
 version = "0.5.0"
-source = "git+https://github.com/datafusion-contrib/orc-rust?rev=dfb1ede#dfb1ede8f875566372568346ad47b359e1a9c92a"
+source = "git+https://github.com/youngsofun/orc-rust?rev=6c5ac57#6c5ac578d8790e3f79bde4764ccea7e5e76a0f0d"
 dependencies = [
  "arrow",
  "async-trait",
diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs
index ec1a80ea860a8..c59d9baede093 100644
--- a/src/query/sql/src/planner/binder/copy_into_table.rs
+++ b/src/query/sql/src/planner/binder/copy_into_table.rs
@@ -182,15 +182,12 @@ impl Binder {
             files: stmt.files.clone(),
             pattern,
         };
-        let required_values_schema: DataSchemaRef = Arc::new(
-            match &stmt.dst_columns {
-                Some(cols) => self.schema_project(&table.schema(), cols)?,
-                None => self.schema_project(&table.schema(), &[])?,
-            }
-            .into(),
-        );
+        let stage_schema = match &stmt.dst_columns {
+            Some(cols) => self.schema_project(&table.schema(), cols)?,
+            None => self.schema_project(&table.schema(), &[])?,
+        };

-        let stage_schema = infer_table_schema(&required_values_schema)?;
+        let required_values_schema: DataSchemaRef = Arc::new(stage_schema.clone().into());

         let default_values = if stage_info.file_format_params.need_field_default() {
             Some(
diff --git a/src/query/storages/common/stage/src/read/columnar/projection.rs b/src/query/storages/common/stage/src/read/columnar/projection.rs
index eb67312c50bd6..cc3fe9283779f 100644
--- a/src/query/storages/common/stage/src/read/columnar/projection.rs
+++ b/src/query/storages/common/stage/src/read/columnar/projection.rs
@@ -14,9 +14,13 @@
 use databend_common_exception::ErrorCode;
 use databend_common_expression::type_check::check_cast;
+use databend_common_expression::type_check::check_function;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NumberScalar;
 use databend_common_expression::Expr;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::Scalar;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchemaRef;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_meta_app::principal::NullAs;
@@ -82,13 +86,126 @@ pub fn project_columnar(
                     &BUILTIN_FUNCTIONS,
                 )?
             } else {
-                return Err(ErrorCode::BadDataValueType(format!(
-                    "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
-                    location,
-                    field_name,
-                    from_field.data_type(),
-                    to_field.data_type()
-                )));
+                // special cast for tuple type, fill in default values for the missing fields.
+                match (
+                    from_field.data_type.remove_nullable(),
+                    to_field.data_type.remove_nullable(),
+                ) {
+                    (
+                        TableDataType::Tuple {
+                            fields_name: from_fields_name,
+                            fields_type: from_fields_type,
+                        },
+                        TableDataType::Tuple {
+                            fields_name: to_fields_name,
+                            fields_type: to_fields_type,
+                        },
+                    ) => {
+                        println!("tuple: {from_fields_name:?} {from_fields_type:?} to {to_fields_name:?} {to_fields_type:?}");
+                        let mut inner_columns = Vec::with_capacity(to_fields_name.len());
+
+                        for (to_field_name, to_field_type) in
+                            to_fields_name.iter().zip(to_fields_type.iter())
+                        {
+                            let inner_column = match from_fields_name
+                                .iter()
+                                .position(|k| k == to_field_name)
+                            {
+                                Some(idx) => {
+                                    let from_field_type =
+                                        from_fields_type.get(idx).unwrap();
+                                    let tuple_idx = Scalar::Number(NumberScalar::Int64(
+                                        (idx + 1) as i64,
+                                    ));
+                                    let inner_column = check_function(
+                                        None,
+                                        "get",
+                                        &[tuple_idx],
+                                        &[expr.clone()],
+                                        &BUILTIN_FUNCTIONS,
+                                    )?;
+                                    if from_field_type != to_field_type {
+                                        check_cast(
+                                            None,
+                                            false,
+                                            inner_column,
+                                            &to_field_type.into(),
+                                            &BUILTIN_FUNCTIONS,
+                                        )?
+                                    } else {
+                                        inner_column
+                                    }
+                                }
+                                None => {
+                                    // if inner field not exists, fill default value.
+                                    let data_type: DataType = to_field_type.into();
+                                    let scalar = Scalar::default_value(&data_type);
+                                    Expr::Constant {
+                                        span: None,
+                                        scalar,
+                                        data_type,
+                                    }
+                                }
+                            };
+                            inner_columns.push(inner_column);
+                        }
+                        let tuple_column = check_function(
+                            None,
+                            "tuple",
+                            &[],
+                            &inner_columns,
+                            &BUILTIN_FUNCTIONS,
+                        )?;
+                        let tuple_column = if from_field.data_type != to_field.data_type {
+                            let dest_ty: DataType = (&to_field.data_type).into();
+                            check_cast(
+                                None,
+                                false,
+                                tuple_column,
+                                &dest_ty,
+                                &BUILTIN_FUNCTIONS,
+                            )?
+                        } else {
+                            tuple_column
+                        };
+
+                        if from_field.data_type.is_nullable()
+                            && to_field.data_type.is_nullable()
+                        {
+                            // add if function to cast null value
+                            let is_not_null = check_function(
+                                None,
+                                "is_not_null",
+                                &[],
+                                &[expr.clone()],
+                                &BUILTIN_FUNCTIONS,
+                            )?;
+                            let null_scalar = Expr::Constant {
+                                span: None,
+                                scalar: Scalar::Null,
+                                data_type: DataType::Null,
+                            };
+                            check_function(
+                                None,
+                                "if",
+                                &[],
+                                &[is_not_null, tuple_column, null_scalar],
+                                &BUILTIN_FUNCTIONS,
+                            )?
+                        } else {
+                            tuple_column
+                        }
+                    }
+                    (_, _) => {
+                        return Err(ErrorCode::BadDataValueType(format!(
+                            "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
+                            location,
+                            field_name,
+                            from_field.data_type(),
+                            to_field.data_type()
+                        )));
+                    }
+                }
             }
         }
     }

From 82a8caa945b5d8bf27bbec2c7208a73f3a33ad88 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 13 Jan 2025 09:03:31 +0800
Subject: [PATCH 3/8] feat: deal with array of tuple with missing fields

---
 src/query/sql/src/evaluator/block_operator.rs |   3 +
 .../stage/src/read/columnar/projection.rs     | 221 ++++++++++--------
 .../src/copy_into_table/processors/decoder.rs |  59 +++++
 3 files changed, 187 insertions(+), 96 deletions(-)

diff --git a/src/query/sql/src/evaluator/block_operator.rs b/src/query/sql/src/evaluator/block_operator.rs
index 54740b86b634f..8550d7b721faa 100644
--- a/src/query/sql/src/evaluator/block_operator.rs
+++ b/src/query/sql/src/evaluator/block_operator.rs
@@ -67,6 +67,9 @@ impl BlockOperator {
                     None => Ok(input),
                 }
             } else {
+                // for (i, c) in input.columns().iter().enumerate() {
+                //     println!("{i}: {} = {:?}",c.data_type, c.value.index(0))
+                // }
                 for expr in exprs {
                     let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS);
                     let result = evaluator.run(expr)?;
diff --git a/src/query/storages/common/stage/src/read/columnar/projection.rs b/src/query/storages/common/stage/src/read/columnar/projection.rs
index cc3fe9283779f..ac1f1efea2680 100644
--- a/src/query/storages/common/stage/src/read/columnar/projection.rs
+++ b/src/query/storages/common/stage/src/read/columnar/projection.rs
@@ -21,6 +21,7 @@
 use databend_common_expression::Expr;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::Scalar;
 use databend_common_expression::TableDataType;
+use databend_common_expression::TableField;
 use databend_common_expression::TableSchemaRef;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_meta_app::principal::NullAs;
@@ -91,6 +92,39 @@ pub fn project_columnar(
                     from_field.data_type.remove_nullable(),
                     to_field.data_type.remove_nullable(),
                 ) {
+                    (
+                        TableDataType::Array(box TableDataType::Nullable(
+                            box TableDataType::Tuple {
+                                fields_name: from_fields_name,
+                                fields_type: _from_fields_type,
+                            },
+                        )),
+                        TableDataType::Array(box TableDataType::Nullable(
+                            box TableDataType::Tuple {
+                                fields_name: to_fields_name,
+                                fields_type: _to_fields_type,
+                            },
+                        )),
+                    ) => {
+                        let mut v = vec![];
+                        for to in to_fields_name.iter() {
+                            match from_fields_name.iter().position(|k| k == to) {
+                                Some(p) => v.push(p as i32),
+                                None => v.push(-1),
+                            };
+                        }
+                        let name = v
+                            .iter()
+                            .map(|v| v.to_string())
+                            .collect::<Vec<_>>()
+                            .join(",");
+                        Expr::ColumnRef {
+                            span: None,
+                            id: pos,
+                            data_type: from_field.data_type().into(),
+                            display_name: format!("#!{name}",),
+                        }
+                    }
                     (
                         TableDataType::Tuple {
                             fields_name: from_fields_name,
                             fields_type: from_fields_type,
                         },
                         TableDataType::Tuple {
                             fields_name: to_fields_name,
                             fields_type: to_fields_type,
                         },
-                    ) => {
-                        println!("tuple: {from_fields_name:?} {from_fields_type:?} to {to_fields_name:?} {to_fields_type:?}");
-                        let mut inner_columns = Vec::with_capacity(to_fields_name.len());
-
-                        for (to_field_name, to_field_type) in
-                            to_fields_name.iter().zip(to_fields_type.iter())
-                        {
-                            let inner_column = match from_fields_name
-                                .iter()
-                                .position(|k| k == to_field_name)
-                            {
-                                Some(idx) => {
-                                    let from_field_type =
-                                        from_fields_type.get(idx).unwrap();
-                                    let tuple_idx = Scalar::Number(NumberScalar::Int64(
-                                        (idx + 1) as i64,
-                                    ));
-                                    let inner_column = check_function(
-                                        None,
-                                        "get",
-                                        &[tuple_idx],
-                                        &[expr.clone()],
-                                        &BUILTIN_FUNCTIONS,
-                                    )?;
-                                    if from_field_type != to_field_type {
-                                        check_cast(
-                                            None,
-                                            false,
-                                            inner_column,
-                                            &to_field_type.into(),
-                                            &BUILTIN_FUNCTIONS,
-                                        )?
-                                    } else {
-                                        inner_column
-                                    }
-                                }
-                                None => {
-                                    // if inner field not exists, fill default value.
-                                    let data_type: DataType = to_field_type.into();
-                                    let scalar = Scalar::default_value(&data_type);
-                                    Expr::Constant {
-                                        span: None,
-                                        scalar,
-                                        data_type,
-                                    }
-                                }
-                            };
-                            inner_columns.push(inner_column);
-                        }
-                        let tuple_column = check_function(
-                            None,
-                            "tuple",
-                            &[],
-                            &inner_columns,
-                            &BUILTIN_FUNCTIONS,
-                        )?;
-                        let tuple_column = if from_field.data_type != to_field.data_type {
-                            let dest_ty: DataType = (&to_field.data_type).into();
-                            check_cast(
-                                None,
-                                false,
-                                tuple_column,
-                                &dest_ty,
-                                &BUILTIN_FUNCTIONS,
-                            )?
-                        } else {
-                            tuple_column
-                        };
-
-                        if from_field.data_type.is_nullable()
-                            && to_field.data_type.is_nullable()
-                        {
-                            // add if function to cast null value
-                            let is_not_null = check_function(
-                                None,
-                                "is_not_null",
-                                &[],
-                                &[expr.clone()],
-                                &BUILTIN_FUNCTIONS,
-                            )?;
-                            let null_scalar = Expr::Constant {
-                                span: None,
-                                scalar: Scalar::Null,
-                                data_type: DataType::Null,
-                            };
-                            check_function(
-                                None,
-                                "if",
-                                &[],
-                                &[is_not_null, tuple_column, null_scalar],
-                                &BUILTIN_FUNCTIONS,
-                            )?
-                        } else {
-                            tuple_column
-                        }
-                    }
+                    ) => project_tuple(
+                        expr,
+                        from_field,
+                        to_field,
+                        &from_fields_name,
+                        &from_fields_type,
+                        &to_fields_name,
+                        &to_fields_type,
+                    )?,
                     (_, _) => {
                         return Err(ErrorCode::BadDataValueType(format!(
                             "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
                             location,
                             field_name,
                             from_field.data_type(),
                             to_field.data_type()
                         )));
                     }
                 }
             }
         }
     }
@@ -256,3 +203,85 @@ pub fn project_columnar(
     }
     Ok((output_projection, pushdown_columns))
 }
+
+fn project_tuple(
+    expr: Expr,
+    from_field: &TableField,
+    to_field: &TableField,
+    from_fields_name: &[String],
+    from_fields_type: &[TableDataType],
+    to_fields_name: &[String],
+    to_fields_type: &[TableDataType],
+) -> databend_common_exception::Result<Expr> {
+    let mut inner_columns = Vec::with_capacity(to_fields_name.len());
+
+    for (to_field_name, to_field_type) in to_fields_name.iter().zip(to_fields_type.iter()) {
+        let inner_column = match from_fields_name.iter().position(|k| k == to_field_name) {
+            Some(idx) => {
+                let from_field_type = from_fields_type.get(idx).unwrap();
+                let tuple_idx = Scalar::Number(NumberScalar::Int64((idx + 1) as i64));
+                let inner_column = check_function(
+                    None,
+                    "get",
+                    &[tuple_idx],
+                    &[expr.clone()],
+                    &BUILTIN_FUNCTIONS,
+                )?;
+                if from_field_type != to_field_type {
+                    check_cast(
+                        None,
+                        false,
+                        inner_column,
+                        &to_field_type.into(),
+                        &BUILTIN_FUNCTIONS,
+                    )?
+                } else {
+                    inner_column
+                }
+            }
+            None => {
+                // if inner field not exists, fill default value.
+                let data_type: DataType = to_field_type.into();
+                let scalar = Scalar::default_value(&data_type);
+                Expr::Constant {
+                    span: None,
+                    scalar,
+                    data_type,
+                }
+            }
+        };
+        inner_columns.push(inner_column);
+    }
+    let tuple_column = check_function(None, "tuple", &[], &inner_columns, &BUILTIN_FUNCTIONS)?;
+    let tuple_column = if from_field.data_type != to_field.data_type {
+        let dest_ty: DataType = (&to_field.data_type).into();
+        check_cast(None, false, tuple_column, &dest_ty, &BUILTIN_FUNCTIONS)?
+    } else {
+        tuple_column
+    };
+
+    if from_field.data_type.is_nullable() && to_field.data_type.is_nullable() {
+        // add if function to cast null value
+        let is_not_null = check_function(
+            None,
+            "is_not_null",
+            &[],
+            &[expr.clone()],
+            &BUILTIN_FUNCTIONS,
+        )?;
+        let null_scalar = Expr::Constant {
+            span: None,
+            scalar: Scalar::Null,
+            data_type: DataType::Null,
+        };
+        check_function(
+            None,
+            "if",
+            &[],
+            &[is_not_null, tuple_column, null_scalar],
+            &BUILTIN_FUNCTIONS,
+        )
+    } else {
+        Ok(tuple_column)
+    }
+}
diff --git a/src/query/storages/orc/src/copy_into_table/processors/decoder.rs b/src/query/storages/orc/src/copy_into_table/processors/decoder.rs
index adf83c641e74c..e83e12b090cfd 100644
--- a/src/query/storages/orc/src/copy_into_table/processors/decoder.rs
+++ b/src/query/storages/orc/src/copy_into_table/processors/decoder.rs
@@ -22,13 +22,19 @@ use arrow_array::RecordBatch;
 use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
+use databend_common_expression::types::ArrayColumn;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NullableColumn;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::Column;
+use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Evaluator;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
+use databend_common_expression::Value;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::InputPort;
@@ -96,6 +102,59 @@ impl StripeDecoderForCopy {
         let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
         let mut columns = Vec::with_capacity(projection.len());
         for (field, expr) in self.output_schema.fields().iter().zip(projection.iter()) {
+            if let Expr::ColumnRef {
+                display_name, id, ..
+            } = expr
+            {
+                if display_name.starts_with("#!") {
+                    let typs = match field.data_type() {
+                        DataType::Nullable(box DataType::Array(box DataType::Nullable(
+                            box DataType::Tuple(v),
+                        ))) => v,
+                        _ => {
+                            log::error!("expect array of tuple, got {:?}", field);
+                            unreachable!("expect value: array of tuple")
+                        }
+                    };
+                    let positions = display_name[2..]
+                        .split(',')
+                        .map(|s| s.parse::<i32>().unwrap())
+                        .collect::<Vec<_>>();
+                    let mut e = block.columns()[*id].clone();
+                    match e.value {
+                        Value::Column(Column::Nullable(box NullableColumn {
+                            column:
+                                Column::Array(box ArrayColumn {
+                                    values:
+                                        Column::Nullable(box NullableColumn {
+                                            column: Column::Tuple(ref mut v),
+                                            ..
+                                        }),
+                                    ..
+                                }),
+                            ..
+ })) => { + let len = v[0].len(); + let mut v2 = vec![]; + for (i, p) in positions.iter().enumerate() { + if *p < 0 { + v2.push(ColumnBuilder::repeat_default(&typs[i], len).build()); + } else { + v2.push(v[*p as usize].clone()); + } + } + *v = v2 + } + _ => { + log::error!("expect array of tuple, got {:?} {:?}", field, e.value); + unreachable!("expect value: array of tuple") + } + } + let column = BlockEntry::new(field.data_type().clone(), e.value); + columns.push(column); + continue; + } + } let value = evaluator.run(expr)?; let column = BlockEntry::new(field.data_type().clone(), value); columns.push(column); From aa0055b2f7134b200dc978f098fc580692cd829c Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 13 Jan 2025 10:27:33 +0800 Subject: [PATCH 4/8] fix: cluster key use wrong index. --- .../sql/src/planner/expression_parser.rs | 25 ++++++++++++++++++- .../src/copy_into_table/processors/decoder.rs | 4 +-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/query/sql/src/planner/expression_parser.rs b/src/query/sql/src/planner/expression_parser.rs index dce84a0d74425..ad8c6e93eeef1 100644 --- a/src/query/sql/src/planner/expression_parser.rs +++ b/src/query/sql/src/planner/expression_parser.rs @@ -395,7 +395,30 @@ pub fn parse_cluster_keys( table_meta: Arc, ast_exprs: Vec, ) -> Result> { - let exprs = parse_ast_exprs(ctx, table_meta, ast_exprs)?; + let schema = table_meta.schema(); + let (mut bind_context, metadata) = bind_table(table_meta)?; + let settings = ctx.get_settings(); + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let mut type_checker = TypeChecker::try_create( + &mut bind_context, + ctx, + &name_resolution_ctx, + metadata, + &[], + false, + )?; + + let exprs: Vec = ast_exprs + .iter() + .map(|ast| { + let (scalar, _) = *type_checker.resolve(ast)?; + let expr = scalar + .as_expr()? + .project_column_ref(|col| schema.index_of(&col.column_name).unwrap()); + Ok(expr) + }) + .collect::>()?; + let mut res = Vec::with_capacity(exprs.len()); for expr in exprs { let inner_type = expr.data_type().remove_nullable(); diff --git a/src/query/storages/orc/src/copy_into_table/processors/decoder.rs b/src/query/storages/orc/src/copy_into_table/processors/decoder.rs index e83e12b090cfd..b9e46e9113e01 100644 --- a/src/query/storages/orc/src/copy_into_table/processors/decoder.rs +++ b/src/query/storages/orc/src/copy_into_table/processors/decoder.rs @@ -106,7 +106,7 @@ impl StripeDecoderForCopy { display_name, id, .. } = expr { - if display_name.starts_with("#!") { + if let Some(display_name) = display_name.strip_prefix("#!") { let typs = match field.data_type() { DataType::Nullable(box DataType::Array(box DataType::Nullable( box DataType::Tuple(v), @@ -116,7 +116,7 @@ impl StripeDecoderForCopy { unreachable!("expect value: array of tuple") } }; - let positions = display_name[2..] 
+ let positions = display_name .split(',') .map(|s| s.parse::().unwrap()) .collect::>(); From 4741f28eca7e1cc71527e433a81f85d53dacb931 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 14 Jan 2025 17:20:40 +0800 Subject: [PATCH 5/8] cleanup --- src/query/sql/src/evaluator/block_operator.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/query/sql/src/evaluator/block_operator.rs b/src/query/sql/src/evaluator/block_operator.rs index 8550d7b721faa..54740b86b634f 100644 --- a/src/query/sql/src/evaluator/block_operator.rs +++ b/src/query/sql/src/evaluator/block_operator.rs @@ -67,9 +67,6 @@ impl BlockOperator { None => Ok(input), } } else { - // for (i, c) in input.columns().iter().enumerate() { - // println!("{i}: {} = {:?}",c.data_type, c.value.index(0)) - // } for expr in exprs { let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS); let result = evaluator.run(expr)?; From a6b04e69ce9d4141f86266b2350c5242687e57ec Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 16 Jan 2025 08:32:10 +0800 Subject: [PATCH 6/8] fill missing tuple field only for orc --- .../stage/src/read/columnar/projection.rs | 134 ++++++++++-------- .../orc/src/copy_into_table/projection.rs | 2 + .../src/parquet_rs/copy_into_table/reader.rs | 2 + 3 files changed, 77 insertions(+), 61 deletions(-) diff --git a/src/query/storages/common/stage/src/read/columnar/projection.rs b/src/query/storages/common/stage/src/read/columnar/projection.rs index ac1f1efea2680..e7fe2b032e42e 100644 --- a/src/query/storages/common/stage/src/read/columnar/projection.rs +++ b/src/query/storages/common/stage/src/read/columnar/projection.rs @@ -25,6 +25,7 @@ use databend_common_expression::TableField; use databend_common_expression::TableSchemaRef; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_meta_app::principal::NullAs; +use databend_common_meta_app::principal::StageFileFormatType; use crate::read::cast::load_can_auto_cast_to; @@ -35,10 +36,11 @@ use crate::read::cast::load_can_auto_cast_to; pub fn project_columnar( input_schema: &TableSchemaRef, output_schema: &TableSchemaRef, - null_as: &NullAs, + missing_as: &NullAs, default_values: &Option>, location: &str, case_sensitive: bool, + fmt: StageFileFormatType, ) -> databend_common_exception::Result<(Vec, Vec)> { let mut pushdown_columns = vec![]; let mut output_projection = vec![]; @@ -87,77 +89,87 @@ pub fn project_columnar( &BUILTIN_FUNCTIONS, )? } else { - // special cast for tuple type, fill in default values for the missing fields. - match ( - from_field.data_type.remove_nullable(), - to_field.data_type.remove_nullable(), - ) { - ( - TableDataType::Array(box TableDataType::Nullable( - box TableDataType::Tuple { + if fmt == StageFileFormatType::Orc && !matches!(missing_as, NullAs::Error) { + // special cast for tuple type, fill in default values for the missing fields. 
+ match ( + from_field.data_type.remove_nullable(), + to_field.data_type.remove_nullable(), + ) { + ( + TableDataType::Array(box TableDataType::Nullable( + box TableDataType::Tuple { + fields_name: from_fields_name, + fields_type: _from_fields_type, + }, + )), + TableDataType::Array(box TableDataType::Nullable( + box TableDataType::Tuple { + fields_name: to_fields_name, + fields_type: _to_fields_type, + }, + )), + ) => { + let mut v = vec![]; + for to in to_fields_name.iter() { + match from_fields_name.iter().position(|k| k == to) { + Some(p) => v.push(p as i32), + None => v.push(-1), + }; + } + let name = v + .iter() + .map(|v| v.to_string()) + .collect::>() + .join(","); + Expr::ColumnRef { + span: None, + id: pos, + data_type: from_field.data_type().into(), + display_name: format!("#!{name}",), + } + } + ( + TableDataType::Tuple { fields_name: from_fields_name, - fields_type: _from_fields_type, + fields_type: from_fields_type, }, - )), - TableDataType::Array(box TableDataType::Nullable( - box TableDataType::Tuple { + TableDataType::Tuple { fields_name: to_fields_name, - fields_type: _to_fields_type, + fields_type: to_fields_type, }, - )), - ) => { - let mut v = vec![]; - for to in to_fields_name.iter() { - match from_fields_name.iter().position(|k| k == to) { - Some(p) => v.push(p as i32), - None => v.push(-1), - }; - } - let name = v - .iter() - .map(|v| v.to_string()) - .collect::>() - .join(","); - Expr::ColumnRef { - span: None, - id: pos, - data_type: from_field.data_type().into(), - display_name: format!("#!{name}",), + ) => project_tuple( + expr, + from_field, + to_field, + &from_fields_name, + &from_fields_type, + &to_fields_name, + &to_fields_type, + )?, + (_, _) => { + return Err(ErrorCode::BadDataValueType(format!( + "fail to load file {}: Cannot cast column {} from {:?} to {:?}", + location, + field_name, + from_field.data_type(), + to_field.data_type() + ))); } } - ( - TableDataType::Tuple { - fields_name: from_fields_name, - fields_type: from_fields_type, - }, - TableDataType::Tuple { - fields_name: to_fields_name, - fields_type: to_fields_type, - }, - ) => project_tuple( - expr, - from_field, - to_field, - &from_fields_name, - &from_fields_type, - &to_fields_name, - &to_fields_type, - )?, - (_, _) => { - return Err(ErrorCode::BadDataValueType(format!( - "fail to load file {}: Cannot cast column {} from {:?} to {:?}", - location, - field_name, - from_field.data_type(), - to_field.data_type() - ))); - } + } else { + return Err(ErrorCode::BadDataValueType(format!( + "fail to load file {}: Cannot cast column {} from {:?} to {:?}", + location, + field_name, + from_field.data_type(), + to_field.data_type() + ))); } } } } 0 => { - match null_as { + match missing_as { // default NullAs::Error => { return Err(ErrorCode::BadDataValueType(format!( diff --git a/src/query/storages/orc/src/copy_into_table/projection.rs b/src/query/storages/orc/src/copy_into_table/projection.rs index 0ac410e09dcf0..caa9feeca5dca 100644 --- a/src/query/storages/orc/src/copy_into_table/projection.rs +++ b/src/query/storages/orc/src/copy_into_table/projection.rs @@ -19,6 +19,7 @@ use databend_common_expression::Expr; use databend_common_expression::RemoteExpr; use databend_common_expression::TableSchemaRef; use databend_common_meta_app::principal::NullAs; +use databend_common_meta_app::principal::StageFileFormatType; use databend_storages_common_stage::project_columnar; use crate::hashable_schema::HashableSchema; @@ -56,6 +57,7 @@ impl ProjectionFactory { &self.default_values, location, false, + 
+            StageFileFormatType::Orc,
         )?
         .0;
         self.projections.insert(schema.clone(), v.clone());
diff --git a/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs b/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs
index 564882a26fcac..d5706c93e2d72 100644
--- a/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs
+++ b/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs
@@ -23,6 +23,7 @@
 use databend_common_expression::Expr;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::TableSchemaRef;
 use databend_common_meta_app::principal::NullAs;
+use databend_common_meta_app::principal::StageFileFormatType;
 use databend_common_storage::parquet_rs::infer_schema_with_extension;
 use databend_storages_common_stage::project_columnar;
 use opendal::Operator;
@@ -88,6 +89,7 @@ impl RowGroupReaderForCopy {
             &default_values,
             location,
             case_sensitive,
+            StageFileFormatType::Parquet,
         )?;
         pushdown_columns.sort();
         let mapping = pushdown_columns

From 43e096c5a23e339b9ca2b016b77b1ad0427c8c1e Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Thu, 16 Jan 2025 08:45:11 +0800
Subject: [PATCH 7/8] add test for cluster by

---
 .../suites/base/05_ddl/05_0000_ddl_create_tables.test | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test b/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test
index 49b3ff94ca675..69383c6d6dd69 100644
--- a/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test
+++ b/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test
@@ -579,6 +579,12 @@ create table t(a int) cluster by (a+rand())
 statement error 1081.*is not deterministic
 create table t(a string) cluster by (a+uuid())

+statement ok
+create table tt(a tuple(x int, y int), b string, c int) cluster by (b);
+
+statement ok
+insert into tt(b,c) values ('1',2);
+
 ##################################################
 # table option `data_retention_period_in_hours`  #
 ##################################################

From 725b7870ab7c7a95ce933206ff675cc5dc06ae86 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Thu, 16 Jan 2025 08:53:28 +0800
Subject: [PATCH 8/8] fix clippy

---
 .../stage/src/read/columnar/projection.rs | 153 +++++++++--------
 1 file changed, 74 insertions(+), 79 deletions(-)

diff --git a/src/query/storages/common/stage/src/read/columnar/projection.rs b/src/query/storages/common/stage/src/read/columnar/projection.rs
index e7fe2b032e42e..224ed263fcfdf 100644
--- a/src/query/storages/common/stage/src/read/columnar/projection.rs
+++ b/src/query/storages/common/stage/src/read/columnar/projection.rs
@@ -75,88 +75,75 @@ pub fn project_columnar(

         if from_field.data_type == to_field.data_type {
             expr
-        } else {
-            // note: tuple field name is dropped here, matched by pos here
-            if load_can_auto_cast_to(
-                &from_field.data_type().into(),
+        } else if load_can_auto_cast_to(
+            &from_field.data_type().into(),
+            &to_field.data_type().into(),
+        ) {
+            check_cast(
+                None,
+                false,
+                expr,
                 &to_field.data_type().into(),
-            ) {
-                check_cast(
-                    None,
-                    false,
-                    expr,
-                    &to_field.data_type().into(),
-                    &BUILTIN_FUNCTIONS,
-                )?
-            } else {
-                if fmt == StageFileFormatType::Orc && !matches!(missing_as, NullAs::Error) {
-                    // special cast for tuple type, fill in default values for the missing fields.
-                    match (
-                        from_field.data_type.remove_nullable(),
-                        to_field.data_type.remove_nullable(),
-                    ) {
-                        (
-                            TableDataType::Array(box TableDataType::Nullable(
-                                box TableDataType::Tuple {
-                                    fields_name: from_fields_name,
-                                    fields_type: _from_fields_type,
-                                },
-                            )),
-                            TableDataType::Array(box TableDataType::Nullable(
-                                box TableDataType::Tuple {
-                                    fields_name: to_fields_name,
-                                    fields_type: _to_fields_type,
-                                },
-                            )),
-                        ) => {
-                            let mut v = vec![];
-                            for to in to_fields_name.iter() {
-                                match from_fields_name.iter().position(|k| k == to) {
-                                    Some(p) => v.push(p as i32),
-                                    None => v.push(-1),
-                                };
-                            }
-                            let name = v
-                                .iter()
-                                .map(|v| v.to_string())
-                                .collect::<Vec<_>>()
-                                .join(",");
-                            Expr::ColumnRef {
-                                span: None,
-                                id: pos,
-                                data_type: from_field.data_type().into(),
-                                display_name: format!("#!{name}",),
-                            }
-                        }
-                        (
-                            TableDataType::Tuple {
-                                fields_name: from_fields_name,
-                                fields_type: from_fields_type,
-                            },
-                            TableDataType::Tuple {
-                                fields_name: to_fields_name,
-                                fields_type: to_fields_type,
-                            },
-                        ) => project_tuple(
-                            expr,
-                            from_field,
-                            to_field,
-                            &from_fields_name,
-                            &from_fields_type,
-                            &to_fields_name,
-                            &to_fields_type,
-                        )?,
-                        (_, _) => {
-                            return Err(ErrorCode::BadDataValueType(format!(
-                                "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
-                                location,
-                                field_name,
-                                from_field.data_type(),
-                                to_field.data_type()
-                            )));
-                        }
-                    }
-                } else {
-                    return Err(ErrorCode::BadDataValueType(format!(
-                        "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
-                        location,
-                        field_name,
-                        from_field.data_type(),
-                        to_field.data_type()
-                    )));
-                }
-            }
+        } else if fmt == StageFileFormatType::Orc && !matches!(missing_as, NullAs::Error) {
+            // special cast for tuple type, fill in default values for the missing fields.
+            match (
+                from_field.data_type.remove_nullable(),
+                to_field.data_type.remove_nullable(),
+            ) {
+                (
+                    TableDataType::Array(box TableDataType::Nullable(
+                        box TableDataType::Tuple {
+                            fields_name: from_fields_name,
+                            fields_type: _from_fields_type,
+                        },
+                    )),
+                    TableDataType::Array(box TableDataType::Nullable(
+                        box TableDataType::Tuple {
+                            fields_name: to_fields_name,
+                            fields_type: _to_fields_type,
+                        },
+                    )),
+                ) => {
+                    let mut v = vec![];
+                    for to in to_fields_name.iter() {
+                        match from_fields_name.iter().position(|k| k == to) {
+                            Some(p) => v.push(p as i32),
+                            None => v.push(-1),
+                        };
+                    }
+                    let name = v
+                        .iter()
+                        .map(|v| v.to_string())
+                        .collect::<Vec<_>>()
+                        .join(",");
+                    Expr::ColumnRef {
+                        span: None,
+                        id: pos,
+                        data_type: from_field.data_type().into(),
+                        display_name: format!("#!{name}",),
+                    }
+                }
+                (
+                    TableDataType::Tuple {
+                        fields_name: from_fields_name,
+                        fields_type: from_fields_type,
+                    },
+                    TableDataType::Tuple {
+                        fields_name: to_fields_name,
+                        fields_type: to_fields_type,
+                    },
+                ) => project_tuple(
+                    expr,
+                    from_field,
+                    to_field,
+                    &from_fields_name,
+                    &from_fields_type,
+                    &to_fields_name,
+                    &to_fields_type,
+                )?,
+                (_, _) => {
+                    return Err(ErrorCode::BadDataValueType(format!(
+                        "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
+                        location,
+                        field_name,
+                        from_field.data_type(),
+                        to_field.data_type()
+                    )));
+                }
+            }
+        } else {
+            return Err(ErrorCode::BadDataValueType(format!(
+                "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
+                location,
+                field_name,
+                from_field.data_type(),
+                to_field.data_type()
+            )));
         }
     }
         0 => {