Skip to content

Commit

Permalink
draft: add logical types and support extension types
Browse files Browse the repository at this point in the history
  • Loading branch information
notfilippo committed Jun 28, 2024
1 parent 57280e4 commit c30746c
Show file tree
Hide file tree
Showing 101 changed files with 2,381 additions and 1,326 deletions.
6 changes: 3 additions & 3 deletions datafusion-examples/examples/dataframe_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::DataType;
use datafusion_common::logical_type::LogicalType;
use std::sync::Arc;

use datafusion::error::Result;
Expand Down Expand Up @@ -48,7 +48,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
scalar_subquery(Arc::new(
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
.aggregate(vec![], vec![avg(col("t2.c2"))])?
.select(vec![avg(col("t2.c2"))])?
.into_unoptimized_plan(),
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
.filter(exists(Arc::new(
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
.select(vec![col("t2.c2")])?
.into_unoptimized_plan(),
)))?
Expand Down
21 changes: 12 additions & 9 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ use datafusion::functions_aggregate::first_last::first_value_udaf;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::logical_type::schema::LogicalSchema;
use datafusion_common::logical_type::LogicalType;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
Expand Down Expand Up @@ -156,7 +159,7 @@ fn simplify_demo() -> Result<()> {
// However, DataFusion's simplification logic can do this for you

// you need to tell DataFusion the type of column "ts":
let schema = Schema::new(vec![make_ts_field("ts")]).to_dfschema_ref()?;
let schema = LogicalSchema::from(Schema::new(vec![make_ts_field("ts")])).to_dfschema_ref()?;

// And then build a simplifier
// the ExecutionProps carries information needed to simplify
Expand All @@ -177,10 +180,10 @@ fn simplify_demo() -> Result<()> {
);

// here are some other examples of what DataFusion is capable of
let schema = Schema::new(vec![
let schema = LogicalSchema::from(Schema::new(vec![
make_field("i", DataType::Int64),
make_field("b", DataType::Boolean),
])
]))
.to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);
Expand Down Expand Up @@ -211,7 +214,7 @@ fn simplify_demo() -> Result<()> {
// String --> Date simplification
// `cast('2020-09-01' as date)` --> 18500
assert_eq!(
simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
simplifier.simplify(lit("2020-09-01").cast_to(&LogicalType::Date32, &schema)?)?,
lit(ScalarValue::Date32(Some(18506)))
);

Expand Down Expand Up @@ -258,7 +261,7 @@ fn range_analysis_demo() -> Result<()> {
let analysis_result = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
df_schema.as_ref(),
&df_schema.into(),
)?;

// The results of the analysis is an range, encoded as an `Interval`, for
Expand Down Expand Up @@ -293,14 +296,14 @@ fn expression_type_demo() -> Result<()> {
// a schema. In this case we create a schema where the column `c` is of
// type Utf8 (a String / VARCHAR)
let schema = DFSchema::from_unqualifed_fields(
vec![Field::new("c", DataType::Utf8, true)].into(),
vec![LogicalField::new("c", LogicalType::Utf8, true)].into(),
HashMap::new(),
)?;
assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));

// Using a schema where the column `foo` is of type Int32
let schema = DFSchema::from_unqualifed_fields(
vec![Field::new("c", DataType::Int32, true)].into(),
vec![LogicalField::new("c", LogicalType::Int32, true)].into(),
HashMap::new(),
)?;
assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));
Expand All @@ -310,8 +313,8 @@ fn expression_type_demo() -> Result<()> {
let expr = col("c1") + col("c2");
let schema = DFSchema::from_unqualifed_fields(
vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Float32, true),
LogicalField::new("c1", LogicalType::Int32, true),
LogicalField::new("c2", LogicalType::Float32, true),
]
.into(),
HashMap::new(),
Expand Down
6 changes: 4 additions & 2 deletions datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion_common::logical_type::extension::ExtensionType;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
Expand Down Expand Up @@ -216,13 +217,14 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
.expect("Expression has to be defined!"),
return_type: definition
.return_type
.expect("Return type has to be defined!"),
.expect("Return type has to be defined!")
.physical_type(),
signature: Signature::exact(
definition
.args
.unwrap_or_default()
.into_iter()
.map(|a| a.data_type)
.map(|a| a.data_type.physical_type())
.collect(),
definition
.params
Expand Down
3 changes: 2 additions & 1 deletion datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::logical_type::LogicalType;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
Expand Down Expand Up @@ -211,7 +212,7 @@ impl ContextProvider for MyContextProvider {
None
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
fn get_variable_type(&self, _variable_names: &[String]) -> Option<LogicalType> {
None
}

Expand Down
15 changes: 7 additions & 8 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

//! Column

use arrow_schema::{Field, FieldRef};

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, Result, SchemaError, TableReference};
Expand All @@ -27,6 +25,7 @@ use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use crate::logical_type::field::{LogicalField, LogicalFieldRef};

/// A named reference to a qualified field in a schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
Expand Down Expand Up @@ -349,15 +348,15 @@ impl From<String> for Column {
}

/// Create a column, use qualifier and field name
impl From<(Option<&TableReference>, &Field)> for Column {
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
impl From<(Option<&TableReference>, &LogicalField)> for Column {
fn from((relation, field): (Option<&TableReference>, &LogicalField)) -> Self {
Self::new(relation.cloned(), field.name())
}
}

/// Create a column, use qualifier and field name
impl From<(Option<&TableReference>, &FieldRef)> for Column {
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
impl From<(Option<&TableReference>, &LogicalFieldRef)> for Column {
fn from((relation, field): (Option<&TableReference>, &LogicalFieldRef)) -> Self {
Self::new(relation.cloned(), field.name())
}
}
Expand All @@ -380,7 +379,7 @@ impl fmt::Display for Column {
mod tests {
use super::*;
use arrow::datatypes::DataType;
use arrow_schema::SchemaBuilder;
use arrow_schema::{Field, SchemaBuilder};

fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
let mut schema_builder = SchemaBuilder::new();
Expand All @@ -389,7 +388,7 @@ mod tests {
.iter()
.map(|f| Field::new(*f, DataType::Boolean, true)),
);
let schema = Arc::new(schema_builder.finish());
let schema = Arc::new(schema_builder.finish().into());
DFSchema::try_from_qualified_schema(qualifier, &schema)
}

Expand Down
Loading

0 comments on commit c30746c

Please sign in to comment.