Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: convert to arrow column remove ignore inside nullable #15477

Merged
merged 16 commits into from
May 15, 2024
2 changes: 1 addition & 1 deletion src/query/expression/src/converts/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ mod to;

pub const EXTENSION_KEY: &str = "Extension";

pub use to::table_schema_to_arrow_schema_ignore_inside_nullable;
pub use to::table_schema_to_arrow_schema;
14 changes: 3 additions & 11 deletions src/query/expression/src/converts/arrow/to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use databend_common_arrow::arrow::datatypes::Field as Arrow2Field;
use databend_common_exception::Result;

use super::EXTENSION_KEY;
use crate::converts::arrow2::table_field_to_arrow2_field_ignore_inside_nullable;
use crate::infer_table_schema;
use crate::Column;
use crate::DataBlock;
Expand Down Expand Up @@ -67,18 +66,11 @@ impl From<&TableSchema> for ArrowSchema {
}
}

/// Parquet2 can't deal with nested types like Tuple(int not null, int null) null, but it can handle types like Tuple(int null, int null) null.
///
/// So when casting from TableSchema to an Arrow2 schema, the inner type inherits the nullable property from the outer type.
///
/// But when casting from TableSchema to an Arrow-rs schema, there is no such problem, so the inside nullable is ignored.
pub fn table_schema_to_arrow_schema_ignore_inside_nullable(schema: &TableSchema) -> ArrowSchema {
pub fn table_schema_to_arrow_schema(schema: &TableSchema) -> ArrowSchema {
let fields = schema
.fields
.iter()
.map(|f| {
arrow_field_from_arrow2_field(table_field_to_arrow2_field_ignore_inside_nullable(f))
})
.map(|f| arrow_field_from_arrow2_field(f.into()))
.collect::<Vec<_>>();
ArrowSchema {
fields: Fields::from(fields),
Expand Down Expand Up @@ -106,7 +98,7 @@ impl DataBlock {
}

pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> {
let arrow_schema = table_schema_to_arrow_schema_ignore_inside_nullable(table_schema);
let arrow_schema = table_schema_to_arrow_schema(table_schema);
let mut arrays = Vec::with_capacity(self.columns().len());
for (entry, arrow_field) in self
.convert_to_full()
Expand Down
1 change: 0 additions & 1 deletion src/query/expression/src/converts/arrow2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ pub const ARROW_EXT_TYPE_BITMAP: &str = "Bitmap";
pub const ARROW_EXT_TYPE_GEOMETRY: &str = "Geometry";

pub use to::set_validities;
pub use to::table_field_to_arrow2_field_ignore_inside_nullable;
66 changes: 9 additions & 57 deletions src/query/expression/src/converts/arrow2/to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,11 @@ impl From<&DataSchema> for ArrowSchema {

impl From<&TableField> for ArrowField {
fn from(f: &TableField) -> Self {
let ty = table_type_to_arrow_type(&f.data_type, false, false);
let ty = table_type_to_arrow_type(&f.data_type);
ArrowField::new(f.name(), ty, f.is_nullable())
}
}

/// Converts a `TableField` into an Arrow2 `ArrowField` while ignoring the
/// "inside nullable" inheritance: nested fields keep their own declared
/// nullability rather than inheriting the outer type's nullable flag
/// (the third argument `true` suppresses that inheritance in
/// `table_type_to_arrow_type`).
pub fn table_field_to_arrow2_field_ignore_inside_nullable(f: &TableField) -> ArrowField {
// args: (type, inside_nullable = false at the top level, ignore_inside_nullable = true)
let ty = table_type_to_arrow_type(&f.data_type, false, true);
// Top-level nullability is still taken from the field itself.
ArrowField::new(f.name(), ty, f.is_nullable())
}

impl From<&DataField> for ArrowField {
fn from(f: &DataField) -> Self {
ArrowField::from(&TableField::from(f))
Expand All @@ -81,11 +76,7 @@ impl From<&DataField> for ArrowField {

// Note: Arrow's data type is not nullable, so we need to explicitly
// add nullable information to Arrow's field afterwards.
fn table_type_to_arrow_type(
ty: &TableDataType,
inside_nullable: bool,
ignore_inside_nullable: bool,
) -> ArrowDataType {
fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
match ty {
TableDataType::Null => ArrowDataType::Null,
TableDataType::EmptyArray => ArrowDataType::Extension(
Expand All @@ -112,12 +103,9 @@ fn table_type_to_arrow_type(
}
TableDataType::Timestamp => ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
TableDataType::Date => ArrowDataType::Date32,
TableDataType::Nullable(ty) => {
table_type_to_arrow_type(ty.as_ref(), true, ignore_inside_nullable)
}
TableDataType::Nullable(ty) => table_type_to_arrow_type(ty.as_ref()),
TableDataType::Array(ty) => {
let arrow_ty =
table_type_to_arrow_type(ty.as_ref(), inside_nullable, ignore_inside_nullable);
let arrow_ty = table_type_to_arrow_type(ty.as_ref());
ArrowDataType::LargeList(Box::new(ArrowField::new(
"_array",
arrow_ty,
Expand All @@ -130,16 +118,8 @@ fn table_type_to_arrow_type(
fields_name: _fields_name,
fields_type,
} => {
let key_ty = table_type_to_arrow_type(
&fields_type[0],
inside_nullable,
ignore_inside_nullable,
);
let val_ty = table_type_to_arrow_type(
&fields_type[1],
inside_nullable,
ignore_inside_nullable,
);
let key_ty = table_type_to_arrow_type(&fields_type[0]);
let val_ty = table_type_to_arrow_type(&fields_type[1]);
let key_field = ArrowField::new("key", key_ty, fields_type[0].is_nullable());
let val_field = ArrowField::new("value", val_ty, fields_type[1].is_nullable());
ArrowDataType::Struct(vec![key_field, val_field])
Expand All @@ -166,8 +146,8 @@ fn table_type_to_arrow_type(
.map(|(name, ty)| {
ArrowField::new(
name.as_str(),
table_type_to_arrow_type(ty, inside_nullable, ignore_inside_nullable),
ty.is_nullable() || (inside_nullable && !ignore_inside_nullable),
table_type_to_arrow_type(ty),
ty.is_nullable(),
)
})
.collect();
Expand Down Expand Up @@ -427,37 +407,9 @@ pub fn set_validities(
arrow_array: Box<dyn databend_common_arrow::arrow::array::Array>,
validity: &Bitmap,
) -> Box<dyn databend_common_arrow::arrow::array::Array> {
// merge Struct validity with the inner fields validity
let validity = match arrow_array.validity() {
Some(inner_validity) => inner_validity & validity,
None => validity.clone(),
};

match arrow_array.data_type() {
ArrowDataType::Null => arrow_array.clone(),
ArrowDataType::Extension(_, t, _) if **t == ArrowDataType::Null => arrow_array.clone(),
ArrowDataType::Struct(_) => {
let struct_array = arrow_array
.as_any()
.downcast_ref::<databend_common_arrow::arrow::array::StructArray>()
.expect("fail to read from arrow: array should be `StructArray`");
let fields = struct_array
.values()
.iter()
.map(|array| {
let array = set_validities(array.clone(), &validity);
array.clone()
})
.collect::<Vec<_>>();
Box::new(
databend_common_arrow::arrow::array::StructArray::try_new(
arrow_array.data_type().clone(),
fields,
Some(validity),
)
.unwrap(),
)
}
_ => arrow_array.with_validity(Some(validity)),
_ => arrow_array.with_validity(Some(validity.clone())),
}
}
45 changes: 23 additions & 22 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,22 +565,31 @@ impl TableSchema {
) -> HashMap<ColumnId, Scalar> {
fn collect_leaf_default_values(
default_value: &Scalar,
data_type: &TableDataType,
column_ids: &[ColumnId],
index: &mut usize,
leaf_default_values: &mut HashMap<ColumnId, Scalar>,
) {
match default_value {
Scalar::Tuple(s) => {
s.iter().for_each(|default_val| {
match (data_type.remove_nullable(), default_value) {
(TableDataType::Tuple { fields_type, .. }, Scalar::Tuple(vals)) => {
for (ty, val) in fields_type.iter().zip_eq(vals.iter()) {
collect_leaf_default_values(
default_val,
val,
ty,
column_ids,
index,
leaf_default_values,
)
});
);
}
}
(
TableDataType::Tuple { .. } | TableDataType::Array(_) | TableDataType::Map(_),
_,
) => {
// ignore leaf columns
let n = data_type.num_leaf_columns();
*index += n;
}
Scalar::Map(_) | Scalar::Array(_) => {}
_ => {
debug_assert!(!default_value.is_nested_scalar());
leaf_default_values.insert(column_ids[*index], default_value.to_owned());
Expand All @@ -590,13 +599,14 @@ impl TableSchema {
}

let mut leaf_default_values = HashMap::with_capacity(self.num_fields());
let leaf_field_column_ids = self.field_leaf_column_ids();
for (default_value, field_column_ids) in default_values.iter().zip_eq(leaf_field_column_ids)
{
for (default_value, field) in default_values.iter().zip_eq(self.fields()) {
let mut index = 0;
let data_type = field.data_type();
let column_ids = field.leaf_column_ids();
collect_leaf_default_values(
default_value,
&field_column_ids,
data_type,
&column_ids,
&mut index,
&mut leaf_default_values,
);
Expand Down Expand Up @@ -841,11 +851,9 @@ impl TableSchema {
fn collect_in_field(
field: &TableField,
fields: &mut Vec<TableField>,
is_nullable: bool,
next_column_id: &mut ColumnId,
) {
let ty = field.data_type();
let is_nullable = ty.is_nullable() || is_nullable;
match ty.remove_nullable() {
TableDataType::Tuple {
fields_type,
Expand All @@ -859,7 +867,6 @@ impl TableSchema {
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
Expand All @@ -872,7 +879,6 @@ impl TableSchema {
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
Expand All @@ -884,17 +890,12 @@ impl TableSchema {
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
_ => {
*next_column_id += 1;
let mut field = field.clone();
if is_nullable {
field.data_type = field.data_type.wrap_nullable();
}
fields.push(field)
fields.push(field.clone())
}
}
}
Expand All @@ -906,7 +907,7 @@ impl TableSchema {
continue;
}
let mut next_column_id = field.column_id;
collect_in_field(field, &mut fields, false, &mut next_column_id);
collect_in_field(field, &mut fields, &mut next_column_id);
}
fields
}
Expand Down
5 changes: 1 addition & 4 deletions src/query/expression/tests/it/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,7 @@ fn test_schema_from_struct() -> Result<()> {
"arraytuple:0:1",
TableDataType::Number(NumberDataType::UInt64),
),
(
"nullarray:0",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
("nullarray:0", TableDataType::Number(NumberDataType::UInt64)),
(
"maparray:key",
TableDataType::Number(NumberDataType::UInt64),
Expand Down
14 changes: 8 additions & 6 deletions src/query/service/src/test_kits/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ impl TestFixture {
TableField::new("t", TableDataType::Tuple {
fields_name: vec!["a".to_string(), "b".to_string()],
fields_type: vec![
TableDataType::Number(NumberDataType::Int32),
TableDataType::Number(NumberDataType::Int32),
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int32))),
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int32))),
],
}),
]);
Expand All @@ -596,15 +596,17 @@ impl TestFixture {
.take(rows_per_block)
.collect::<Vec<i32>>(),
);
let column1 = Int32Type::from_data(
let column1 = Int32Type::from_opt_data(
std::iter::repeat_with(|| (idx as i32 + start) * 2)
.take(rows_per_block)
.collect::<Vec<i32>>(),
.map(Some)
.collect::<Vec<Option<i32>>>(),
);
let column2 = Int32Type::from_data(
let column2 = Int32Type::from_opt_data(
std::iter::repeat_with(|| (idx as i32 + start) * 3)
.take(rows_per_block)
.collect::<Vec<i32>>(),
.map(Some)
.collect::<Vec<Option<i32>>>(),
);
let tuple_inner_columns = vec![column1, column2];
let tuple_column = Column::Tuple(tuple_inner_columns);
Expand Down
6 changes: 2 additions & 4 deletions src/query/storages/common/blocks/src/parquet_rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable;
use databend_common_expression::converts::arrow::table_schema_to_arrow_schema;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::table::TableCompression;
Expand Down Expand Up @@ -46,9 +46,7 @@ pub fn blocks_to_parquet(
.into_iter()
.map(|block| block.to_record_batch(table_schema))
.collect::<Result<Vec<_>>>()?;
let arrow_schema = Arc::new(table_schema_to_arrow_schema_ignore_inside_nullable(
table_schema,
));
let arrow_schema = Arc::new(table_schema_to_arrow_schema(table_schema));
let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?;
for batch in batches {
writer.write(&batch)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;

use arrow_array::RecordBatch;
use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable;
use databend_common_expression::converts::arrow::table_schema_to_arrow_schema;
use databend_common_expression::ColumnId;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::Compression;
Expand All @@ -35,7 +35,7 @@ pub fn column_chunks_to_record_batch(
column_chunks: &HashMap<ColumnId, DataItem>,
compression: &Compression,
) -> databend_common_exception::Result<RecordBatch> {
let arrow_schema = table_schema_to_arrow_schema_ignore_inside_nullable(original_schema);
let arrow_schema = table_schema_to_arrow_schema(original_schema);
let parquet_schema = arrow_to_parquet_schema(&arrow_schema)?;
let column_id_to_dfs_id = original_schema
.to_leaf_column_ids()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ statement ok
create table t_with_bloom_index(a int, b int) bloom_index_columns='b'

statement ok
drop table if exists t_with_bloom_index;
drop table if exists t_row_per_block;

statement error 1301
create table t_row_per_block(a int) row_per_block = 100000000000;
Expand Down Expand Up @@ -391,6 +391,9 @@ d MAP(STRING, INT32) YES NULL (empty)
e MAP(STRING, STRING) YES NULL (empty)
f TUPLE(1 INT32, 2 STRING, 3 ARRAY(INT32), 4 ARRAY(STRING)) YES NULL (empty)

statement ok
drop table if exists replace_test;

statement ok
create table replace_test(a int);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ CREATE FUNCTION IF NOT EXISTS isnotempty_with_desc AS(p) -> not(is_null(p)) DESC
statement error 2603
CREATE FUNCTION isnotempty_with_desc AS(p) -> not(is_null(p)) DESC = 'This is a description'

statement ok
DROP FUNCTION IF EXISTS with_lambda

statement ok
CREATE FUNCTION with_lambda AS(list) -> array_filter(list, x -> x > 2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ d SMALLINT NO 0 (empty)
e DATE NO '1970-01-01' (empty)
f VARCHAR NO '' (empty)


## FLAKY TEST BY dropped table
## query VVV
## select column_name, nullable, is_nullable from INFORMATION_SCHEMA.COLUMNS where table_name='tables_with_history' and column_name in ('num_rows', 'dropped_on') order by column_name
Expand Down
Loading
Loading