1 change: 1 addition & 0 deletions ci/deps/dask-min.yml
@@ -0,0 +1 @@
dask=2.22.0
2 changes: 1 addition & 1 deletion ci/deps/dask.yml
@@ -1 +1 @@
dask
dask
2 changes: 1 addition & 1 deletion environment.yml
@@ -14,7 +14,7 @@ dependencies:
# Ibis soft dependencies
# TODO This section is probably not very accurate right now (some dependencies should probably be in the backends files)
- sqlalchemy>=1.3
- dask
- dask>=2.22.0
- graphviz>=2.38
- openjdk=8
- pytables>=3.6
2 changes: 1 addition & 1 deletion ibis/__init__.py
@@ -83,7 +83,7 @@

with suppress(ImportError):
# pip install ibis-framework[dask]
import ibis.backends as dask # noqa: F401
from ibis.backends import dask # noqa: F401


__version__ = get_versions()['version']
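For context on the `ibis/__init__.py` change above: `import ibis.backends as dask` binds the whole `ibis.backends` package to the name `dask`, whereas `from ibis.backends import dask` imports the actual dask backend submodule. A minimal sketch of the optional-import pattern, using only the standard library and the module path shown in the diff:

from contextlib import suppress

# Optional backends are imported lazily: if the dask extra is missing, the
# ImportError is swallowed and `import ibis` still succeeds.
with suppress(ImportError):
    from ibis.backends import dask  # noqa: F401

# The previous form, `import ibis.backends as dask`, would succeed even
# without dask installed, because it only imports the backends package and
# rebinds it under the name `dask`.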
72 changes: 8 additions & 64 deletions ibis/backends/dask/client.py
@@ -6,12 +6,12 @@
from functools import partial
from typing import Dict, List, Mapping

import dask
import dask.dataframe as dd
import dateutil.parser
import numpy as np
import pandas as pd
import toolz
from multipledispatch import Dispatcher
from pandas.api.types import DatetimeTZDtype
from pkg_resources import parse_version

@@ -25,11 +25,13 @@
PANDAS_DATE_TYPES,
PANDAS_STRING_TYPES,
_inferable_pandas_dtypes,
convert,
convert_timezone,
ibis_dtype_to_pandas,
ibis_schema_to_pandas,
)
from ibis.backends.pandas.core import execute_and_reset

from .core import execute_and_reset

infer_dask_dtype = pd.api.types.infer_dtype

@@ -82,35 +84,13 @@ def infer_dask_schema(df, schema=None):

ibis_schema_to_dask = ibis_schema_to_pandas

convert = Dispatcher(
'convert',
doc="""\
Convert `column` to the dask dtype corresponding to `out_dtype`, where the
dtype of `column` is `in_dtype`.

Parameters
----------
in_dtype : Union[np.dtype, dask_dtype]
The dtype of `column`, used for dispatching
out_dtype : ibis.expr.datatypes.DataType
The requested ibis type of the output
column : dd.Series
The column to convert

Returns
-------
result : dd.Series
The converted column
""",
)


@convert.register(DatetimeTZDtype, dt.Timestamp, dd.Series)
def convert_datetimetz_to_timestamp(in_dtype, out_dtype, column):
output_timezone = out_dtype.timezone
if output_timezone is not None:
return column.dt.tz_convert(output_timezone)
return column.astype(out_dtype.to_dask(), errors='ignore')
return column.astype(out_dtype.to_dask())


DASK_STRING_TYPES = PANDAS_STRING_TYPES
@@ -151,7 +131,7 @@ def convert_any_to_interval(_, out_dtype, column):

@convert.register(np.dtype, dt.String, dd.Series)
def convert_any_to_string(_, out_dtype, column):
result = column.astype(out_dtype.to_dask(), errors='ignore')
result = column.astype(out_dtype.to_dask())
return result


@@ -167,46 +147,11 @@ def convert_boolean_to_series(in_dtype, out_dtype, column):

@convert.register(object, dt.DataType, dd.Series)
def convert_any_to_any(_, out_dtype, column):
return column.astype(out_dtype.to_dask(), errors='ignore')


def ibis_schema_apply_to(schema: sch.Schema, df: dd.DataFrame) -> dd.DataFrame:
"""Applies the Ibis schema to a dask DataFrame

Parameters
----------
schema : ibis.schema.Schema
df : dask.dataframe.DataFrame

Returns
-------
df : dask.dataframe.DataFrame

Notes
-----
Mutates `df`
"""

for column, dtype in schema.items():
dask_dtype = dtype.to_dask()
col = df[column]
col_dtype = col.dtype

try:
not_equal = dask_dtype != col_dtype
except TypeError:
# ugh, we can't compare dtypes coming from dask, assume not equal
not_equal = True

if not_equal or isinstance(dtype, dt.String):
df[column] = convert(col_dtype, dtype, col)

return df
return column.astype(out_dtype.to_dask())


dt.DataType.to_dask = ibis_dtype_to_dask
sch.Schema.to_dask = ibis_schema_to_dask
sch.Schema.apply_to = ibis_schema_apply_to


class DaskTable(ops.DatabaseTable):
@@ -297,7 +242,6 @@ def create_table(
if obj is not None:
df = obj
else:
# TODO - this isn't right
dtypes = ibis_schema_to_dask(schema)
df = schema.apply_to(
dd.from_pandas(
@@ -342,4 +286,4 @@ def exists_table(self, name: str) -> bool:
@property
def version(self) -> str:
"""Return the version of the underlying backend library."""
return parse_version(dd.__version__)
return parse_version(dask.__version__)
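A side note on the `version` property and the `dask=2.22.0` floor pinned in `ci/deps/dask-min.yml`: `parse_version` returns comparable version objects, so a minimum-version check can be written as below. This is an illustrative sketch, not code from the diff.

import dask
from pkg_resources import parse_version

MIN_DASK = parse_version("2.22.0")  # the minimum pinned in ci/deps/dask-min.yml

if parse_version(dask.__version__) < MIN_DASK:
    raise ImportError(
        f"dask {dask.__version__} is older than the supported minimum 2.22.0"
    )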
65 changes: 65 additions & 0 deletions ibis/backends/dask/core.py
@@ -110,11 +110,76 @@

from __future__ import absolute_import

from typing import Optional

import dask.dataframe as dd

from ibis.backends.pandas.core import (
execute,
is_computable_input,
is_computable_input_arg,
)
from ibis.expr.typing import TimeContext

is_computable_input.register(dd.core.Scalar)(is_computable_input_arg)


def execute_and_reset(
expr,
params=None,
scope=None,
timecontext: Optional[TimeContext] = None,
aggcontext=None,
**kwargs,
):
"""Execute an expression against data that are bound to it. If no data
are bound, raise an Exception.
Notes
-----
The difference between this function and :func:`~ibis.dask.core.execute`
is that this function resets the index of the result, if the result has
an index.
Parameters
----------
expr : ibis.expr.types.Expr
The expression to execute
params : Mapping[ibis.expr.types.Expr, object]
The data that an unbound parameter in `expr` maps to
scope : Mapping[ibis.expr.operations.Node, object]
Additional scope, mapping ibis operations to data
timecontext : Optional[TimeContext]
timecontext needed for execution
aggcontext : Optional[ibis.dask.aggcontext.AggregationContext]
An object indicating how to compute aggregations. For example,
a rolling mean needs to be computed differently than the mean of a
column.
kwargs : Dict[str, object]
Additional arguments that can potentially be used by individual node
execution
Returns
-------
result : Union[
dask.dataframe.Series,
dask.dataframe.DataFrame,
ibis.dask.core.simple_types
]
Raises
------
ValueError
* If no data are bound to the input expression
"""
result = execute(
expr,
params=params,
scope=scope,
timecontext=timecontext,
aggcontext=aggcontext,
**kwargs,
)
if isinstance(result, dd.DataFrame):
schema = expr.schema()
df = result.reset_index()
return df[schema.names]
elif isinstance(result, dd.Series):
return result.reset_index(drop=True)
return result
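To illustrate why `execute_and_reset` resets the index (plain dask here, not the ibis API): grouped results keep the grouping key in the index, and the reset either moves it back into a column (DataFrame results) or drops it (Series results). A small sketch with made-up data:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"g": ["a", "a", "b"], "x": [1, 2, 3]}), npartitions=1
)

# The grouping key ends up in the index after an aggregation ...
summed = ddf.groupby("g")["x"].sum()

# ... which is what execute_and_reset undoes: DataFrame results get
# reset_index(), Series results get reset_index(drop=True).
print(summed.reset_index(drop=True).compute())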
16 changes: 13 additions & 3 deletions ibis/backends/dask/execution/__init__.py
@@ -1,3 +1,13 @@
# This is awaiting implementation of the rest of execution
# https://github.com/ibis-project/ibis/issues/2537
from ibis.backends.pandas.core import execute, execute_node # noqa: F401,F403
from .aggregations import * # noqa: F401,F403
from .arrays import * # noqa: F401,F403
from .decimal import * # noqa: F401,F403
from .generic import * # noqa: F401,F403
from .indexing import * # noqa: F401,F403
from .join import * # noqa: F401,F403
from .maps import * # noqa: F401,F403
from .numeric import * # noqa: F401,F403
from .reductions import * # noqa: F401,F403
from .selection import * # noqa: F401,F403
from .strings import * # noqa: F401,F403
from .structs import * # noqa: F401,F403
from .temporal import * # noqa: F401,F403
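These star imports are there for their side effects: importing each module runs its `@execute_node.register(...)` decorators and adds the rules to the multipledispatch dispatcher. A self-contained sketch of that pattern follows; the op class and function names are illustrative, not ibis internals.

import dask.dataframe as dd
from multipledispatch import Dispatcher

# In ibis the dispatcher lives in the pandas core module; here a stand-in
# shows the mechanism.
execute_node = Dispatcher("execute_node")


class StringLength:
    """Stand-in for an ibis operation such as ops.StringLength."""


# Importing a module that contains this decorator is enough to register the
# rule -- that is all the star imports above are doing.
@execute_node.register(StringLength, dd.Series)
def execute_string_length(op, data, **kwargs):
    return data.str.len()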
143 changes: 143 additions & 0 deletions ibis/backends/dask/execution/aggregations.py
@@ -0,0 +1,143 @@
"""Execution rules for Aggregatons - mostly TODO

- ops.Aggregation
- ops.Any
- ops.NotAny
- ops.All
- ops.NotAll

"""

import functools
import operator
from typing import Optional

import dask.dataframe as dd

import ibis.expr.operations as ops
from ibis.backends.pandas.execution.generic import execute, execute_node
from ibis.expr.scope import Scope
from ibis.expr.typing import TimeContext


# TODO - aggregations - #2553
# Not all code paths work cleanly here
@execute_node.register(ops.Aggregation, dd.DataFrame)
[Contributor review comment on this line: add to the list of things that we can register for pandas/dask]
def execute_aggregation_dataframe(
op, data, scope=None, timecontext: Optional[TimeContext] = None, **kwargs
):
assert op.metrics, 'no metrics found during aggregation execution'

if op.sort_keys:
raise NotImplementedError(
'sorting on aggregations not yet implemented'
)

predicates = op.predicates
if predicates:
predicate = functools.reduce(
operator.and_,
(
execute(p, scope=scope, timecontext=timecontext, **kwargs)
for p in predicates
),
)
data = data.loc[predicate]

columns = {}

if op.by:
grouping_key_pairs = list(
zip(op.by, map(operator.methodcaller('op'), op.by))
)
grouping_keys = [
by_op.name
if isinstance(by_op, ops.TableColumn)
else execute(
by, scope=scope, timecontext=timecontext, **kwargs
).rename(by.get_name())
for by, by_op in grouping_key_pairs
]
columns.update(
(by_op.name, by.get_name())
for by, by_op in grouping_key_pairs
if hasattr(by_op, 'name')
)
source = data.groupby(grouping_keys)
else:
source = data

scope = scope.merge_scope(Scope({op.table.op(): source}, timecontext))

pieces = []
for metric in op.metrics:
piece = execute(metric, scope=scope, timecontext=timecontext, **kwargs)
piece.name = metric.get_name()
pieces.append(piece)

result = dd.concat(pieces, axis=1)

# If grouping, need a reset to get the grouping key back as a column
if op.by:
result = result.reset_index()

result.columns = [columns.get(c, c) for c in result.columns]

if op.having:
# .having(...) is only accessible on groupby, so this should never
# raise
if not op.by:
raise ValueError(
'Filtering out aggregation values is not allowed without at '
'least one grouping key'
)

# TODO(phillipc): Don't recompute identical subexpressions
predicate = functools.reduce(
operator.and_,
(
execute(having, scope=scope, timecontext=timecontext, **kwargs)
for having in op.having
),
)
assert len(predicate) == len(
result
), 'length of predicate does not match length of DataFrame'
result = result.loc[predicate.values]
return result
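The dask-specific mechanics above are the predicate reduction and the column-wise concatenation of metric results; below is a standalone sketch of both, with made-up data and column names, mirroring but not reproducing the function above.

import functools
import operator

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"g": ["a", "a", "b"], "x": [1, 2, 3]}), npartitions=1
)

# Combine several boolean predicates into one mask, as the
# functools.reduce(operator.and_, ...) calls above do.
predicates = [ddf["x"] > 0, ddf["g"] == "a"]
mask = functools.reduce(operator.and_, predicates)
filtered = ddf.loc[mask]

# Assemble per-metric results side by side, mirroring dd.concat(pieces, axis=1),
# then reset the index so the grouping key comes back as a column and rename
# the columns, as the columns remapping above does.
pieces = [
    filtered.groupby("g")["x"].sum(),
    filtered.groupby("g")["x"].mean(),
]
result = dd.concat(pieces, axis=1).reset_index()
result.columns = ["g", "total", "avg"]
print(result.compute())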


# TODO - aggregations - #2553
# @execute_node.register((ops.Any, ops.All), (dd.Series, SeriesGroupBy))
# def execute_any_all_series(op, data, aggcontext=None, **kwargs):
# if isinstance(aggcontext, (agg_ctx.Summarize, agg_ctx.Transform)):
# result = aggcontext.agg(data, type(op).__name__.lower())
# else:
# result = aggcontext.agg(
# data, lambda data: getattr(data, type(op).__name__.lower())()
# )
# return result

# TODO - aggregations - #2553
# @execute_node.register(ops.NotAny, (dd.Series, SeriesGroupBy))
# def execute_notany_series(op, data, aggcontext=None, **kwargs):
# if isinstance(aggcontext, (agg_ctx.Summarize, agg_ctx.Transform)):
# result = ~(aggcontext.agg(data, 'any'))
# else:
# result = aggcontext.agg(data, lambda data: ~(data.any()))
# try:
# return result.astype(bool)
# except TypeError:
# return result

# TODO - aggregations - #2553
# @execute_node.register(ops.NotAll, (dd.Series, SeriesGroupBy))
# def execute_notall_series(op, data, aggcontext=None, **kwargs):
# if isinstance(aggcontext, (agg_ctx.Summarize, agg_ctx.Transform)):
# result = ~(aggcontext.agg(data, 'all'))
# else:
# result = aggcontext.agg(data, lambda data: ~(data.all()))
# try:
# return result.astype(bool)
# except TypeError:
# return result