Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix missing dict attribute bug #3

Open
wants to merge 4 commits into
base: ceresdb-2023-05
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,12 @@ pub trait ExprSchema: std::fmt::Debug {

/// What is the datatype of this column?
fn data_type(&self, col: &Column) -> Result<&DataType>;

/// Is the dictionary of this column ordered?
fn dict_is_ordered(&self, col: &Column) -> Result<bool>;

/// What is the dict_id of this column?
fn dict_id(&self, col: &Column) -> Result<i64>;
}

// Implement `ExprSchema` for `Arc<DFSchema>`
Expand All @@ -597,6 +603,14 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
fn data_type(&self, col: &Column) -> Result<&DataType> {
self.as_ref().data_type(col)
}

fn dict_is_ordered(&self, col: &Column) -> Result<bool> {
self.as_ref().dict_is_ordered(col)
}

fn dict_id(&self, col: &Column) -> Result<i64> {
self.as_ref().dict_id(col)
}
}

impl ExprSchema for DFSchema {
Expand All @@ -607,6 +621,20 @@ impl ExprSchema for DFSchema {
fn data_type(&self, col: &Column) -> Result<&DataType> {
Ok(self.field_from_column(col)?.data_type())
}

/// Reports whether the dictionary backing `col` is ordered.
///
/// Returns `false` when the underlying Arrow field reports no ordering
/// flag. Errors from the column lookup are propagated.
fn dict_is_ordered(&self, col: &Column) -> Result<bool> {
    let df_field = self.field_from_column(col)?;
    Ok(df_field.field().dict_is_ordered().unwrap_or(false))
}

/// Returns the dictionary id recorded on the Arrow field for `col`.
///
/// Returns `0` when the underlying Arrow field reports no dictionary id.
/// Errors from the column lookup are propagated.
fn dict_id(&self, col: &Column) -> Result<i64> {
    let df_field = self.field_from_column(col)?;
    // `0` is the fallback used throughout this change for "no dictionary id".
    Ok(df_field.field().dict_id().unwrap_or(0))
}
}

/// DFField wraps an Arrow field and adds an optional qualifier
Expand Down Expand Up @@ -639,6 +667,46 @@ impl DFField {
field: Arc::new(Field::new(name, data_type, nullable)),
}
}
/// Creates a new `DFField` with dict
pub fn new_dict<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
data_type: DataType,
nullable: bool,
dict_id: i64,
dict_is_ordered: bool,
) -> Self {
DFField {
qualifier: qualifier.map(|s| s.into()),
field: Arc::new(Field::new_dict(
name,
data_type,
nullable,
dict_id,
dict_is_ordered,
)),
}
}

/// Convenience method for creating new `DFField` without a qualifier with dict type
pub fn new_unqualified_dict(
name: &str,
data_type: DataType,
nullable: bool,
dict_id: i64,
dict_is_ordered: bool,
) -> Self {
DFField {
qualifier: None,
field: Arc::new(Field::new_dict(
name,
data_type,
nullable,
dict_id,
dict_is_ordered,
)),
}
}

/// Create a qualified field from an existing Arrow field
pub fn from_qualified<'a>(
Expand Down
120 changes: 108 additions & 12 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub trait ExprSchemable {

/// cast to a type with respect to a schema
fn cast_to<S: ExprSchema>(self, cast_to_type: &DataType, schema: &S) -> Result<Expr>;

/// given a schema, return the dict id of the expr
fn get_dict_id<S: ExprSchema>(&self, schema: &S) -> Result<i64>;

/// given a schema, return the dict_is_ordered of the expr
fn dict_is_ordered<S: ExprSchema>(&self, input_schema: &S) -> Result<bool>;
}

impl ExprSchemable for Expr {
Expand Down Expand Up @@ -262,23 +268,58 @@ impl ExprSchemable for Expr {
}
}

/// Looks up whether the dictionary behind this expression is ordered.
///
/// Only plain column references consult the schema; every other
/// expression kind currently reports `false`.
fn dict_is_ordered<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
    // TODO Handle more types
    if let Expr::Column(column) = self {
        input_schema.dict_is_ordered(column)
    } else {
        Ok(false)
    }
}

