Skip to content

Commit

Permalink
Empty relation bindings (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdye64 committed Feb 22, 2023
1 parent 75dea3d commit 4869a86
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 22 deletions.
1 change: 0 additions & 1 deletion datafusion/cudf.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def register_parquet(self, name, path):
self.datafusion_ctx.register_parquet(name, path)

def to_cudf_expr(self, expr):

# get Python wrapper for logical expression
expr = expr.to_variant()

Expand Down
6 changes: 3 additions & 3 deletions datafusion/tests/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_projection(test_ctx):
assert col3.op() == "<"
assert isinstance(col3.right().to_variant(), Literal)

plan = plan.input().to_variant()
plan = plan.input()[0].to_variant()
assert isinstance(plan, TableScan)


Expand All @@ -71,7 +71,7 @@ def test_filter(test_ctx):
plan = plan.to_variant()
assert isinstance(plan, Projection)

plan = plan.input().to_variant()
plan = plan.input()[0].to_variant()
assert isinstance(plan, Filter)


Expand All @@ -90,7 +90,7 @@ def test_aggregate_query(test_ctx):
projection = plan.to_variant()
assert isinstance(projection, Projection)

aggregate = projection.input().to_variant()
aggregate = projection.input()[0].to_variant()
assert isinstance(aggregate, Aggregate)

col1 = aggregate.group_by_exprs()[0].to_variant()
Expand Down
5 changes: 5 additions & 0 deletions src/common/df_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ impl PyDFSchema {
schema: Arc::new(DFSchema::empty()),
})
}

#[pyo3(name = "field_names")]
fn py_field_names(&self) -> PyResult<Vec<String>> {
Ok(self.schema.field_names())
}
}
2 changes: 2 additions & 0 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod aggregate_expr;
pub mod analyze;
pub mod binary_expr;
pub mod column;
pub mod empty_relation;
pub mod filter;
pub mod limit;
pub mod literal;
Expand Down Expand Up @@ -185,5 +186,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<aggregate::PyAggregate>()?;
m.add_class::<sort::PySort>()?;
m.add_class::<analyze::PyAnalyze>()?;
m.add_class::<empty_relation::PyEmptyRelation>()?;
Ok(())
}
6 changes: 3 additions & 3 deletions src/expr/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ impl PyAggregate {
}

// Retrieves the input `LogicalPlan` to this `Aggregate` node
fn input(&self) -> PyLogicalPlan {
PyLogicalPlan::from((*self.aggregate.input).clone())
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

// Resulting Schema for this `Aggregate` node instance
Expand All @@ -100,7 +100,7 @@ impl PyAggregate {
}

impl LogicalNode for PyAggregate {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
}
}
6 changes: 5 additions & 1 deletion src/expr/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl PyAnalyze {
Ok(self.analyze.verbose)
}

fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Analyze` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok((*self.analyze.schema).clone().into())
Expand All @@ -70,7 +74,7 @@ impl PyAnalyze {
}

impl LogicalNode for PyAnalyze {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.analyze.input).clone())]
}
}
72 changes: 72 additions & 0 deletions src/expr/empty_relation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::common::df_schema::PyDFSchema;
use datafusion_expr::EmptyRelation;
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};

#[pyclass(name = "EmptyRelation", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyEmptyRelation {
empty: EmptyRelation,
}

impl From<PyEmptyRelation> for EmptyRelation {
fn from(empty_relation: PyEmptyRelation) -> Self {
empty_relation.empty
}
}

impl From<EmptyRelation> for PyEmptyRelation {
fn from(empty: EmptyRelation) -> PyEmptyRelation {
PyEmptyRelation { empty }
}
}

impl Display for PyEmptyRelation {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"Empty Relation
\nProduce One Row: {:?}
\nSchema: {:?}",
&self.empty.produce_one_row, &self.empty.schema
)
}
}

#[pymethods]
impl PyEmptyRelation {
fn produce_one_row(&self) -> PyResult<bool> {
Ok(self.empty.produce_one_row)
}

/// Resulting Schema for this `EmptyRelation` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok((*self.empty.schema).clone().into())
}

/// Get a String representation of this column
fn __repr__(&self) -> String {
format!("{}", self)
}

fn __name__(&self) -> PyResult<String> {
Ok("EmptyRelation".to_string())
}
}
6 changes: 3 additions & 3 deletions src/expr/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ impl PyFilter {
}

/// Retrieves the input `LogicalPlan` to this `Filter` node
fn input(&self) -> PyLogicalPlan {
PyLogicalPlan::from((*self.filter.input).clone())
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Filter` node instance
Expand All @@ -77,7 +77,7 @@ impl PyFilter {
}

