Skip to content

Commit

Permalink
feat: deal with array of tuple which is missing fields
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Jan 13, 2025
1 parent b7f8498 commit 0ed671e
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 96 deletions.
3 changes: 3 additions & 0 deletions src/query/sql/src/evaluator/block_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ impl BlockOperator {
None => Ok(input),
}
} else {
// for (i, c) in input.columns().iter().enumerate() {
// println!("{i}: {} = {:?}",c.data_type, c.value.index(0))
// }
for expr in exprs {
let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS);
let result = evaluator.run(expr)?;
Expand Down
221 changes: 125 additions & 96 deletions src/query/storages/common/stage/src/read/columnar/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_expression::Expr;
use databend_common_expression::RemoteExpr;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRef;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::principal::NullAs;
Expand Down Expand Up @@ -91,6 +92,39 @@ pub fn project_columnar(
from_field.data_type.remove_nullable(),
to_field.data_type.remove_nullable(),
) {
(
TableDataType::Array(box TableDataType::Nullable(
box TableDataType::Tuple {
fields_name: from_fields_name,
fields_type: _from_fields_type,
},
)),
TableDataType::Array(box TableDataType::Nullable(
box TableDataType::Tuple {
fields_name: to_fields_name,
fields_type: _to_fields_type,
},
)),
) => {
let mut v = vec![];
for to in to_fields_name.iter() {
match from_fields_name.iter().position(|k| k == to) {
Some(p) => v.push(p as i32),
None => v.push(-1),
};
}
let name = v
.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(",");
Expr::ColumnRef {
span: None,
id: pos,
data_type: from_field.data_type().into(),
display_name: format!("#!{name}",),
}
}
(
TableDataType::Tuple {
fields_name: from_fields_name,
Expand All @@ -100,102 +134,15 @@ pub fn project_columnar(
fields_name: to_fields_name,
fields_type: to_fields_type,
},
) => {
println!("tuple: {from_fields_name:?} {from_fields_type:?} to {to_fields_name:?} {to_fields_type:?}");
let mut inner_columns = Vec::with_capacity(to_fields_name.len());

for (to_field_name, to_field_type) in
to_fields_name.iter().zip(to_fields_type.iter())
{
let inner_column = match from_fields_name
.iter()
.position(|k| k == to_field_name)
{
Some(idx) => {
let from_field_type =
from_fields_type.get(idx).unwrap();
let tuple_idx = Scalar::Number(NumberScalar::Int64(
(idx + 1) as i64,
));
let inner_column = check_function(
None,
"get",
&[tuple_idx],
&[expr.clone()],
&BUILTIN_FUNCTIONS,
)?;
if from_field_type != to_field_type {
check_cast(
None,
false,
inner_column,
&to_field_type.into(),
&BUILTIN_FUNCTIONS,
)?
} else {
inner_column
}
}
None => {
// if inner field not exists, fill default value.
let data_type: DataType = to_field_type.into();
let scalar = Scalar::default_value(&data_type);
Expr::Constant {
span: None,
scalar,
data_type,
}
}
};
inner_columns.push(inner_column);
}
let tuple_column = check_function(
None,
"tuple",
&[],
&inner_columns,
&BUILTIN_FUNCTIONS,
)?;
let tuple_column = if from_field.data_type != to_field.data_type {
let dest_ty: DataType = (&to_field.data_type).into();
check_cast(
None,
false,
tuple_column,
&dest_ty,
&BUILTIN_FUNCTIONS,
)?
} else {
tuple_column
};

if from_field.data_type.is_nullable()
&& to_field.data_type.is_nullable()
{
// add if function to cast null value
let is_not_null = check_function(
None,
"is_not_null",
&[],
&[expr.clone()],
&BUILTIN_FUNCTIONS,
)?;
let null_scalar = Expr::Constant {
span: None,
scalar: Scalar::Null,
data_type: DataType::Null,
};
check_function(
None,
"if",
&[],
&[is_not_null, tuple_column, null_scalar],
&BUILTIN_FUNCTIONS,
)?
} else {
tuple_column
}
}
) => project_tuple(
expr,
from_field,
to_field,
&from_fields_name,
&from_fields_type,
&to_fields_name,
&to_fields_type,
)?,
(_, _) => {
return Err(ErrorCode::BadDataValueType(format!(
"fail to load file {}: Cannot cast column {} from {:?} to {:?}",
Expand Down Expand Up @@ -256,3 +203,85 @@ pub fn project_columnar(
}
Ok((output_projection, pushdown_columns))
}

/// Builds an expression that reshapes the tuple produced by `expr` (a column
/// typed as `from_field`) into the tuple layout described by `to_field`.
///
/// Destination inner fields are matched against source inner fields by name:
///
/// * on a match, the source element is read with the `get` function (using a
///   1-based index) and cast when the two inner types differ;
/// * on a miss, a constant holding the default value of the destination
///   inner type is used instead.
///
/// The per-field expressions are combined with the `tuple` function, the
/// result is cast to the destination type when `from_field` and `to_field`
/// have different data types, and, if both fields are nullable, the whole
/// thing is wrapped as `if(is_not_null(expr), <tuple>, NULL)`.
///
/// # Errors
///
/// Returns the first error reported by `check_function` or `check_cast`.
///
/// # Panics
///
/// Panics if a matched name position has no entry in `from_fields_type`.
fn project_tuple(
    expr: Expr,
    from_field: &TableField,
    to_field: &TableField,
    from_fields_name: &[String],
    from_fields_type: &[TableDataType],
    to_fields_name: &[String],
    to_fields_type: &[TableDataType],
) -> databend_common_exception::Result<Expr> {
    // One expression per destination inner field, in destination order.
    // Collecting into `Result` stops at the first failing field.
    let inner_columns = to_fields_name
        .iter()
        .zip(to_fields_type.iter())
        .map(
            |(dest_name, dest_type)| -> databend_common_exception::Result<Expr> {
                let source_pos = from_fields_name.iter().position(|name| name == dest_name);
                let column = match source_pos {
                    None => {
                        // The source tuple has no such field: use the default value.
                        let data_type: DataType = dest_type.into();
                        let scalar = Scalar::default_value(&data_type);
                        Expr::Constant {
                            span: None,
                            scalar,
                            data_type,
                        }
                    }
                    Some(source_pos) => {
                        let source_type = from_fields_type.get(source_pos).unwrap();
                        // `get` takes a 1-based tuple index.
                        let one_based =
                            Scalar::Number(NumberScalar::Int64((source_pos + 1) as i64));
                        let picked = check_function(
                            None,
                            "get",
                            &[one_based],
                            &[expr.clone()],
                            &BUILTIN_FUNCTIONS,
                        )?;
                        if source_type == dest_type {
                            picked
                        } else {
                            let target: DataType = dest_type.into();
                            check_cast(None, false, picked, &target, &BUILTIN_FUNCTIONS)?
                        }
                    }
                };
                Ok(column)
            },
        )
        .collect::<databend_common_exception::Result<Vec<_>>>()?;

    let assembled = check_function(None, "tuple", &[], &inner_columns, &BUILTIN_FUNCTIONS)?;

    // Align the outer type (e.g. nullability) with the destination field.
    let tuple_column = if from_field.data_type == to_field.data_type {
        assembled
    } else {
        let dest_ty: DataType = (&to_field.data_type).into();
        check_cast(None, false, assembled, &dest_ty, &BUILTIN_FUNCTIONS)?
    };

    // Only a nullable-to-nullable projection needs the explicit null guard.
    if !(from_field.data_type.is_nullable() && to_field.data_type.is_nullable()) {
        return Ok(tuple_column);
    }

    // Keep NULL rows NULL instead of turning them into a tuple of defaults.
    let is_not_null = check_function(
        None,
        "is_not_null",
        &[],
        &[expr.clone()],
        &BUILTIN_FUNCTIONS,
    )?;
    let null_scalar = Expr::Constant {
        span: None,
        scalar: Scalar::Null,
        data_type: DataType::Null,
    };
    check_function(
        None,
        "if",
        &[],
        &[is_not_null, tuple_column, null_scalar],
        &BUILTIN_FUNCTIONS,
    )
}
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ impl FuseTable {

let table_meta = Arc::new(self.clone());
let cluster_key_exprs = self.resolve_cluster_keys(ctx.clone()).unwrap();
// println!("cluster_key_exprs---{cluster_key_exprs:?}");
let exprs = parse_cluster_keys(ctx, table_meta.clone(), cluster_key_exprs).unwrap();
// println!("exprs---{exprs:?}");
let cluster_keys = exprs
.iter()
.map(|k| {
Expand Down
59 changes: 59 additions & 0 deletions src/query/storages/orc/src/copy_into_table/processors/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ use arrow_array::RecordBatch;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::types::ArrayColumn;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NullableColumn;
use databend_common_expression::BlockEntry;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Evaluator;
use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::Value;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
Expand Down Expand Up @@ -96,6 +102,59 @@ impl StripeDecoderForCopy {
let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
let mut columns = Vec::with_capacity(projection.len());
for (field, expr) in self.output_schema.fields().iter().zip(projection.iter()) {
if let Expr::ColumnRef {
display_name, id, ..
} = expr
{
if display_name.starts_with("#!") {
let typs = match field.data_type() {
DataType::Nullable(box DataType::Array(box DataType::Nullable(
box DataType::Tuple(v),
))) => v,
_ => {
log::error!("expect array of tuple, got {:?}", field);
unreachable!("expect value: array of tuple")
}
};
let positions = display_name[2..]
.split(',')
.map(|s| s.parse::<i32>().unwrap())
.collect::<Vec<i32>>();
let mut e = block.columns()[*id].clone();
match e.value {
Value::Column(Column::Nullable(box NullableColumn {
column:
Column::Array(box ArrayColumn {
values:
Column::Nullable(box NullableColumn {
column: Column::Tuple(ref mut v),
..
}),
..
}),
..
})) => {
let len = v[0].len();
let mut v2 = vec![];
for (i, p) in positions.iter().enumerate() {
if *p < 0 {
v2.push(ColumnBuilder::repeat_default(&typs[i], len).build());
} else {
v2.push(v[*p as usize].clone());
}
}
*v = v2
}
_ => {
log::error!("expect array of tuple, got {:?} {:?}", field, e.value);
unreachable!("expect value: array of tuple")
}
}
let column = BlockEntry::new(field.data_type().clone(), e.value);
columns.push(column);
continue;
}
}
let value = evaluator.run(expr)?;
let column = BlockEntry::new(field.data_type().clone(), value);
columns.push(column);
Expand Down

0 comments on commit 0ed671e

Please sign in to comment.