15 changes: 6 additions & 9 deletions rust/datafusion/src/logical_plan/mod.rs
@@ -37,8 +37,7 @@ use crate::{
 };
 use crate::{
     physical_plan::{
-        aggregates, expressions::binary_operator_data_type, functions,
-        type_coercion::can_coerce_from, udf::ScalarUDF,
+        aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
     },
     sql::parser::FileType,
 };
@@ -323,21 +322,19 @@ impl Expr {
     ///
     /// # Errors
     ///
-    /// This function errors when it is impossible to cast the expression to the target [arrow::datatypes::DataType].
+    /// Currently no errors happen at plan time. If it is impossible
@jorgecarleitao (Member) commented on Oct 7, 2020:
I am unsure whether this is advisable:

Doesn't this mean that the plan can fail arbitrarily when a user performs an impossible cast? This can happen, say, 10 hours after execution starts, when the query finally reaches that point.

I thought that the point of having the types available during planning, both logical and physical, was to perform exactly these kinds of checks. Removing this check seems like a regression to me, in which an impossible cast will only be caught during execution.

Isn't it possible to expand can_coerce_from to cover the cases that DataFusion is missing but that the arrow cast kernel supports?

Contributor Author:

It is interesting that there appear to be no uses of (or tests for) this function -- https://github.com/apache/arrow/search?q=cast_to -- I found this quite confusing.

@jorgecarleitao what do you think about simply removing it entirely?

@jorgecarleitao (Member):

I am sorry, I was confused: can_coerce_from checks whether we can perform a lossless cast. This is different from the ability to perform any cast.

Therefore, I think the logic we may want is:

  • the Expr::Cast operation should check that we can perform the cast at all (e.g. a ListArray to a UInt32Array should not be allowed). This should match the casts that the kernel supports.
  • implicit casts, such as the ones we perform when passing arguments to functions to try to match a signature, should use can_coerce_from, as we do not want to perform lossy implicit casts.

AFAIK casts to and from dictionaries are lossless, and thus they can happen at any point in physical planning. Since all array types support dictionary encoding, I guess we only have to error if the implementation is not yet available.

What do you think, @alamb ?
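The two checks discussed above can be sketched with a toy rule set. Everything here is illustrative only: the enum and both functions are hypothetical stand-ins, not DataFusion's actual can_coerce_from or the arrow cast kernel.

```rust
// Toy model of the distinction: a strict "lossless coercion" test
// (for implicit casts) vs. a looser "castable at all" test (for an
// explicit Expr::Cast). Types and rules are made up for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyType {
    Int32,
    Int64,
    Float64,
    Utf8,
}

/// Implicit coercion: only allow conversions that cannot lose information.
fn can_coerce_from(target: ToyType, from: ToyType) -> bool {
    use ToyType::*;
    match target {
        Int64 => matches!(from, Int32 | Int64),
        // Int64 -> Float64 can lose precision, so it is not listed here.
        Float64 => matches!(from, Int32 | Float64),
        _ => target == from,
    }
}

/// Explicit cast: allow anything the (toy) cast kernel supports,
/// including lossy conversions such as Int64 -> Int32.
fn can_cast(target: ToyType, from: ToyType) -> bool {
    use ToyType::*;
    match (from, target) {
        (Utf8, Utf8) => true,
        // Pretend string <-> numeric casts are unsupported by the kernel.
        (Utf8, _) | (_, Utf8) => false,
        // All numeric-to-numeric casts are supported, lossy or not.
        _ => true,
    }
}

fn main() {
    // Lossless widening is both coercible and castable.
    assert!(can_coerce_from(ToyType::Int64, ToyType::Int32));
    // Lossy narrowing is castable explicitly, but never coerced implicitly.
    assert!(!can_coerce_from(ToyType::Int32, ToyType::Int64));
    assert!(can_cast(ToyType::Int32, ToyType::Int64));
    // A cast the kernel does not support fails even explicitly.
    assert!(!can_cast(ToyType::Utf8, ToyType::Int64));
    println!("ok");
}
```

Under this split, Expr::cast_to would consult the broad check while function-argument coercion keeps the strict one.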

Contributor Author:

@jorgecarleitao -- I think this makes sense. I was confused about the intent of can_coerce_from (i.e. that it is for lossless conversions only). With this explanation, let me go back and give adding dictionary coercion / cast a second pass (though likely not until tomorrow, Thursday, US Eastern time).

Contributor Author:

@jorgecarleitao -- here is a proposed alternate approach: #8400

+    /// to cast the expression to the target
+    /// [arrow::datatypes::DataType] then an error will occur at
+    /// runtime.
     pub fn cast_to(&self, cast_to_type: &DataType, schema: &Schema) -> Result<Expr> {
         let this_type = self.get_type(schema)?;
         if this_type == *cast_to_type {
             Ok(self.clone())
-        } else if can_coerce_from(cast_to_type, &this_type) {
Contributor Author:

The PR itself is pretty simple -- it just deletes code :)

+        } else {
             Ok(Expr::Cast {
                 expr: Box::new(self.clone()),
                 data_type: cast_to_type.clone(),
             })
-        } else {
-            Err(ExecutionError::General(format!(
-                "Cannot automatically convert {:?} to {:?}",
-                this_type, cast_to_type
-            )))
         }
     }

82 changes: 77 additions & 5 deletions rust/datafusion/tests/sql.rs
@@ -21,8 +21,8 @@ use std::sync::Arc;
 extern crate arrow;
 extern crate datafusion;
 
-use arrow::record_batch::RecordBatch;
 use arrow::{array::*, datatypes::TimeUnit};
+use arrow::{datatypes::Int32Type, record_batch::RecordBatch};
 use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
     util::pretty::array_value_to_string,
@@ -918,14 +918,20 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
 /// Execute query and return result set as 2-d table of Vecs
 /// `result[row][column]`
 async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>> {
-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let msg = format!("Creating logical plan for '{}'", sql);
Contributor Author:
These are some changes to improve the test error reporting (rather than a straight up panic, some diagnostic information is printed as well)
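The pattern in this diff is just precomputing a context string and passing it to expect, so a failure panics with the query and plan attached. A standalone sketch of the same idea (parse_count is a made-up stand-in, not a DataFusion API):

```rust
// Toy illustration: build the diagnostic message up front so a
// failure panics with context rather than a bare `unwrap()` panic.
fn parse_count(input: &str) -> Result<u64, std::num::ParseIntError> {
    input.trim().parse()
}

fn main() {
    let input = "42";
    let msg = format!("Parsing count from '{}'", input);
    // On failure this panics with the message above plus the error,
    // e.g. "Parsing count from 'abc': ParseIntError { .. }".
    let count = parse_count(input).expect(&msg);
    assert_eq!(count, 42);
    println!("parsed {}", count);
}
```

The cost is one format! per step even on success, which is acceptable in test helpers like execute.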

+    let plan = ctx.create_logical_plan(&sql).expect(&msg);
     let logical_schema = plan.schema();
-    let plan = ctx.optimize(&plan).unwrap();
+
+    let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
+    let plan = ctx.optimize(&plan).expect(&msg);
     let optimized_logical_schema = plan.schema();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+
+    let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
+    let plan = ctx.create_physical_plan(&plan).expect(&msg);
     let physical_schema = plan.schema();
 
-    let results = ctx.collect(plan).await.unwrap();
+    let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
+    let results = ctx.collect(plan).await.expect(&msg);
 
     assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
     assert_eq!(logical_schema.as_ref(), physical_schema.as_ref());
Expand Down Expand Up @@ -1200,3 +1206,69 @@ async fn query_is_not_null() -> Result<()> {
assert_eq!(expected, actual);
Ok(())
}

#[tokio::test]
async fn query_on_string_dictionary() -> Result<()> {
Contributor Author:
Here is the end-to-end test case I am working on for DictionaryArray support -- with this PR we can do basic filtering in DataFusion. There are a few more PRs needed to complete expressions and aggregation, but they are coming.
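For background, a DictionaryArray stores each row as an integer key indexing into an array of distinct values, with nulls carried in the keys. A minimal sketch of that encoding (toy code mirroring the test data below, not the arrow builder API):

```rust
// Toy sketch of dictionary encoding as used by DictionaryArray:
// distinct values are stored once, and each row holds an index into
// them (None representing a null row).
fn dictionary_encode(rows: &[Option<&str>]) -> (Vec<Option<usize>>, Vec<String>) {
    let mut values: Vec<String> = Vec::new();
    let mut keys = Vec::with_capacity(rows.len());
    for row in rows {
        keys.push(row.map(|s| {
            // Reuse an existing dictionary entry, or append a new one.
            match values.iter().position(|v| v.as_str() == s) {
                Some(idx) => idx,
                None => {
                    values.push(s.to_string());
                    values.len() - 1
                }
            }
        }));
    }
    (keys, values)
}

fn main() {
    // Mirrors the test column below: "one", null, "three".
    let (keys, values) = dictionary_encode(&[Some("one"), None, Some("three")]);
    assert_eq!(keys, vec![Some(0), None, Some(1)]);
    assert_eq!(values, vec!["one".to_string(), "three".to_string()]);
    println!("{:?} {:?}", keys, values);
}
```

This is why dictionary casts can be treated as lossless in the discussion above: decoding keys through values recovers the original column exactly.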

+    // Test to ensure DataFusion can operate on dictionary types
+    // Use StringDictionary (32 bit indexes = keys)
+    let field_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+    let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)]));
+
+    let keys_builder = PrimitiveBuilder::<Int32Type>::new(10);
+    let values_builder = StringBuilder::new(10);
+    let mut builder = StringDictionaryBuilder::new(keys_builder, values_builder);
+
+    builder.append("one")?;
+    builder.append_null()?;
+    builder.append("three")?;
+    let array = Arc::new(builder.finish());
+
+    let data = RecordBatch::try_new(schema.clone(), vec![array])?;
+
+    let table = MemTable::new(schema, vec![vec![data]])?;
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Box::new(table));
+
+    // Basic SELECT
+    let sql = "SELECT * FROM test";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![vec!["one"], vec!["NULL"], vec!["three"]];
+    assert_eq!(expected, actual);
+
+    // basic filtering
+    let sql = "SELECT * FROM test WHERE d1 IS NOT NULL";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![vec!["one"], vec!["three"]];
+    assert_eq!(expected, actual);
+
+    // The following queries are not yet supported
+
+    // // filtering with constant
Contributor Author:
I have PRs in the works to support these cases and I will uncomment them as I do so.

+    // let sql = "SELECT * FROM test WHERE d1 = 'three'";
+    // let actual = execute(&mut ctx, sql).await;
+    // let expected = vec![
+    //     vec!["three"],
+    // ];
+    // assert_eq!(expected, actual);
+
+    // // Expression evaluation
+    // let sql = "SELECT concat(d1, '-foo') FROM test";
+    // let actual = execute(&mut ctx, sql).await;
+    // let expected = vec![
+    //     vec!["one-foo"],
+    //     vec!["NULL"],
+    //     vec!["three-foo"],
+    // ];
+    // assert_eq!(expected, actual);
+
+    // // aggregation
+    // let sql = "SELECT COUNT(d1) FROM test";
+    // let actual = execute(&mut ctx, sql).await;
+    // let expected = vec![
+    //     vec!["2"]
+    // ];
+    // assert_eq!(expected, actual);
+
+    Ok(())
+}