impl LogicalNode for PyFilter {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.filter.input).clone())]
}
}
6 changes: 3 additions & 3 deletions src/expr/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ impl PyLimit {
}

/// Retrieves the input `LogicalPlan` to this `Limit` node
fn input(&self) -> PyLogicalPlan {
PyLogicalPlan::from((*self.limit.input).clone())
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Limit` node instance
Expand All @@ -82,7 +82,7 @@ impl PyLimit {
}

impl LogicalNode for PyLimit {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.limit.input).clone())]
}
}
2 changes: 1 addition & 1 deletion src/expr/logical_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ use crate::sql::logical::PyLogicalPlan;
/// any "node" shares these common traits in common.
pub trait LogicalNode {
/// The input plan to the current logical node instance.
fn input(&self) -> Vec<PyLogicalPlan>;
fn inputs(&self) -> Vec<PyLogicalPlan>;
}
10 changes: 7 additions & 3 deletions src/expr/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ impl PyProjection {
}

/// Retrieves the input `LogicalPlan` to this `Projection` node
fn input(&self) -> PyLogicalPlan {
PyLogicalPlan::from((*self.projection.input).clone())
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Projection` node instance
Expand All @@ -86,10 +86,14 @@ impl PyProjection {
fn __repr__(&self) -> PyResult<String> {
Ok(format!("Projection({})", self))
}

fn __name__(&self) -> PyResult<String> {
Ok("Projection".to_string())
}
}

impl LogicalNode for PyProjection {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.projection.input).clone())]
}
}
6 changes: 3 additions & 3 deletions src/expr/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ impl PySort {
}

/// Retrieves the input `LogicalPlan` to this `Sort` node
fn input(&self) -> PyLogicalPlan {
PyLogicalPlan::from((*self.sort.input).clone())
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Sort` node instance
Expand All @@ -88,7 +88,7 @@ impl PySort {
}

impl LogicalNode for PySort {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.sort.input).clone())]
}
}
2 changes: 1 addition & 1 deletion src/expr/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl PyTableScan {
}

impl LogicalNode for PyTableScan {
fn input(&self) -> Vec<PyLogicalPlan> {
fn inputs(&self) -> Vec<PyLogicalPlan> {
// table scans are leaf nodes and do not have inputs
vec![]
}
Expand Down
2 changes: 2 additions & 0 deletions src/sql/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use crate::errors::py_runtime_err;
use crate::expr::aggregate::PyAggregate;
use crate::expr::analyze::PyAnalyze;
use crate::expr::empty_relation::PyEmptyRelation;
use crate::expr::filter::PyFilter;
use crate::expr::limit::PyLimit;
use crate::expr::projection::PyProjection;
Expand Down Expand Up @@ -54,6 +55,7 @@ impl PyLogicalPlan {
Python::with_gil(|_| match self.plan.as_ref() {
LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)),
LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)),
LogicalPlan::EmptyRelation(plan) => Ok(PyEmptyRelation::from(plan.clone()).into_py(py)),
LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)),
LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)),
Expand Down

0 comments on commit 4869a86

Please sign in to comment.