diff --git a/dask_planner/src/sql/exceptions.rs b/dask_planner/src/sql/exceptions.rs index e53aeb5b4..567225f4f 100644 --- a/dask_planner/src/sql/exceptions.rs +++ b/dask_planner/src/sql/exceptions.rs @@ -1,3 +1,8 @@ -use pyo3::create_exception; +use datafusion::error::DataFusionError; +use pyo3::{create_exception, PyErr}; create_exception!(rust, ParsingException, pyo3::exceptions::PyException); + +pub fn py_type_err(e: DataFusionError) -> PyErr { + PyErr::new::(format!("{:?}", e)) +} diff --git a/dask_planner/src/sql/logical.rs b/dask_planner/src/sql/logical.rs index 2359afdf2..bb4ff53b0 100644 --- a/dask_planner/src/sql/logical.rs +++ b/dask_planner/src/sql/logical.rs @@ -10,8 +10,11 @@ pub mod projection; pub use datafusion_expr::LogicalPlan; +use datafusion::common::Result; use datafusion::prelude::Column; +use pyo3::ffi::Py_FatalError; +use crate::sql::exceptions::py_type_err; use pyo3::prelude::*; #[pyclass(name = "LogicalPlan", module = "dask_planner", subclass)] @@ -140,19 +143,15 @@ impl PyLogicalPlan { } #[pyo3(name = "getRowType")] - pub fn row_type(&self) -> RelDataType { - let fields: &Vec = self.original_plan.schema().fields(); - let mut rel_fields: Vec = Vec::new(); - for i in 0..fields.len() { - rel_fields.push( - RelDataTypeField::from( - fields[i].clone(), - self.original_plan.schema().as_ref().clone(), - ) - .unwrap(), - ); - } - RelDataType::new(false, rel_fields) + pub fn row_type(&self) -> PyResult { + let schema = self.original_plan.schema(); + let mut rel_fields: Vec = schema + .fields() + .iter() + .map(|f| RelDataTypeField::from(f, schema.as_ref())) + .collect::>>() + .map_err(|e| py_type_err(e))?; + Ok(RelDataType::new(false, rel_fields)) } } diff --git a/dask_planner/src/sql/types/rel_data_type_field.rs b/dask_planner/src/sql/types/rel_data_type_field.rs index 754b93f42..befee19b8 100644 --- a/dask_planner/src/sql/types/rel_data_type_field.rs +++ b/dask_planner/src/sql/types/rel_data_type_field.rs @@ -1,7 +1,7 @@ use crate::sql::types::DaskTypeMap; use crate::sql::types::SqlTypeName; -use datafusion::error::DataFusionError; +use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::{DFField, DFSchema}; use std::fmt; @@ -19,7 +19,7 @@ pub struct RelDataTypeField { // Functions that should not be presented to Python are placed here impl RelDataTypeField { - pub fn from(field: DFField, schema: DFSchema) -> Result { + pub fn from(field: &DFField, schema: &DFSchema) -> Result { Ok(RelDataTypeField { name: field.name().clone(), data_type: DaskTypeMap {