/// Looks up the dictionary id associated with this expression.
///
/// Only plain column references consult the schema; every other
/// expression kind currently reports `0`.
fn get_dict_id<S: ExprSchema>(&self, schema: &S) -> Result<i64> {
    // TODO Handle more types
    if let Expr::Column(column) = self {
        schema.dict_id(column)
    } else {
        Ok(0)
    }
}

/// Returns a [`DFField`] (wrapping an [arrow::datatypes::Field]) compatible
/// with this expression.
///
/// So for example, a projected expression `col(c1) + col(c2)` is
/// placed in an output field **named** col("c1 + c2")
///
/// Column references keep their qualifier and name; every other expression
/// yields an unqualified field named after `display_name()`. When the
/// expression's type is `DataType::Dictionary`, the dictionary id and
/// ordering flag obtained from `get_dict_id` / `dict_is_ordered` are
/// attached to the resulting field so that they are not lost.
///
/// # Errors
///
/// Propagates any error from resolving the expression's type, nullability,
/// display name, or dictionary attributes against `input_schema`.
fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
    // Resolve the type once; it decides which constructor is used.
    let data_type = self.get_type(input_schema)?;
    let is_dictionary = matches!(data_type, DataType::Dictionary(_, _));

    let field = match self {
        Expr::Column(c) => {
            let nullable = self.nullable(input_schema)?;
            if is_dictionary {
                DFField::new_dict(
                    c.relation.clone(),
                    &c.name,
                    data_type,
                    nullable,
                    self.get_dict_id(input_schema)?,
                    self.dict_is_ordered(input_schema)?,
                )
            } else {
                DFField::new(c.relation.clone(), &c.name, data_type, nullable)
            }
        }
        _ => {
            let name = self.display_name()?;
            let nullable = self.nullable(input_schema)?;
            if is_dictionary {
                DFField::new_unqualified_dict(
                    &name,
                    data_type,
                    nullable,
                    self.get_dict_id(input_schema)?,
                    self.dict_is_ordered(input_schema)?,
                )
            } else {
                DFField::new_unqualified(&name, data_type, nullable)
            }
        }
    };
    Ok(field)
}

Expand Down Expand Up @@ -347,11 +388,58 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subq

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::{col, lit};
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, Field};
use datafusion_common::Column;

/// Checks that `to_field` on dictionary-typed columns reproduces the
/// schema's fields, including their dictionary id and ordering flag.
#[test]
fn expr_with_dictionary_to_schema() {
    let dictionary_type = || {
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
    };
    let dffield: Vec<DFField> = vec![
        Field::new_dict("dictionary_column1", dictionary_type(), false, 1, false),
        Field::new_dict("dictionary_column2", dictionary_type(), false, 2, true),
    ]
    .into_iter()
    .map(DFField::from)
    .collect();

    let dfschema = DFSchema::new_with_metadata(
        dffield.clone(),
        HashMap::<String, String>::new(),
    )
    .unwrap();

    let exprs = vec![col("dictionary_column1"), col("dictionary_column2")];
    for (expr, expected) in exprs.iter().zip(dffield.iter()) {
        let actual = expr.to_field(&dfschema).unwrap();
        assert_eq!(actual, *expected);
        assert_eq!(actual.field().dict_id(), expected.field().dict_id());
        assert_eq!(
            actual.field().dict_is_ordered(),
            expected.field().dict_is_ordered()
        );
    }
}

#[test]
fn expr_schema_nullability() {
let expr = col("foo").eq(lit(1));
Expand Down Expand Up @@ -404,5 +492,13 @@ mod tests {
fn data_type(&self, _col: &Column) -> Result<&DataType> {
Ok(&self.data_type)
}

// Test stub: ignores the column and always reports dictionary id 0.
fn dict_id(&self, _col: &Column) -> Result<i64> {
Ok(0)
}

// Test stub: ignores the column and always reports an unordered dictionary.
fn dict_is_ordered(&self, _col: &Column) -> Result<bool> {
Ok(false)
}
}
}