Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/datafusion/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl FlightService for FlightServiceImpl {
}
}

fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status {
fn to_tonic_err(e: &datafusion::error::DataFusionError) -> Status {
Status::internal(format!("{:?}", e))
}

Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::string::String;
use std::sync::Arc;

use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};
Expand All @@ -62,7 +62,7 @@ impl CsvFile {
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, options.file_extension)?;
if filenames.is_empty() {
return Err(ExecutionError::General("No files found".to_string()));
return Err(DataFusionError::Plan("No files found".to_string()));
}
CsvExec::try_infer_schema(&filenames, &options)?
}
Expand Down
10 changes: 5 additions & 5 deletions rust/datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::ExecutionPlan;
Expand All @@ -49,7 +49,7 @@ impl MemTable {
batches: partitions,
})
} else {
Err(ExecutionError::General(
Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
))
}
Expand Down Expand Up @@ -112,7 +112,7 @@ impl TableProvider for MemTable {
if *i < self.schema.fields().len() {
Ok(self.schema.field(*i).clone())
} else {
Err(ExecutionError::General(
Err(DataFusionError::Internal(
"Projection index out of range".to_string(),
))
}
Expand Down Expand Up @@ -217,7 +217,7 @@ mod tests {
let projection: Vec<usize> = vec![0, 4];

match provider.scan(&Some(projection), 1024) {
Err(ExecutionError::General(e)) => {
Err(DataFusionError::Internal(e)) => {
assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
}
_ => assert!(false, "Scan should failed on invalid projection"),
Expand Down Expand Up @@ -250,7 +250,7 @@ mod tests {
)?;

match MemTable::new(schema2, vec![vec![batch]]) {
Err(ExecutionError::General(e)) => assert_eq!(
Err(DataFusionError::Plan(e)) => assert_eq!(
"\"Mismatch between schema and batches\"",
format!("{:?}", e)
),
Expand Down
105 changes: 49 additions & 56 deletions rust/datafusion/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,109 +19,102 @@

use std::error;
use std::fmt::{Display, Formatter};
use std::io::Error;
use std::io;
use std::result;

use arrow::error::ArrowError;
use parquet::errors::ParquetError;
use sqlparser::parser::ParserError;

/// Result type for operations that could result in an `ExecutionError`
pub type Result<T> = result::Result<T, ExecutionError>;
/// Result type for operations that could result in an [DataFusionError]
pub type Result<T> = result::Result<T, DataFusionError>;

/// DataFusion error
#[derive(Debug)]
#[allow(missing_docs)]
pub enum ExecutionError {
/// Wraps an error from the Arrow crate
pub enum DataFusionError {
/// Error returned by arrow.
ArrowError(ArrowError),
/// Wraps an error from the Parquet crate
ParquetError(ParquetError),
/// I/O error
IoError(Error),
/// SQL parser error
ParserError(ParserError),
/// General error
General(String),
/// Invalid column error
InvalidColumn(String),
/// Missing functionality
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntatically incorrect.
SQL(ParserError),
/// Error returned on a branch that we know it is possible
/// but to which we still have no implementation for.
/// Often, these errors are tracked in our issue tracker.
NotImplemented(String),
/// Internal error
InternalError(String),
/// Query engine execution error
ExecutionError(String),
/// Error returned as a consequence of an error in DataFusion.
/// This error should not happen in normal usage of DataFusion.
// DataFusions has internal invariants that we are unable to ask the compiler to check for us.
// This error is raised when one of those invariants is not verified during execution.
Internal(String),
/// This error happens whenever a plan is not valid. Examples include
/// impossible casts, schema inference not possible and non-unique column names.
Plan(String),
/// Error returned during execution of the query.
/// Examples include files not found, errors in parsing certain types.
Execution(String),
}

impl ExecutionError {
/// Wraps this `ExecutionError` in arrow's `ExternalError` variant.
impl DataFusionError {
/// Wraps this [DataFusionError] as an [Arrow::error::ArrowError].
pub fn into_arrow_external_error(self) -> ArrowError {
ArrowError::from_external_error(Box::new(self))
}
}

impl From<Error> for ExecutionError {
fn from(e: Error) -> Self {
ExecutionError::IoError(e)
impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
}
}

impl From<String> for ExecutionError {
fn from(e: String) -> Self {
ExecutionError::General(e)
}
}

impl From<&'static str> for ExecutionError {
fn from(e: &'static str) -> Self {
ExecutionError::General(e.to_string())
}
}

impl From<ArrowError> for ExecutionError {
impl From<ArrowError> for DataFusionError {
fn from(e: ArrowError) -> Self {
ExecutionError::ArrowError(e)
DataFusionError::ArrowError(e)
}
}

impl From<ParquetError> for ExecutionError {
impl From<ParquetError> for DataFusionError {
fn from(e: ParquetError) -> Self {
ExecutionError::ParquetError(e)
DataFusionError::ParquetError(e)
}
}

impl From<ParserError> for ExecutionError {
impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
ExecutionError::ParserError(e)
DataFusionError::SQL(e)
}
}

impl Display for ExecutionError {
impl Display for DataFusionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match *self {
ExecutionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc),
ExecutionError::ParquetError(ref desc) => {
DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc),
DataFusionError::ParquetError(ref desc) => {
write!(f, "Parquet error: {}", desc)
}
ExecutionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
ExecutionError::ParserError(ref desc) => {
write!(f, "Parser error: {:?}", desc)
DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {:?}", desc)
}
ExecutionError::General(ref desc) => write!(f, "General error: {}", desc),
ExecutionError::InvalidColumn(ref desc) => {
write!(f, "Invalid column error: {}", desc)
DataFusionError::NotImplemented(ref desc) => {
write!(f, "This feature is not implemented: {}", desc)
}
ExecutionError::NotImplemented(ref desc) => {
write!(f, "NotImplemented: {}", desc)
DataFusionError::Internal(ref desc) => {
write!(f, "Internal error: {}. This was likely caused by a bug in DataFusion's \
code and we would welcome that you file an bug report in our issue tracker", desc)
}
ExecutionError::InternalError(ref desc) => {
write!(f, "Internal error: {}", desc)
DataFusionError::Plan(ref desc) => {
write!(f, "Error during planning: {}", desc)
}
ExecutionError::ExecutionError(ref desc) => {
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {}", desc)
}
}
}
}

impl error::Error for ExecutionError {}
impl error::Error for DataFusionError {}
16 changes: 8 additions & 8 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::record_batch::RecordBatch;
use crate::datasource::csv::CsvFile;
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::error::{DataFusionError, Result};
use crate::execution::dataframe_impl::DataFrameImpl;
use crate::logical_plan::{FunctionRegistry, LogicalPlan, LogicalPlanBuilder};
use crate::optimizer::filter_push_down::FilterPushDown;
Expand Down Expand Up @@ -148,7 +148,7 @@ impl ExecutionContext {
let plan = LogicalPlanBuilder::empty().build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
_ => Err(ExecutionError::ExecutionError(format!(
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
))),
Expand All @@ -164,7 +164,7 @@ impl ExecutionContext {
let statements = DFParser::parse_sql(sql)?;

if statements.len() != 1 {
return Err(ExecutionError::NotImplemented(format!(
return Err(DataFusionError::NotImplemented(format!(
"The context currently only supports a single SQL statement",
)));
}
Expand Down Expand Up @@ -288,7 +288,7 @@ impl ExecutionContext {
&LogicalPlanBuilder::from(&table_scan).build()?,
)))
}
_ => Err(ExecutionError::General(format!(
_ => Err(DataFusionError::Plan(format!(
"No table named '{}'",
table_name
))),
Expand Down Expand Up @@ -364,7 +364,7 @@ impl ExecutionContext {
.map(|batch| writer.write(&batch?))
.try_collect()
.await
.map_err(|e| ExecutionError::from(e))?;
.map_err(|e| DataFusionError::from(e))?;
}
Ok(())
}
Expand Down Expand Up @@ -501,7 +501,7 @@ impl FunctionRegistry for ExecutionContextState {
fn udf(&self, name: &str) -> Result<&ScalarUDF> {
let result = self.scalar_functions.get(name);
if result.is_none() {
Err(ExecutionError::General(
Err(DataFusionError::Plan(
format!("There is no UDF named \"{}\" in the registry", name).to_string(),
))
} else {
Expand All @@ -512,7 +512,7 @@ impl FunctionRegistry for ExecutionContextState {
fn udaf(&self, name: &str) -> Result<&AggregateUDF> {
let result = self.aggregate_functions.get(name);
if result.is_none() {
Err(ExecutionError::General(
Err(DataFusionError::Plan(
format!("There is no UDAF named \"{}\" in the registry", name)
.to_string(),
))
Expand Down Expand Up @@ -1414,7 +1414,7 @@ mod tests {
_logical_plan: &LogicalPlan,
_ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(ExecutionError::NotImplemented(
Err(DataFusionError::NotImplemented(
"query not supported".to_string(),
))
}
Expand Down
24 changes: 12 additions & 12 deletions rust/datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::{

use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::error::{DataFusionError, Result};
use crate::{
datasource::csv::{CsvFile, CsvReadOptions},
physical_plan::udaf::AggregateUDF,
Expand Down Expand Up @@ -115,7 +115,7 @@ fn create_name(e: &Expr, input_schema: &Schema) -> Result<String> {
}
Ok(format!("{}({})", fun.name, names.join(",")))
}
other => Err(ExecutionError::NotImplemented(format!(
other => Err(DataFusionError::NotImplemented(format!(
"Physical plan does not support logical expression {:?}",
other
))),
Expand Down Expand Up @@ -275,7 +275,7 @@ impl Expr {
&right.get_type(schema)?,
),
Expr::Sort { ref expr, .. } => expr.get_type(schema),
Expr::Wildcard => Err(ExecutionError::General(
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expr::Nested(e) => e.get_type(schema),
Expand Down Expand Up @@ -309,7 +309,7 @@ impl Expr {
} => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
Expr::Sort { ref expr, .. } => expr.nullable(input_schema),
Expr::Nested(e) => e.nullable(input_schema),
Expr::Wildcard => Err(ExecutionError::General(
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
}
Expand Down Expand Up @@ -347,7 +347,7 @@ impl Expr {
data_type: cast_to_type.clone(),
})
} else {
Err(ExecutionError::General(format!(
Err(DataFusionError::Plan(format!(
"Cannot automatically convert {:?} to {:?}",
this_type, cast_to_type
)))
Expand Down Expand Up @@ -1270,7 +1270,7 @@ fn validate_unique_names(
Ok(())
},
Some((existing_position, existing_expr)) => {
Err(ExecutionError::General(
Err(DataFusionError::Plan(
format!("{} require unique expression names \
but the expression \"{:?}\" at position {} and \"{:?}\" \
at position {} have the same name. Consider aliasing (\"AS\") one of them.",
Expand Down Expand Up @@ -1445,14 +1445,14 @@ mod tests {
.project(vec![col("id"), col("first_name").alias("id")]);

match plan {
Err(ExecutionError::General(e)) => {
Err(DataFusionError::Plan(e)) => {
assert_eq!(e, "Projections require unique expression names \
but the expression \"#id\" at position 0 and \"#first_name AS id\" at \
position 1 have the same name. Consider aliasing (\"AS\") one of them.");
Ok(())
}
_ => Err(ExecutionError::General(
"Plan should have returned an ExecutionError::General".to_string(),
_ => Err(DataFusionError::Plan(
"Plan should have returned an DataFusionError::Plan".to_string(),
)),
}
}
Expand All @@ -1469,14 +1469,14 @@ mod tests {
.aggregate(vec![col("state")], vec![sum(col("salary")).alias("state")]);

match plan {
Err(ExecutionError::General(e)) => {
Err(DataFusionError::Plan(e)) => {
assert_eq!(e, "Aggregations require unique expression names \
but the expression \"#state\" at position 0 and \"SUM(#salary) AS state\" at \
position 1 have the same name. Consider aliasing (\"AS\") one of them.");
Ok(())
}
_ => Err(ExecutionError::General(
"Plan should have returned an ExecutionError::General".to_string(),
_ => Err(DataFusionError::Plan(
"Plan should have returned an DataFusionError::Plan".to_string(),
)),
}
}
Expand Down
Loading