Merged
103 commits
b1900cf
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
1e48597
Updates for test_filter
jdye64 Mar 31, 2022
fd41a8c
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
682c009
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
ab69dd8
Include setuptools-rust in conda build recipe, in host and run
jdye64 Apr 13, 2022
ce4c31e
Remove PyArrow dependency
jdye64 Apr 20, 2022
8785b8c
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
3e45ab8
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
1734b89
timestamp with local time zone
jdye64 Apr 21, 2022
ac7d9f6
Bump DataFusion version (#494)
andygrove Apr 21, 2022
cbf5db0
Include RelDataType work
jdye64 Apr 21, 2022
d9380a6
Include RelDataType work
jdye64 Apr 21, 2022
ad56fc2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
7b20e66
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
7dd2017
add getRowType() for logical.rs
jdye64 Apr 22, 2022
984f523
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
1405fea
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
789aaad
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
652205e
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
5127f87
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
cf568dc
removed a few print statements
jdye64 Apr 24, 2022
4fb640e
commit to share with colleague
jdye64 Apr 24, 2022
32127e5
updates
jdye64 Apr 25, 2022
f5e24fe
checkpoint
jdye64 Apr 25, 2022
11cf212
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
46dfb0a
formatting after upstream merge
jdye64 Apr 25, 2022
fa71674
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
f6e86ca
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
3d1a5ad
accept INT as INTEGER
jdye64 Apr 25, 2022
384e446
tests update
jdye64 Apr 25, 2022
199b9d2
checkpoint
jdye64 Apr 25, 2022
c9dffae
checkpoint
jdye64 Apr 27, 2022
c9aad43
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
11100fa
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
643e85d
Basic DataFusion Select Functionality (#489)
jdye64 Apr 28, 2022
b36ef16
updates for expression
jdye64 Apr 28, 2022
5c94fbc
uncommented pytests
jdye64 Apr 28, 2022
bb461c8
uncommented pytests
jdye64 Apr 28, 2022
f65b1ab
code cleanup for review
jdye64 Apr 28, 2022
dc7553f
code cleanup for review
jdye64 Apr 28, 2022
f1dc0b2
Enabled more pytest that work now
jdye64 Apr 28, 2022
940e867
Enabled more pytest that work now
jdye64 Apr 28, 2022
6769ca0
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
c4ed9bd
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
05c5788
Disable 2 pytests that are causing gpuCI issues. They will be addressed …
jdye64 Apr 29, 2022
a33aa63
Handle Between operation for case-when
jdye64 Apr 29, 2022
20efd5c
adjust timestamp casting
jdye64 May 2, 2022
281baf7
merge with upstream
jdye64 May 6, 2022
d666bdd
merge with upstream/datafusion-sql-planner
jdye64 May 9, 2022
533f50a
Refactor projection _column_name() logic to the _column_name logic in…
jdye64 May 9, 2022
a42a133
removed println! statements
jdye64 May 9, 2022
dc12f5d
introduce join getCondition() logic for retrieving the combining Rex …
jdye64 May 10, 2022
9dce68a
merge with upstream
jdye64 May 10, 2022
10cd463
merge with upstream
jdye64 May 10, 2022
a1841c3
Updates from review
jdye64 May 11, 2022
3001943
Add Offset and point to repo with offset in datafusion
jdye64 May 11, 2022
7ec66da
Introduce offset
jdye64 May 12, 2022
b72917b
limit updates
jdye64 May 12, 2022
651c9ab
commit before upstream merge
jdye64 May 15, 2022
4e69813
merged with upstream/datafusion-sql-planner
jdye64 May 16, 2022
3219ad0
Code formatting
jdye64 May 16, 2022
5a88155
Merge with upstream
jdye64 May 16, 2022
23adefa
Merge with upstream
jdye64 May 16, 2022
bd94ccf
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 17, 2022
bf91e8f
update Cargo.toml to use Arrow-DataFusion version with LIMIT logic
jdye64 May 17, 2022
3dc6a89
Bump DataFusion version to get changes around variant_name()
jdye64 May 18, 2022
08b38aa
Use map partitions for determining the offset
jdye64 May 19, 2022
7b52f41
Merge with upstream datafusion-crossjoin merge
jdye64 May 19, 2022
6638930
Added multiple LogicalPlan inputs for join conditions
jdye64 May 20, 2022
e24b97f
Merge with upstream LIMIT PR
jdye64 May 20, 2022
61bd864
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 22, 2022
e3b0d2b
Merge with upstream
jdye64 May 23, 2022
0407c6f
Rename underlying DataContainer's DataFrame instance to match the col…
jdye64 May 23, 2022
af1c138
Adjust ColumnContainer mapping after join.py logic to entire the bake…
jdye64 May 23, 2022
8853765
Add enumerate to column_{i} generation string to ensure columns exist…
jdye64 May 24, 2022
2adc5ce
Adjust join schema logic to perform merge instead of join on rust sid…
jdye64 May 24, 2022
6005018
Handle DataFusion COUNT(UInt8(1)) as COUNT(*)
jdye64 May 24, 2022
f640e1d
commit before merge
jdye64 May 24, 2022
f0cc07b
merge with upstream datafusion-sql-planner
jdye64 May 24, 2022
3159645
Update function for gathering index of an expression
jdye64 May 24, 2022
ba8cec2
Update for review check
jdye64 May 25, 2022
a8fba46
Adjust RelDataType to retrieve fully qualified column names
jdye64 May 26, 2022
8a1a865
Adjust base.py to get fully qualified column name
jdye64 May 26, 2022
6e966b6
Enable passing pytests in test_join.py
jdye64 May 26, 2022
b9604cc
Adjust keys provided by getting backend column mapping name
jdye64 May 27, 2022
014fe68
Adjust output_col to not use the backend_column name for special rese…
jdye64 May 27, 2022
5b0dba3
uncomment cross join pytest which works now
jdye64 May 27, 2022
d17d859
Uncomment passing pytests in test_select.py
jdye64 May 27, 2022
805ec8a
Review updates
jdye64 May 28, 2022
7728bd4
Add back complex join case condition, not just cross join but 'comple…
jdye64 May 28, 2022
6f8d0d9
Enable DataFusion CBO logic
jdye64 May 31, 2022
dad9eb4
Disable EliminateFilter optimization rule
jdye64 May 31, 2022
adc0083
updates
jdye64 Jun 1, 2022
4101d27
upstream merge
jdye64 Jun 1, 2022
be7d502
Disable tests that hit CBO generated plan edge cases of yet to be imp…
jdye64 Jun 1, 2022
a006def
[REVIEW] - Modify sql.skip_optimize to use dask_config.get and remov…
jdye64 Jun 2, 2022
6ba6edb
[REVIEW] - change name of configuration from skip_optimize to optimize
jdye64 Jun 2, 2022
984c5bb
[REVIEW] - Add OptimizeException catch and raise statements back
jdye64 Jun 2, 2022
e59cd1e
Found issue where backend column names which are results of a single …
jdye64 Jun 3, 2022
4edb4b5
Remove SQL from OptimizationException
jdye64 Jun 3, 2022
06e76ed
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 Jun 6, 2022
15115ab
Upstream merge and removed unused code imports
jdye64 Jun 7, 2022
da37517
skip tests that CBO plan reorganization causes missing features to be…
jdye64 Jun 7, 2022
4 changes: 4 additions & 0 deletions dask_planner/src/lib.rs
@@ -33,6 +33,10 @@ fn rust(py: Python, m: &PyModule) -> PyResult<()> {
"DFParsingException",
py.get_type::<sql::exceptions::ParsingException>(),
)?;
m.add(
"DFOptimizationException",
py.get_type::<sql::exceptions::OptimizationException>(),
)?;

Ok(())
}
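
The registration above is what makes the new exception importable from Python. As a minimal sketch (illustrative only, not part of the diff), types added with m.add(...) become attributes of the compiled module and behave like ordinary Python exception classes:

# Illustrative only: DFOptimizationException mirrors DFParsingException,
# both created by PyO3's create_exception! and exposed on dask_planner.rust.
from dask_planner.rust import DFOptimizationException, DFParsingException

assert issubclass(DFOptimizationException, Exception)
assert issubclass(DFParsingException, Exception)
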
54 changes: 51 additions & 3 deletions dask_planner/src/sql.rs
@@ -2,27 +2,27 @@ pub mod column;
pub mod exceptions;
pub mod function;
pub mod logical;
pub mod optimizer;
pub mod schema;
pub mod statement;
pub mod table;
pub mod types;

use crate::sql::exceptions::ParsingException;
use crate::sql::exceptions::{OptimizationException, ParsingException};

use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::catalog::{ResolvedTableReference, TableReference};
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{
AggregateUDF, ScalarFunctionImplementation, ScalarUDF, TableSource,
};
use datafusion::logical_plan::{LogicalPlan, PlanVisitor};
use datafusion::sql::parser::DFParser;
use datafusion::sql::planner::{ContextProvider, SqlToRel};

use std::collections::HashMap;
use std::sync::Arc;

use crate::sql::table::DaskTableSource;
use pyo3::prelude::*;

/// DaskSQLContext is main interface used for interacting with DataFusion to
@@ -177,4 +177,52 @@ impl DaskSQLContext {
})
.map_err(|e| PyErr::new::<ParsingException, _>(format!("{}", e)))
}

/// Accepts an existing relational plan, `LogicalPlan`, and optimizes it
/// by applying a set of `optimizer` trait implementations against the
/// `LogicalPlan`
pub fn optimize_relational_algebra(
&self,
existing_plan: logical::PyLogicalPlan,
) -> PyResult<logical::PyLogicalPlan> {
// Certain queries cannot be optimized (e.g. `EXPLAIN SELECT * FROM test`); simply return those plans as-is
let mut visitor = OptimizablePlanVisitor {};

match existing_plan.original_plan.accept(&mut visitor) {
Ok(valid) => {
if valid {
optimizer::DaskSqlOptimizer::new()
.run_optimizations(existing_plan.original_plan)
.map(|k| logical::PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(|e| PyErr::new::<OptimizationException, _>(format!("{}", e)))
} else {
// This LogicalPlan does not support Optimization. Return original
Ok(existing_plan)
}
}
Err(e) => Err(PyErr::new::<OptimizationException, _>(format!("{}", e))),
}
}
}

/// Visits each `LogicalPlan` node to determine whether the plan is valid for optimization
pub struct OptimizablePlanVisitor;

impl PlanVisitor for OptimizablePlanVisitor {
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &LogicalPlan) -> std::result::Result<bool, DataFusionError> {
// If the plan contains an unsupported Node type we flag the plan as un-optimizable here
match plan {
LogicalPlan::Explain(..) => Ok(false),
_ => Ok(true),
}
}

fn post_visit(&mut self, _plan: &LogicalPlan) -> std::result::Result<bool, DataFusionError> {
Ok(true)
}
}
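
To make the flow above concrete, here is a rough sketch of how the method behaves when driven from Python, mirroring the _get_ral change in dask_sql/context.py further down. The names ctx and plan are assumptions standing in for a DaskSQLContext and the PyLogicalPlan produced by its parsing step:

# Sketch only: `ctx` is assumed to be a DaskSQLContext, `plan` a PyLogicalPlan
# from its parsing step. Plans that pass OptimizablePlanVisitor's pre_visit
# check are run through DaskSqlOptimizer; an EXPLAIN plan is returned as-is.
from dask_planner.rust import DFOptimizationException

def optimize_or_fail(ctx, plan):
    try:
        return ctx.optimize_relational_algebra(plan)
    except DFOptimizationException as oe:
        # Any DataFusionError raised by an optimizer rule surfaces here
        raise RuntimeError(f"plan optimization failed: {oe}") from None
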
4 changes: 4 additions & 0 deletions dask_planner/src/sql/exceptions.rs
@@ -2,8 +2,12 @@ use datafusion::error::DataFusionError;
use pyo3::{create_exception, PyErr};
use std::fmt::Debug;

// Identifies exceptions that occur while attempting to generate a `LogicalPlan` from a SQL string
create_exception!(rust, ParsingException, pyo3::exceptions::PyException);

// Identifies exceptions that occur during attempts to optimize an existing `LogicalPlan`
create_exception!(rust, OptimizationException, pyo3::exceptions::PyException);

pub fn py_type_err(e: impl Debug) -> PyErr {
PyErr::new::<pyo3::exceptions::PyTypeError, _>(format!("{:?}", e))
}
4 changes: 2 additions & 2 deletions dask_planner/src/sql/logical/join.rs
@@ -2,9 +2,9 @@ use crate::expression::PyExpr;
use crate::sql::column;

use datafusion::logical_expr::{
and, binary_expr,
and,
logical_plan::{Join, JoinType, LogicalPlan},
Expr, Operator,
Expr,
};

use crate::sql::exceptions::py_type_err;
54 changes: 54 additions & 0 deletions dask_planner/src/sql/optimizer.rs
@@ -0,0 +1,54 @@
use datafusion::error::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::eliminate_limit::EliminateLimit;
use datafusion::optimizer::filter_push_down::FilterPushDown;
use datafusion::optimizer::limit_push_down::LimitPushDown;
use datafusion::optimizer::optimizer::OptimizerRule;
use datafusion::optimizer::OptimizerConfig;

use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::projection_push_down::ProjectionPushDown;
use datafusion::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
use datafusion::optimizer::subquery_filter_to_join::SubqueryFilterToJoin;

/// Houses the optimization logic for Dask-SQL. This controls which optimizations
/// are applied, and in what order, to the underlying `LogicalPlan` instance
pub struct DaskSqlOptimizer {
optimizations: Vec<Box<dyn OptimizerRule + Send + Sync>>,
}

impl DaskSqlOptimizer {
/// Creates a new instance of the DaskSqlOptimizer with the desired DataFusion
/// optimizer rules, as well as any custom `OptimizerRule` trait impls.
pub fn new() -> Self {
let mut rules: Vec<Box<dyn OptimizerRule + Send + Sync>> = Vec::new();
rules.push(Box::new(CommonSubexprEliminate::new()));
rules.push(Box::new(EliminateLimit::new()));
rules.push(Box::new(FilterPushDown::new()));
rules.push(Box::new(LimitPushDown::new()));
rules.push(Box::new(ProjectionPushDown::new()));
rules.push(Box::new(SingleDistinctToGroupBy::new()));
rules.push(Box::new(SubqueryFilterToJoin::new()));
Self {
optimizations: rules,
}
}

/// Iterates through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
pub(crate) fn run_optimizations(
&self,
plan: LogicalPlan,
) -> Result<LogicalPlan, DataFusionError> {
let mut resulting_plan: LogicalPlan = plan;
for optimization in &self.optimizations {
match optimization.optimize(&resulting_plan, &OptimizerConfig::new()) {
Ok(optimized_plan) => resulting_plan = optimized_plan,
Err(e) => {
return Err(e);
}
}
}
Ok(resulting_plan)
}
}
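
run_optimizations is a straightforward fold of the plan through each rule in registration order, aborting on the first failure. A language-neutral sketch of that shape (purely illustrative; the real rules are the DataFusion OptimizerRule impls pushed in new() above):

# Each "rule" takes the current plan and returns a rewritten plan; any raise
# aborts the fold, which the Rust caller maps to an OptimizationException.
def run_optimizations(plan, rules):
    for rule in rules:
        plan = rule(plan)
    return plan

# Stand-ins for CommonSubexprEliminate, EliminateLimit, FilterPushDown, ...
identity = lambda plan: plan
assert run_optimizations("plan", [identity, identity]) == "plan"
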
4 changes: 1 addition & 3 deletions dask_planner/src/sql/table.rs
@@ -7,9 +7,7 @@ use crate::sql::types::SqlTypeName;
use async_trait::async_trait;

use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{Expr, LogicalPlan, TableSource};
use datafusion::logical_expr::{LogicalPlan, TableSource};

use pyo3::prelude::*;

32 changes: 22 additions & 10 deletions dask_sql/context.py
@@ -11,7 +11,13 @@
from dask.base import optimize
from dask.distributed import Client

from dask_planner.rust import DaskSchema, DaskSQLContext, DaskTable, DFParsingException
from dask_planner.rust import (
DaskSchema,
DaskSQLContext,
DaskTable,
DFOptimizationException,
DFParsingException,
)

try:
import dask_cuda # noqa: F401
@@ -31,7 +37,7 @@
from dask_sql.mappings import python_to_sql_type
from dask_sql.physical.rel import RelConverter, custom, logical
from dask_sql.physical.rex import RexConverter, core
from dask_sql.utils import ParsingException
from dask_sql.utils import OptimizationException, ParsingException

if TYPE_CHECKING:
from dask_planner.rust import Expression
@@ -829,17 +835,23 @@ def _get_ral(self, sql):
except DFParsingException as pe:
raise ParsingException(sql, str(pe)) from None

rel = nonOptimizedRel
logger.debug(f"_get_ral -> nonOptimizedRelNode: {nonOptimizedRel}")
# Optimization might remove some alias projections. Make sure to keep them here.
select_names = [field for field in rel.getRowType().getFieldList()]
# Optimize the `LogicalPlan` or skip if configured
if dask_config.get("sql.optimize"):
try:
rel = self.context.optimize_relational_algebra(nonOptimizedRel)
except DFOptimizationException as oe:
rel = nonOptimizedRel
raise OptimizationException(str(oe)) from None
else:
rel = nonOptimizedRel

# TODO: For POC we are not optimizing the relational algebra - Jeremy Dyer
# rel = generator.getOptimizedRelationalAlgebra(nonOptimizedRelNode)
# rel_string = str(generator.getRelationalAlgebraString(rel))
rel_string = rel.explain_original()

logger.debug(f"_get_ral -> LogicalPlan: {rel}")
logger.debug(f"Extracted relational algebra:\n {rel_string}")

# Optimization might remove some alias projections. Make sure to keep them here.
select_names = [field for field in rel.getRowType().getFieldList()]

return rel, select_names, rel_string

def _get_tables_from_stack(self):
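
At the user level, the change above means that with sql.optimize enabled (the default added below), a failure inside the DataFusion optimizer now surfaces as dask_sql.utils.OptimizationException rather than an opaque error. A rough sketch, with a hypothetical table name:

# Illustrative only; assumes a table named "my_table" has been registered.
from dask_sql import Context
from dask_sql.utils import OptimizationException

c = Context()
try:
    df = c.sql("SELECT * FROM my_table")  # plan is optimized before execution
except OptimizationException as oe:
    print(f"logical plan could not be optimized: {oe}")
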
15 changes: 8 additions & 7 deletions dask_sql/physical/rel/logical/aggregate.py
@@ -186,9 +186,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

# Fix the column names and the order of them, as this was messed with during the aggregations
df_agg.columns = df_agg.columns.get_level_values(-1)
backend_output_column_order = [
cc.get_backend_by_frontend_name(oc) for oc in output_column_order
]

if len(output_column_order) == 1 and output_column_order[0] == "UInt8(1)":
backend_output_column_order = [df_agg.columns[0]]
else:
backend_output_column_order = [
cc.get_backend_by_frontend_name(oc) for oc in output_column_order
]
cc = ColumnContainer(df_agg.columns).limit_to(backend_output_column_order)

cc = self.fix_column_to_row_type(cc, rel.getRowType())
@@ -425,7 +429,7 @@ def _perform_aggregation(
if additional_column_name is None:
additional_column_name = new_temporary_column(dc.df)

# perform groupby operation; if we are using custom aggregations, we must handle
# perform groupby operation; if we are using custom aggregations, we must handle
# null values manually (this is slow)
if fast_groupby:
group_columns = [
Expand All @@ -448,11 +452,8 @@ def _perform_aggregation(

for col in agg_result.columns:
logger.debug(col)
logger.debug(f"agg_result: {agg_result.head()}")

# fix the column names to a single level
agg_result.columns = agg_result.columns.get_level_values(-1)

logger.debug(f"agg_result after: {agg_result.head()}")

return agg_result
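
The first hunk above special-cases DataFusion's naming of COUNT(*): the aggregate's output column is labeled "UInt8(1)" (the literal argument DataFusion substitutes for *), which has no frontend-name mapping, so the single aggregate column is taken positionally instead. A small sketch of that guard in isolation (function and argument names are illustrative):

# Mirrors the if/else added in convert(): a lone "UInt8(1)" output column
# means COUNT(*), so fall back to the first (and only) aggregated column.
def pick_backend_columns(df_columns, output_column_order, frontend_to_backend):
    if len(output_column_order) == 1 and output_column_order[0] == "UInt8(1)":
        return [df_columns[0]]
    return [frontend_to_backend(oc) for oc in output_column_order]

assert pick_backend_columns(["cnt"], ["UInt8(1)"], None) == ["cnt"]
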
5 changes: 5 additions & 0 deletions dask_sql/sql-schema.yaml
@@ -31,3 +31,8 @@ properties:
type: boolean
description: |
Whether to try pushing down filter predicates into IO (when possible).

optimize:
type: boolean
description: |
Whether the first generated logical plan should be further optimized or used as is.
2 changes: 2 additions & 0 deletions dask_sql/sql.yaml
@@ -7,3 +7,5 @@ sql:
case_sensitive: True

predicate_pushdown: True

optimize: True
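
Since dask-sql registers these YAML defaults with dask's config system (an assumption based on how the other sql.* keys above are consumed via dask_config.get), the new flag can be inspected and toggled like any other dask setting:

# Sketch: disable logical-plan optimization for a block of queries.
import dask.config
import dask_sql  # noqa: F401  -- importing registers the sql.yaml defaults

assert dask.config.get("sql.optimize") is True  # default from sql.yaml
with dask.config.set({"sql.optimize": False}):
    ...  # queries run here use the first generated, unoptimized plan
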
13 changes: 13 additions & 0 deletions dask_sql/utils.py
@@ -94,6 +94,19 @@ def __init__(self, sql, validation_exception_string):
super().__init__(validation_exception_string.strip())


class OptimizationException(Exception):
"""
Helper class for formatting exceptions that occur while trying to
optimize a logical plan
"""

def __init__(self, exception_string):
"""
Create a new exception from the optimization error raised by DataFusion
"""
super().__init__(exception_string.strip())


class LoggableDataFrame:
"""Small helper class to print resulting dataframes or series in logging messages"""

3 changes: 3 additions & 0 deletions tests/integration/test_compatibility.py
@@ -156,6 +156,9 @@ def test_order_by_no_limit():
)


@pytest.mark.skip(
reason="WIP DataFusion - https://github.com/dask-contrib/dask-sql/issues/530"
)
def test_order_by_limit():
a = make_rand_df(100, a=(int, 50), b=(str, 50), c=float)
eq_sqlite(
3 changes: 3 additions & 0 deletions tests/integration/test_join.py
@@ -121,6 +121,9 @@ def test_join_cross(c, user_table_1, department_table):
assert_eq(return_df, expected_df, check_index=False)


@pytest.mark.skip(
reason="WIP DataFusion - Enabling CBO generates yet to be implemented edge case"
)
def test_join_complex(c):
return_df = c.sql(
"""
3 changes: 3 additions & 0 deletions tests/integration/test_rex.py
@@ -8,6 +8,9 @@
from tests.utils import assert_eq


@pytest.mark.skip(
reason="WIP DataFusion - Enabling CBO generates yet to be implemented edge case"
)
def test_case(c, df):
result_df = c.sql(
"""
1 change: 1 addition & 0 deletions tests/integration/test_sqlite.py
@@ -101,6 +101,7 @@ def test_limit(assert_query_gives_same_result):
)


@pytest.mark.skip(reason="WIP DataFusion")
def test_groupby(assert_query_gives_same_result):
assert_query_gives_same_result(
"""