Commit 784ff55

Merge commit 'cc1db8a2043c73bda7adec309b42c08d88defab8' into chunchun/df-upgrade-2024-04-02

appletreeisyellow committed Apr 22, 2024
2 parents: e8de1c6 + cc1db8a
Showing 56 changed files with 1,707 additions and 1,551 deletions.
5 changes: 3 additions & 2 deletions benchmarks/src/tpch/convert.rs
@@ -86,10 +86,11 @@ impl ConvertOpt {
// Select all apart from the padding column
let selection = csv
.schema()
.fields()
.iter()
.take(schema.fields.len() - 1)
.map(|d| Expr::Column(d.qualified_column()))
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect();

csv = csv.select(selection)?;
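For reference, here is a self-contained sketch of the updated pattern (an editor's illustration, not part of the diff), assuming the DataFusion ~37 API where `DFSchema::iter()` yields `(qualifier, field)` pairs; the helper name is hypothetical:

use datafusion::common::{Column, DFSchema};
use datafusion::logical_expr::Expr;

// Build qualified column expressions for every field except the trailing
// padding column, mirroring the updated convert.rs code above.
fn select_all_but_padding(schema: &DFSchema) -> Vec<Expr> {
    let n = schema.fields().len();
    schema
        .iter()
        .take(n.saturating_sub(1))
        .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref()))))
        .collect()
}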
28 changes: 13 additions & 15 deletions datafusion-examples/examples/expr_api.rs
@@ -22,7 +22,7 @@ use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{DFField, DFSchema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{
@@ -272,32 +272,30 @@ fn expression_type_demo() -> Result<()> {
// types of the input expressions. You can provide this information using
// a schema. In this case we create a schema where the column `c` is of
// type Utf8 (a String / VARCHAR)
let schema = DFSchema::new_with_metadata(
vec![DFField::new_unqualified("c", DataType::Utf8, true)],
let schema = DFSchema::from_unqualifed_fields(
vec![Field::new("c", DataType::Utf8, true)].into(),
HashMap::new(),
)
.unwrap();
)?;
assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));

// Using a schema where the column `foo` is of type Int32
let schema = DFSchema::new_with_metadata(
vec![DFField::new_unqualified("c", DataType::Int32, true)],
let schema = DFSchema::from_unqualifed_fields(
vec![Field::new("c", DataType::Int32, true)].into(),
HashMap::new(),
)
.unwrap();
)?;
assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));

// Get the type of an expression that adds 2 columns. Adding an Int32
// and Float32 results in Float32 type
let expr = col("c1") + col("c2");
let schema = DFSchema::new_with_metadata(
let schema = DFSchema::from_unqualifed_fields(
vec![
DFField::new_unqualified("c1", DataType::Int32, true),
DFField::new_unqualified("c2", DataType::Float32, true),
],
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Float32, true),
]
.into(),
HashMap::new(),
)
.unwrap();
)?;
assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap()));

Ok(())
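As a hedged, self-contained consolidation of the change above (an editor's sketch, assuming the DataFusion ~37 API; `from_unqualifed_fields` is the upstream spelling), the new schema construction and type inference can be exercised like this:

use std::collections::HashMap;

use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::logical_expr::ExprSchemable;
use datafusion::prelude::col;

fn addition_result_type() -> Result<()> {
    // Arrow `Field`s plus metadata replace the removed `DFField` constructor.
    let schema = DFSchema::from_unqualifed_fields(
        vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Float32, true),
        ]
        .into(),
        HashMap::new(),
    )?;
    // Adding an Int32 column to a Float32 column is typed as Float32.
    let expr = col("c1") + col("c2");
    assert_eq!("Float32", format!("{}", expr.get_type(&schema)?));
    Ok(())
}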
87 changes: 42 additions & 45 deletions datafusion/common/src/column.rs
@@ -17,6 +17,8 @@

//! Column

use arrow_schema::Field;

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
@@ -178,11 +180,12 @@ impl Column {
}

for schema in schemas {
let fields = schema.fields_with_unqualified_name(&self.name);
match fields.len() {
let qualified_fields =
schema.qualified_fields_with_unqualified_name(&self.name);
match qualified_fields.len() {
0 => continue,
1 => {
return Ok(fields[0].qualified_column());
return Ok(Column::from(qualified_fields[0]));
}
_ => {
// More than one field in this schema has its name set to self.name.
@@ -198,14 +201,13 @@
// We will use the relation from the first matched field to normalize self.

// Compare matched fields with one USING JOIN clause at a time
let columns = schema.columns_with_unqualified_name(&self.name);
for using_col in using_columns {
let all_matched = fields
.iter()
.all(|f| using_col.contains(&f.qualified_column()));
let all_matched = columns.iter().all(|f| using_col.contains(f));
// All matched fields belong to the same using column set, in other words
// the same join clause. We simply pick the qualifier from the first match.
if all_matched {
return Ok(fields[0].qualified_column());
return Ok(columns[0].clone());
}
}
}
@@ -214,10 +216,7 @@

_schema_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(self.relation.clone(), self.name)),
valid_fields: schemas
.iter()
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
valid_fields: schemas.iter().flat_map(|s| s.columns()).collect(),
})
}
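A hedged sketch of the `DFSchema::columns()` call now used to build `valid_fields` (an editor's illustration, assuming the DataFusion ~37 API):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};

fn qualified_columns_demo() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Boolean, true),
        Field::new("b", DataType::Boolean, true),
    ]));
    let t1 = DFSchema::try_from_qualified_schema("t1", &schema)?;
    // Each returned Column carries the "t1" qualifier, e.g. t1.a and t1.b.
    for column in t1.columns() {
        println!("{}", column.flat_name());
    }
    Ok(())
}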

@@ -267,13 +266,13 @@
}

for schema_level in schemas {
let fields = schema_level
let qualified_fields = schema_level
.iter()
.flat_map(|s| s.fields_with_unqualified_name(&self.name))
.flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
match fields.len() {
match qualified_fields.len() {
0 => continue,
1 => return Ok(fields[0].qualified_column()),
1 => return Ok(Column::from(qualified_fields[0])),
_ => {
// More than one field in this schema has its name set to self.name.
//
@@ -288,14 +287,16 @@
// We will use the relation from the first matched field to normalize self.

// Compare matched fields with one USING JOIN clause at a time
let columns = schema_level
.iter()
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
for using_col in using_columns {
let all_matched = fields
.iter()
.all(|f| using_col.contains(&f.qualified_column()));
let all_matched = columns.iter().all(|c| using_col.contains(c));
// All matched fields belong to the same using column set, in other words
// the same join clause. We simply pick the qualifier from the first match.
if all_matched {
return Ok(fields[0].qualified_column());
return Ok(columns[0].clone());
}
}

@@ -312,7 +313,7 @@
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.flat_map(|s| s.columns())
.collect(),
})
}
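A hedged usage sketch for the multi-level normalization path above (presumably `normalize_with_schemas_and_ambiguity_check`, per the test further down; an editor's illustration, assuming the DataFusion ~37 API):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{Column, DFSchema, Result};

fn resolve_unqualified_column() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, true)]));
    let t1 = DFSchema::try_from_qualified_schema("t1", &schema)?;

    // An unqualified column picks up the matching table qualifier.
    let col = Column::from_name("a");
    let normalized = col.normalize_with_schemas_and_ambiguity_check(&[&[&t1]], &[])?;
    assert_eq!(normalized, Column::new(Some("t1"), "a"));
    Ok(())
}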
@@ -338,6 +339,13 @@ impl From<String> for Column {
}
}

/// Create a column from an optional table qualifier and an arrow `Field`
impl From<(Option<&OwnedTableReference>, &Field)> for Column {
fn from((relation, field): (Option<&OwnedTableReference>, &Field)) -> Self {
Self::new(relation.cloned(), field.name())
}
}
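A hedged usage sketch of the new conversion (an editor's illustration, not part of the diff; it assumes the DataFusion ~37 API and that `OwnedTableReference::bare` is available):

use arrow_schema::{DataType, Field};
use datafusion_common::{Column, OwnedTableReference};

fn column_from_qualifier_and_field() {
    let relation = OwnedTableReference::bare("t1");
    let field = Field::new("a", DataType::Boolean, true);
    // The impl above clones the qualifier and copies the field name.
    let column = Column::from((Some(&relation), &field));
    assert_eq!(column, Column::new(Some("t1"), "a"));
}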

impl FromStr for Column {
type Err = Infallible;

@@ -355,36 +363,25 @@ impl fmt::Display for Column {
#[cfg(test)]
mod tests {
use super::*;
use crate::DFField;
use arrow::datatypes::DataType;
use std::collections::HashMap;

fn create_schema(names: &[(Option<&str>, &str)]) -> Result<DFSchema> {
let fields = names
.iter()
.map(|(qualifier, name)| {
DFField::new(
qualifier.to_owned().map(|s| s.to_string()),
name,
DataType::Boolean,
true,
)
})
.collect::<Vec<_>>();
DFSchema::new_with_metadata(fields, HashMap::new())
use arrow_schema::{Field, SchemaBuilder};

fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
let mut schema_builder = SchemaBuilder::new();
schema_builder.extend(
names
.iter()
.map(|f| Field::new(*f, DataType::Boolean, true)),
);
let schema = Arc::new(schema_builder.finish());
DFSchema::try_from_qualified_schema(qualifier, &schema)
}

#[test]
fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
let schema1 = create_schema(&[(Some("t1"), "a"), (Some("t1"), "b")])?;
let schema2 = create_schema(&[(Some("t2"), "c"), (Some("t2"), "d")])?;
let schema3 = create_schema(&[
(Some("t3"), "a"),
(Some("t3"), "b"),
(Some("t3"), "c"),
(Some("t3"), "d"),
(Some("t3"), "e"),
])?;
let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

// already normalized
let col = Column::new(Some("t1"), "a");