[draft] Add LogicalType, try to support user-defined types #11160

Draft
wants to merge 14 commits into base: main
10 changes: 6 additions & 4 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -60,6 +60,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tempfile::TempDir;
 use url::Url;
+use datafusion_common::logical_type::schema::LogicalSchemaRef;

 /// This example demonstrates using low level DataFusion APIs to read only
 /// certain row groups and ranges from parquet files, based on external
@@ -299,8 +300,9 @@ impl IndexTableProvider {
         // In this example, we use the PruningPredicate's literal guarantees to
         // analyze the predicate. In a real system, using
         // `PruningPredicate::prune` would likely be easier to do.
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
         let pruning_predicate =
-            PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?;
+            PruningPredicate::try_new(Arc::clone(predicate), schema)?;

         // The PruningPredicate's guarantees must all be satisfied in order for
         // the predicate to possibly evaluate to true.
@@ -453,8 +455,8 @@ impl TableProvider for IndexTableProvider {
         self
     }

-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.indexed_file.schema)
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.indexed_file.schema.as_ref().clone().into())
     }

     fn table_type(&self) -> TableType {
@@ -482,7 +484,7 @@ impl TableProvider for IndexTableProvider {
             .with_extensions(Arc::new(access_plan) as _);

         // Prepare for scanning
-        let schema = self.schema();
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
         let object_store_url = ObjectStoreUrl::parse("file://")?;
         let file_scan_config = FileScanConfig::new(object_store_url, schema)
             .with_limit(limit)
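A note on the recurring pattern above: `TableProvider::schema` now returns a `LogicalSchemaRef`, so call sites that still need a physical Arrow `SchemaRef` (e.g. for `FileScanConfig` or `PruningPredicate`) round-trip through the `From`/`Into` impls this PR adds between `Schema` and `LogicalSchema`. A minimal sketch of both directions — the helper names are illustrative, not part of the PR:

```rust
use std::sync::Arc;
use arrow_schema::SchemaRef;
use datafusion_common::logical_type::schema::LogicalSchemaRef;

// Logical -> physical: clone the LogicalSchema out of the Arc and let the
// Into impl materialise every LogicalField as its physical DataType.
// This is the `SchemaRef::new(schema.as_ref().clone().into())` pattern above.
fn to_physical(schema: &LogicalSchemaRef) -> SchemaRef {
    SchemaRef::new(schema.as_ref().clone().into())
}

// Physical -> logical: the reverse conversion; every DataType carries a
// default logical type, so this direction is always available.
fn to_logical(schema: &SchemaRef) -> LogicalSchemaRef {
    LogicalSchemaRef::new(schema.as_ref().clone().into())
}
```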
15 changes: 9 additions & 6 deletions datafusion-examples/examples/custom_datasource.rs
@@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;

 use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow::datatypes::{DataType, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::{provider_as_source, TableProvider, TableType};
 use datafusion::error::Result;
@@ -38,6 +38,8 @@ use datafusion_physical_expr::EquivalenceProperties;

 use async_trait::async_trait;
 use tokio::time::timeout;
+use datafusion_common::logical_type::field::LogicalField;
+use datafusion_common::logical_type::schema::{LogicalSchema, LogicalSchemaRef};

 /// This example demonstrates executing a simple query against a custom datasource
 #[tokio::main]
@@ -162,10 +164,10 @@ impl TableProvider for CustomDataSource {
         self
     }

-    fn schema(&self) -> SchemaRef {
-        SchemaRef::new(Schema::new(vec![
-            Field::new("id", DataType::UInt8, false),
-            Field::new("bank_account", DataType::UInt64, true),
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(LogicalSchema::new(vec![
+            LogicalField::new("id", DataType::UInt8, false),
+            LogicalField::new("bank_account", DataType::UInt64, true),
         ]))
     }

@@ -181,7 +183,8 @@
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        return self.create_physical_plan(projection, self.schema()).await;
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
+        return self.create_physical_plan(projection, schema).await;
     }
 }

2 changes: 1 addition & 1 deletion datafusion-examples/examples/dataframe_subquery.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.

-use arrow_schema::DataType;
 use std::sync::Arc;

+use arrow_schema::DataType;
 use datafusion::error::Result;
 use datafusion::functions_aggregate::average::avg;
 use datafusion::prelude::*;
20 changes: 11 additions & 9 deletions datafusion-examples/examples/expr_api.rs
@@ -28,6 +28,8 @@ use datafusion::functions_aggregate::first_last::first_value_udaf;
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
 use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
 use datafusion::prelude::*;
+use datafusion_common::logical_type::field::LogicalField;
+use datafusion_common::logical_type::schema::LogicalSchema;
 use datafusion_common::{ScalarValue, ToDFSchema};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::expr::BinaryExpr;
@@ -156,7 +158,7 @@ fn simplify_demo() -> Result<()> {
     // However, DataFusion's simplification logic can do this for you

     // you need to tell DataFusion the type of column "ts":
-    let schema = Schema::new(vec![make_ts_field("ts")]).to_dfschema_ref()?;
+    let schema = LogicalSchema::from(Schema::new(vec![make_ts_field("ts")])).to_dfschema_ref()?;

     // And then build a simplifier
     // the ExecutionProps carries information needed to simplify
@@ -177,10 +179,10 @@
     );

     // here are some other examples of what DataFusion is capable of
-    let schema = Schema::new(vec![
+    let schema = LogicalSchema::from(Schema::new(vec![
         make_field("i", DataType::Int64),
         make_field("b", DataType::Boolean),
-    ])
+    ]))
     .to_dfschema_ref()?;
     let context = SimplifyContext::new(&props).with_schema(schema.clone());
     let simplifier = ExprSimplifier::new(context);
@@ -211,7 +213,7 @@ fn simplify_demo() -> Result<()> {
     // String --> Date simplification
     // `cast('2020-09-01' as date)` --> 18500
     assert_eq!(
-        simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
+        simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32.into(), &schema)?)?,
         lit(ScalarValue::Date32(Some(18506)))
     );

@@ -258,7 +260,7 @@ fn range_analysis_demo() -> Result<()> {
     let analysis_result = analyze(
         &physical_expr,
         AnalysisContext::new(boundaries),
-        df_schema.as_ref(),
+        &df_schema.into(),
     )?;

     // The results of the analysis is an range, encoded as an `Interval`, for
@@ -293,14 +295,14 @@ fn expression_type_demo() -> Result<()> {
     // a schema. In this case we create a schema where the column `c` is of
     // type Utf8 (a String / VARCHAR)
     let schema = DFSchema::from_unqualified_fields(
-        vec![Field::new("c", DataType::Utf8, true)].into(),
+        vec![LogicalField::new("c", DataType::Utf8, true)].into(),
         HashMap::new(),
     )?;
     assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));

     // Using a schema where the column `foo` is of type Int32
     let schema = DFSchema::from_unqualified_fields(
-        vec![Field::new("c", DataType::Int32, true)].into(),
+        vec![LogicalField::new("c", DataType::Int32, true)].into(),
         HashMap::new(),
     )?;
     assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));
@@ -310,8 +312,8 @@
     let expr = col("c1") + col("c2");
     let schema = DFSchema::from_unqualified_fields(
         vec![
-            Field::new("c1", DataType::Int32, true),
-            Field::new("c2", DataType::Float32, true),
+            LogicalField::new("c1", DataType::Int32, true),
+            LogicalField::new("c2", DataType::Float32, true),
         ]
         .into(),
         HashMap::new(),
7 changes: 5 additions & 2 deletions datafusion-examples/examples/function_factory.rs
@@ -22,6 +22,7 @@ use datafusion::error::Result;
 use datafusion::execution::context::{
     FunctionFactory, RegisterFunction, SessionContext, SessionState,
 };
+use datafusion_common::logical_type::ExtensionType;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{exec_err, internal_err, DataFusionError};
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
@@ -216,13 +217,15 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
                 .expect("Expression has to be defined!"),
             return_type: definition
                 .return_type
-                .expect("Return type has to be defined!"),
+                .expect("Return type has to be defined!")
+                .physical()
+                .clone(),
             signature: Signature::exact(
                 definition
                     .args
                     .unwrap_or_default()
                     .into_iter()
-                    .map(|a| a.data_type)
+                    .map(|a| a.data_type.physical().clone())
                     .collect(),
                 definition
                     .params
108 changes: 108 additions & 0 deletions datafusion-examples/examples/logical_type.rs
@@ -0,0 +1,108 @@
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::logical_type::schema::{LogicalSchema, LogicalSchemaRef};
use datafusion::error::Result;
use datafusion_expr::{Expr, TableType};
use std::any::Any;
use std::sync::Arc;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::{DataType, Field, TimeUnit};
use datafusion::prelude::SessionContext;
use datafusion_common::logical_type::ExtensionType;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::logical_type::signature::LogicalType;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table("example", Arc::new(ExampleTableSource::default()))?;

    let df = ctx.sql("SELECT * FROM example").await?;
    let records = df.collect().await?;

    println!("{}", pretty_format_batches(&records)?);
Review comment from a Contributor:
Is the primary use case of the LogicalType so that we could write functions that take the logical type, or define custom behavior?

I wonder if the idea is that we could now create a ScalarFunctionImpl whose signature refers to LogicalType rather than PhysicalType 🤔 Or somehow plan a binary operation on logical types?

BTW I think the work we are doing with @samuelcolvin @dharanad and @jayzhan211 in #11207 would make it straightforward to implement a custom comparison (via a function) for this magical type.

That might be a good thing to show too.

Reply from the Author:
> I wonder if the idea is that we could now create a ScalarFunctionImpl whose signature refers to LogicalType rather than PhysicalType 🤔 Or somehow plan a binary operation on logical types?

Potentially both. The motivation behind this change is to simplify the interaction between standard types represented by different encodings (like RunArray, the various Views, and DictionaryArray, but potentially also user-defined ones via Arrow Extension Types).


    Ok(())
}

#[derive(Debug)]
struct CustomMagicalType {
    logical: LogicalType,
    physical: DataType,
}

impl Default for CustomMagicalType {
    fn default() -> Self {
        Self {
            logical: LogicalType::Utf8,
            physical: DataType::new_list(DataType::UInt8, false),
        }
    }
}

impl ExtensionType for CustomMagicalType {
    fn logical(&self) -> &LogicalType {
        &self.logical
    }

    fn physical(&self) -> &DataType {
        &self.physical
    }

    // TODO: materialisation methods?
}

#[derive(Default)]
struct ExampleTableSource {}

#[async_trait::async_trait]
impl TableProvider for ExampleTableSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> LogicalSchemaRef {
        // TODO: ugly?
        let custom_magical_type: Arc<dyn ExtensionType + Send + Sync> =
            Arc::new(CustomMagicalType::default());

        // This schema will be equivalent to:
        // a -> Timestamp(Microsecond, None)
        // b -> Utf8
        // c -> Int64
        Arc::new(LogicalSchema::new(vec![
            LogicalField::new(
                "a",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int64, false)),
                    Arc::new(Field::new(
                        "values",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        false,
                    )),
                ),
                false,
            ),
            LogicalField::new("b", custom_magical_type, false),
            LogicalField::new("c", DataType::Int64, true),
        ]))
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &SessionState,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        todo!()
    }
}
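To make the encoding point from the review thread concrete: in the same shape as `CustomMagicalType` above, a hypothetical `ExtensionType` can present a dictionary-encoded column as plain `Utf8`, so the logical schema is identical whether the column is stored flat or dictionary-encoded. A sketch under the PR's `ExtensionType` trait — `DictEncodedString` is illustrative, not code from this PR:

```rust
use arrow_schema::DataType;
use datafusion_common::logical_type::signature::LogicalType;
use datafusion_common::logical_type::ExtensionType;

/// Hypothetical type: logically a string column, physically dictionary-encoded.
#[derive(Debug)]
struct DictEncodedString {
    logical: LogicalType,
    physical: DataType,
}

impl Default for DictEncodedString {
    fn default() -> Self {
        Self {
            logical: LogicalType::Utf8,
            physical: DataType::Dictionary(
                Box::new(DataType::Int32),
                Box::new(DataType::Utf8),
            ),
        }
    }
}

impl ExtensionType for DictEncodedString {
    // Planning and type checks see a plain string...
    fn logical(&self) -> &LogicalType {
        &self.logical
    }

    // ...while scans and kernels keep the dictionary encoding.
    fn physical(&self) -> &DataType {
        &self.physical
    }
}
```

Substituting this for `custom_magical_type` in the schema above should leave the logical schema unchanged (`b -> Utf8`) while switching the physical layout, which is exactly the simplification the author's reply describes.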
8 changes: 5 additions & 3 deletions datafusion-examples/examples/parquet_index.rs
@@ -54,6 +54,7 @@ use std::sync::{
 };
 use tempfile::TempDir;
 use url::Url;
+use datafusion_common::logical_type::schema::LogicalSchemaRef;

 /// This example demonstrates building a secondary index over multiple Parquet
 /// files and using that index during query to skip ("prune") files that do not
@@ -212,8 +213,8 @@ impl TableProvider for IndexTableProvider {
         self
     }

-    fn schema(&self) -> SchemaRef {
-        self.index.schema().clone()
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.index.schema().into())
     }

     fn table_type(&self) -> TableType {
@@ -243,7 +244,8 @@
         let files = self.index.get_files(predicate.clone())?;

         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
+        let mut file_scan_config = FileScanConfig::new(object_store_url, schema)
             .with_projection(projection.cloned())
             .with_limit(limit);

8 changes: 5 additions & 3 deletions datafusion-examples/examples/rewrite_expr.rs
@@ -17,6 +17,7 @@

 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::logical_type::TypeRelation;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
@@ -31,6 +32,7 @@ use datafusion_sql::sqlparser::parser::Parser;
 use datafusion_sql::TableReference;
 use std::any::Any;
 use std::sync::Arc;
+use datafusion_common::logical_type::schema::LogicalSchemaRef;

 pub fn main() -> Result<()> {
     // produce a logical plan using the datafusion-sql crate
@@ -211,7 +213,7 @@ impl ContextProvider for MyContextProvider {
         None
     }

-    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
+    fn get_variable_type(&self, _variable_names: &[String]) -> Option<TypeRelation> {
         None
     }

@@ -245,7 +247,7 @@ impl TableSource for MyTableSource {
         self
     }

-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.schema.as_ref().clone().into())
     }
 }
8 changes: 4 additions & 4 deletions datafusion-examples/examples/simple_udtf.rs
@@ -35,7 +35,7 @@ use std::fs::File;
 use std::io::Seek;
 use std::path::Path;
 use std::sync::Arc;
-
+use datafusion_common::logical_type::schema::LogicalSchemaRef;
 // To define your own table function, you only need to do the following 3 things:
 // 1. Implement your own [`TableProvider`]
 // 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
@@ -85,8 +85,8 @@ impl TableProvider for LocalCsvTable {
         self
     }

-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.schema.clone().into())
     }

     fn table_type(&self) -> TableType {
@@ -121,7 +121,7 @@ impl TableProvider for LocalCsvTable {
         };
         Ok(Arc::new(MemoryExec::try_new(
             &[batches],
-            TableProvider::schema(self),
+            self.schema.clone(),
             projection.cloned(),
         )?))
     }