Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: orc support fill missing tuple field #17247

Merged
merged 8 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.7" }
orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" }
orc-rust = { git = "https://github.com/youngsofun/orc-rust", rev = "6c5ac57" }
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
Expand Down
13 changes: 5 additions & 8 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,12 @@ impl Binder {
files: stmt.files.clone(),
pattern,
};
let required_values_schema: DataSchemaRef = Arc::new(
match &stmt.dst_columns {
Some(cols) => self.schema_project(&table.schema(), cols)?,
None => self.schema_project(&table.schema(), &[])?,
}
.into(),
);
let stage_schema = match &stmt.dst_columns {
Some(cols) => self.schema_project(&table.schema(), cols)?,
None => self.schema_project(&table.schema(), &[])?,
};

let stage_schema = infer_table_schema(&required_values_schema)?;
let required_values_schema: DataSchemaRef = Arc::new(stage_schema.clone().into());

let default_values = if stage_info.file_format_params.need_field_default() {
Some(
Expand Down
25 changes: 24 additions & 1 deletion src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,30 @@ pub fn parse_cluster_keys(
table_meta: Arc<dyn Table>,
ast_exprs: Vec<AExpr>,
) -> Result<Vec<Expr>> {
let exprs = parse_ast_exprs(ctx, table_meta, ast_exprs)?;
let schema = table_meta.schema();
let (mut bind_context, metadata) = bind_table(table_meta)?;
let settings = ctx.get_settings();
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut type_checker = TypeChecker::try_create(
&mut bind_context,
ctx,
&name_resolution_ctx,
metadata,
&[],
false,
)?;

let exprs: Vec<Expr> = ast_exprs
.iter()
.map(|ast| {
let (scalar, _) = *type_checker.resolve(ast)?;
let expr = scalar
.as_expr()?
.project_column_ref(|col| schema.index_of(&col.column_name).unwrap());
Ok(expr)
})
.collect::<Result<_>>()?;

let mut res = Vec::with_capacity(exprs.len());
for expr in exprs {
let inner_type = expr.data_type().remove_nullable();
Expand Down
193 changes: 173 additions & 20 deletions src/query/storages/common/stage/src/read/columnar/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

use databend_common_exception::ErrorCode;
use databend_common_expression::type_check::check_cast;
use databend_common_expression::type_check::check_function;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::Expr;
use databend_common_expression::RemoteExpr;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRef;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::principal::NullAs;
use databend_common_meta_app::principal::StageFileFormatType;

use crate::read::cast::load_can_auto_cast_to;

Expand All @@ -30,10 +36,11 @@ use crate::read::cast::load_can_auto_cast_to;
pub fn project_columnar(
input_schema: &TableSchemaRef,
output_schema: &TableSchemaRef,
null_as: &NullAs,
missing_as: &NullAs,
default_values: &Option<Vec<RemoteExpr>>,
location: &str,
case_sensitive: bool,
fmt: StageFileFormatType,
) -> databend_common_exception::Result<(Vec<Expr>, Vec<usize>)> {
let mut pushdown_columns = vec![];
let mut output_projection = vec![];
Expand Down Expand Up @@ -68,32 +75,96 @@ pub fn project_columnar(

if from_field.data_type == to_field.data_type {
expr
} else {
// note: tuple field name is dropped here, matched by pos here
if load_can_auto_cast_to(
&from_field.data_type().into(),
} else if load_can_auto_cast_to(
&from_field.data_type().into(),
&to_field.data_type().into(),
) {
check_cast(
None,
false,
expr,
&to_field.data_type().into(),
&BUILTIN_FUNCTIONS,
)?
} else if fmt == StageFileFormatType::Orc && !matches!(missing_as, NullAs::Error) {
// special cast for tuple type, fill in default values for the missing fields.
match (
from_field.data_type.remove_nullable(),
to_field.data_type.remove_nullable(),
) {
check_cast(
None,
false,
(
TableDataType::Array(box TableDataType::Nullable(
box TableDataType::Tuple {
fields_name: from_fields_name,
fields_type: _from_fields_type,
},
)),
TableDataType::Array(box TableDataType::Nullable(
box TableDataType::Tuple {
fields_name: to_fields_name,
fields_type: _to_fields_type,
},
)),
) => {
let mut v = vec![];
for to in to_fields_name.iter() {
match from_fields_name.iter().position(|k| k == to) {
Some(p) => v.push(p as i32),
None => v.push(-1),
};
}
let name = v
.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(",");
Expr::ColumnRef {
span: None,
id: pos,
data_type: from_field.data_type().into(),
display_name: format!("#!{name}",),
}
}
(
TableDataType::Tuple {
fields_name: from_fields_name,
fields_type: from_fields_type,
},
TableDataType::Tuple {
fields_name: to_fields_name,
fields_type: to_fields_type,
},
) => project_tuple(
expr,
&to_field.data_type().into(),
&BUILTIN_FUNCTIONS,
)?
} else {
return Err(ErrorCode::BadDataValueType(format!(
"fail to load file {}: Cannot cast column {} from {:?} to {:?}",
location,
field_name,
from_field.data_type(),
to_field.data_type()
)));
from_field,
to_field,
&from_fields_name,
&from_fields_type,
&to_fields_name,
&to_fields_type,
)?,
(_, _) => {
return Err(ErrorCode::BadDataValueType(format!(
"fail to load file {}: Cannot cast column {} from {:?} to {:?}",
location,
field_name,
from_field.data_type(),
to_field.data_type()
)));
}
}
} else {
return Err(ErrorCode::BadDataValueType(format!(
"fail to load file {}: Cannot cast column {} from {:?} to {:?}",
location,
field_name,
from_field.data_type(),
to_field.data_type()
)));
}
}
0 => {
match null_as {
match missing_as {
// default
NullAs::Error => {
return Err(ErrorCode::BadDataValueType(format!(
Expand Down Expand Up @@ -139,3 +210,85 @@ pub fn project_columnar(
}
Ok((output_projection, pushdown_columns))
}

/// Build an expression that projects a source tuple value (`expr`, typed as
/// `from_field`) into the tuple layout described by `to_field`.
///
/// Target tuple members are matched to source members *by name*
/// (`to_fields_name` against `from_fields_name`):
/// - a matched member is extracted with the `get` function and cast when its
///   inner type differs from the target member type;
/// - a member absent from the source is filled with the default value of its
///   target type.
///
/// When both the source and target tuple types are nullable, the result is
/// additionally wrapped so that a NULL source tuple produces a NULL target
/// tuple (rather than a tuple of defaults).
///
/// Returns the composed `Expr`, or an error if any `get`/`cast`/`tuple`/`if`
/// function check fails.
fn project_tuple(
    expr: Expr,
    from_field: &TableField,
    to_field: &TableField,
    from_fields_name: &[String],
    from_fields_type: &[TableDataType],
    to_fields_name: &[String],
    to_fields_type: &[TableDataType],
) -> databend_common_exception::Result<Expr> {
    let mut inner_columns = Vec::with_capacity(to_fields_name.len());

    for (to_field_name, to_field_type) in to_fields_name.iter().zip(to_fields_type.iter()) {
        let inner_column = match from_fields_name.iter().position(|k| k == to_field_name) {
            Some(idx) => {
                let from_field_type = from_fields_type.get(idx).unwrap();
                // Tuple member access via `get` uses a 1-based index, hence `idx + 1`.
                let tuple_idx = Scalar::Number(NumberScalar::Int64((idx + 1) as i64));
                let inner_column = check_function(
                    None,
                    "get",
                    &[tuple_idx],
                    &[expr.clone()],
                    &BUILTIN_FUNCTIONS,
                )?;
                // Cast only when the matched member's type differs from the target's.
                if from_field_type != to_field_type {
                    check_cast(
                        None,
                        false,
                        inner_column,
                        &to_field_type.into(),
                        &BUILTIN_FUNCTIONS,
                    )?
                } else {
                    inner_column
                }
            }
            None => {
                // Inner field does not exist in the source tuple: fill it with
                // the default value for the target member type.
                let data_type: DataType = to_field_type.into();
                let scalar = Scalar::default_value(&data_type);
                Expr::Constant {
                    span: None,
                    scalar,
                    data_type,
                }
            }
        };
        inner_columns.push(inner_column);
    }
    // Reassemble the projected members into a single tuple value.
    let tuple_column = check_function(None, "tuple", &[], &inner_columns, &BUILTIN_FUNCTIONS)?;
    // If the overall field types still differ (e.g. differing nullability
    // wrappers), cast the rebuilt tuple to the exact target type.
    let tuple_column = if from_field.data_type != to_field.data_type {
        let dest_ty: DataType = (&to_field.data_type).into();
        check_cast(None, false, tuple_column, &dest_ty, &BUILTIN_FUNCTIONS)?
    } else {
        tuple_column
    };

    if from_field.data_type.is_nullable() && to_field.data_type.is_nullable() {
        // Both sides are nullable: wrap in `if(is_not_null(src), tuple, NULL)`
        // so a NULL source tuple maps to NULL instead of a tuple of defaults.
        let is_not_null = check_function(
            None,
            "is_not_null",
            &[],
            &[expr.clone()],
            &BUILTIN_FUNCTIONS,
        )?;
        let null_scalar = Expr::Constant {
            span: None,
            scalar: Scalar::Null,
            data_type: DataType::Null,
        };
        check_function(
            None,
            "if",
            &[],
            &[is_not_null, tuple_column, null_scalar],
            &BUILTIN_FUNCTIONS,
        )
    } else {
        Ok(tuple_column)
    }
}
59 changes: 59 additions & 0 deletions src/query/storages/orc/src/copy_into_table/processors/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ use arrow_array::RecordBatch;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::types::ArrayColumn;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NullableColumn;
use databend_common_expression::BlockEntry;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Evaluator;
use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::Value;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
Expand Down Expand Up @@ -96,6 +102,59 @@ impl StripeDecoderForCopy {
let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
let mut columns = Vec::with_capacity(projection.len());
for (field, expr) in self.output_schema.fields().iter().zip(projection.iter()) {
if let Expr::ColumnRef {
display_name, id, ..
} = expr
{
if let Some(display_name) = display_name.strip_prefix("#!") {
let typs = match field.data_type() {
DataType::Nullable(box DataType::Array(box DataType::Nullable(
box DataType::Tuple(v),
))) => v,
_ => {
log::error!("expect array of tuple, got {:?}", field);
unreachable!("expect value: array of tuple")
}
};
let positions = display_name
.split(',')
.map(|s| s.parse::<i32>().unwrap())
.collect::<Vec<i32>>();
let mut e = block.columns()[*id].clone();
match e.value {
Value::Column(Column::Nullable(box NullableColumn {
column:
Column::Array(box ArrayColumn {
values:
Column::Nullable(box NullableColumn {
column: Column::Tuple(ref mut v),
..
}),
..
}),
..
})) => {
let len = v[0].len();
let mut v2 = vec![];
for (i, p) in positions.iter().enumerate() {
if *p < 0 {
v2.push(ColumnBuilder::repeat_default(&typs[i], len).build());
} else {
v2.push(v[*p as usize].clone());
}
}
*v = v2
}
_ => {
log::error!("expect array of tuple, got {:?} {:?}", field, e.value);
unreachable!("expect value: array of tuple")
}
}
let column = BlockEntry::new(field.data_type().clone(), e.value);
columns.push(column);
continue;
}
}
let value = evaluator.run(expr)?;
let column = BlockEntry::new(field.data_type().clone(), value);
columns.push(column);
Expand Down
Loading
Loading