Skip to content
29 changes: 27 additions & 2 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::execution::dataframe_impl::DataFrameImpl;
use crate::logical_plan::{
FunctionRegistry, LogicalPlan, LogicalPlanBuilder, ToDFSchema,
};
use crate::optimizer::constant_folding::ConstantFolding;
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
Expand Down Expand Up @@ -512,6 +513,7 @@ impl ExecutionConfig {
concurrency: num_cpus::get(),
batch_size: 32768,
optimizers: vec![
Arc::new(ConstantFolding::new()),
Arc::new(ProjectionPushDown::new()),
Arc::new(FilterPushDown::new()),
Arc::new(HashBuildProbeOrder::new()),
Expand Down Expand Up @@ -834,7 +836,7 @@ mod tests {
projected_schema,
..
} => {
assert_eq!(source.schema().fields().len(), 2);
assert_eq!(source.schema().fields().len(), 3);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be TableScan"),
Expand Down Expand Up @@ -1146,6 +1148,28 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn boolean_literal() -> Result<()> {
let results =
execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+------+",
"| c1 | c3 |",
"+----+------+",
"| 3 | true |",
"| 3 | true |",
"| 3 | true |",
"| 3 | true |",
"| 3 | true |",
"+----+------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_grouped_empty() -> Result<()> {
let results =
Expand Down Expand Up @@ -1953,6 +1977,7 @@ mod tests {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
Field::new("c3", DataType::Boolean, false),
]));

// generate a partitioned file
Expand All @@ -1963,7 +1988,7 @@ mod tests {

// generate some data
for i in 0..=10 {
let data = format!("{},{}\n", partition, i);
let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
file.write_all(data.as_bytes())?;
}
}
Expand Down
6 changes: 6 additions & 0 deletions rust/datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,12 @@ impl Literal for String {
}
}

impl Literal for ScalarValue {
fn lit(&self) -> Expr {
Expr::Literal(self.clone())
}
}

macro_rules! make_literal {
($TYPE:ty, $SCALAR:ident) => {
#[allow(missing_docs)]
Expand Down
34 changes: 34 additions & 0 deletions rust/datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,40 @@ impl LogicalPlan {
}
}

/// Get a vector of references to all schemas in every node of the logical plan
pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
match self {
LogicalPlan::TableScan {
projected_schema, ..
} => vec![&projected_schema],
LogicalPlan::Aggregate { input, schema, .. }
| LogicalPlan::Projection { input, schema, .. } => {
let mut schemas = input.all_schemas();
schemas.insert(0, &schema);
schemas
}
LogicalPlan::Join {
left,
right,
schema,
..
} => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
schemas.insert(0, &schema);
schemas
}
LogicalPlan::Extension { node } => vec![&node.schema()],
LogicalPlan::Explain { schema, .. }
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::CreateExternalTable { schema, .. } => vec![&schema],
LogicalPlan::Limit { input, .. }
| LogicalPlan::Repartition { input, .. }
| LogicalPlan::Sort { input, .. }
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
}
}

/// Returns the (fixed) output schema for explain plans
pub fn explain_schema() -> SchemaRef {
SchemaRef::new(Schema::new(vec![
Expand Down
Loading