Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/requirements-py36.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
- seaborn
- toolz
- rasterio
- bottleneck
- pip:
- coveralls
- pytest-cov
Expand Down
2 changes: 1 addition & 1 deletion doc/installing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ For accelerating xarray

- `bottleneck <https://github.com/kwgoodman/bottleneck>`__: speeds up
NaN-skipping and rolling window aggregations by a large factor
(1.0 or later)
(1.1 or later)
- `cyordereddict <https://github.com/shoyer/cyordereddict>`__: speeds up most
internal operations with xarray data structures (for python versions < 3.5)

Expand Down
7 changes: 7 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ Backward Incompatible Changes

- Old numpy < 1.11 and pandas < 0.18 are no longer supported (:issue:`1512`).
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
- The minimum supported version of bottleneck has increased to 1.1
(:issue:`1279`).
By `Joe Hamman <https://github.com/jhamman>`_.

Enhancements
~~~~~~~~~~~~
Expand Down Expand Up @@ -108,6 +111,10 @@ Enhancements
By `Joe Hamman <https://github.com/jhamman>`_ and
`Gerrit Holl <https://github.com/gerritholl>`_.

- Support applying rolling window operations using bottleneck's moving window
functions on data stored as dask arrays (:issue:`1279`).
By `Joe Hamman <https://github.com/jhamman>`_.

Bug fixes
~~~~~~~~~

Expand Down
26 changes: 26 additions & 0 deletions xarray/core/dask_array_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Define core operations for xarray objects.
"""
import numpy as np

try:
import dask.array as da
except ImportError:
pass


def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1):
    """Apply a bottleneck-style moving window function to a dask array.

    The array is ghosted along ``axis`` with a depth of ``window - 1`` and
    NaN as the boundary value, ``moving_func`` is mapped over the ghosted
    blocks, and the ghost cells are trimmed from the result.

    Parameters
    ----------
    moving_func : callable
        Called on every block as
        ``moving_func(block, window, min_count=min_count, axis=axis)``.
    a : dask array
        Input data. Integer, unsigned integer and boolean arrays are cast to
        ``float64`` first, because the NaN boundary value cannot be
        represented in those dtypes.
    window : int
        Size of the moving window; also determines the ghost depth.
    min_count : int, optional
        Forwarded unchanged to ``moving_func``.
    axis : int, optional
        Axis to roll along. Negative values count from the last axis.

    Returns
    -------
    dask array
        Output of ``moving_func`` with the ghost cells removed.
    """
    # NaN is used as the boundary fill below; promote dtypes that cannot
    # hold it so the padding (and the dtype declared to map_blocks) is valid.
    if a.dtype.kind in 'biu':
        a = a.astype(np.float64)

    # inputs for ghost
    if axis < 0:
        axis = a.ndim + axis
    depth = {d: 0 for d in range(a.ndim)}
    depth[axis] = window - 1
    boundary = {d: np.nan for d in range(a.ndim)}

    # create ghosted arrays
    ag = da.ghost.ghost(a, depth=depth, boundary=boundary)

    # apply rolling func
    out = ag.map_blocks(moving_func, window, min_count=min_count,
                        axis=axis, dtype=a.dtype)

    # trim array
    result = da.ghost.trim_internal(out, depth)
    return result
45 changes: 11 additions & 34 deletions xarray/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from __future__ import print_function

import operator
from distutils.version import LooseVersion

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -51,7 +50,8 @@
BOTTLENECK_ROLLING_METHODS = {'move_sum': 'sum', 'move_mean': 'mean',
'move_std': 'std', 'move_min': 'min',
'move_max': 'max', 'move_var': 'var',
'move_argmin': 'argmin', 'move_argmax': 'argmax'}
'move_argmin': 'argmin', 'move_argmax': 'argmax',
'move_median': 'median'}
# TODO: wrap take, dot, sort


Expand Down Expand Up @@ -243,6 +243,7 @@ def rolling_count(rolling):

return rolling_count.where(enough_periods)


def inject_reduce_methods(cls):
methods = ([(name, getattr(duck_array_ops, 'array_%s' % name), False)
for name in REDUCE_METHODS] +
Expand Down Expand Up @@ -354,40 +355,16 @@ def inject_bottleneck_rolling_methods(cls):
setattr(cls, 'count', func)

# bottleneck rolling methods
if has_bottleneck:
# TODO: Bump the required version of bottlneck to 1.1 and remove all
# these version checks (see GH#1278)
bn_version = LooseVersion(bn.__version__)
bn_min_version = LooseVersion('1.0')
bn_version_1_1 = LooseVersion('1.1')
if bn_version < bn_min_version:
return

for bn_name, method_name in BOTTLENECK_ROLLING_METHODS.items():
try:
f = getattr(bn, bn_name)
func = cls._bottleneck_reduce(f)
func.__name__ = method_name
func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
name=func.__name__, da_or_ds='DataArray')
setattr(cls, method_name, func)
except AttributeError as e:
# skip functions not in Bottleneck 1.0
if ((bn_version < bn_version_1_1) and
(bn_name not in ['move_var', 'move_argmin',
'move_argmax', 'move_rank'])):
raise e

# bottleneck rolling methods without min_count (bn.__version__ < 1.1)
f = getattr(bn, 'move_median')
if bn_version >= bn_version_1_1:
func = cls._bottleneck_reduce(f)
else:
func = cls._bottleneck_reduce_without_min_count(f)
func.__name__ = 'median'
if not has_bottleneck:
return

for bn_name, method_name in BOTTLENECK_ROLLING_METHODS.items():
f = getattr(bn, bn_name)
func = cls._bottleneck_reduce(f)
func.__name__ = method_name
func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
name=func.__name__, da_or_ds='DataArray')
setattr(cls, 'median', func)
setattr(cls, method_name, func)


def inject_datasetrolling_methods(cls):
Expand Down
47 changes: 12 additions & 35 deletions xarray/core/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .combine import concat
from .ops import (inject_bottleneck_rolling_methods,
inject_datasetrolling_methods, has_bottleneck, bn)
from .dask_array_ops import dask_rolling_wrapper


class Rolling(object):
Expand Down Expand Up @@ -75,7 +76,8 @@ def __init__(self, obj, min_periods=None, center=False, **windows):
self._min_periods = window
else:
if min_periods <= 0:
raise ValueError('min_periods must be greater than zero or None')
raise ValueError(
'min_periods must be greater than zero or None')

self._min_periods = min_periods
self.center = center
Expand All @@ -98,7 +100,6 @@ class DataArrayRolling(Rolling):
This class adds the following class methods;
+ _reduce_method(cls, func)
+ _bottleneck_reduce(cls, func)
+ _bottleneck_reduce_without_min_count(cls, func)

These class methods will be used to inject numpy or bottleneck function
by doing
Expand Down Expand Up @@ -237,47 +238,23 @@ def _bottleneck_reduce(cls, func):
def wrapped_func(self, **kwargs):
from .dataarray import DataArray

if isinstance(self.obj.data, dask_array_type):
raise NotImplementedError(
'Rolling window operation does not work with dask arrays')

# bottleneck doesn't allow min_count to be 0, although it should
# work the same as if min_count = 1
if self.min_periods is not None and self.min_periods == 0:
min_count = self.min_periods + 1
min_count = 1
else:
min_count = self.min_periods

values = func(self.obj.data, window=self.window,
min_count=min_count,
axis=self.obj.get_axis_num(self.dim))

result = DataArray(values, self.obj.coords)

if self.center:
result = self._center_result(result)

return result
return wrapped_func

@classmethod
def _bottleneck_reduce_without_min_count(cls, func):
"""
Methods to return a wrapped function for `media` bottoleneck method.
bottleneck's median does not accept min_periods.
"""
def wrapped_func(self, **kwargs):
from .dataarray import DataArray

if self.min_periods is not None:
raise ValueError('Rolling.median does not accept min_periods')
axis = self.obj.get_axis_num(self.dim)

if isinstance(self.obj.data, dask_array_type):
raise NotImplementedError(
'Rolling window operation does not work with dask arrays')

values = func(self.obj.data, window=self.window,
axis=self.obj.get_axis_num(self.dim))
values = dask_rolling_wrapper(func, self.obj.data,
window=self.window,
min_count=min_count,
axis=axis)
else:
values = func(self.obj.data, window=self.window,
min_count=min_count, axis=axis)

result = DataArray(values, self.obj.coords)

Expand Down
2 changes: 1 addition & 1 deletion xarray/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

try:
import bottleneck
if LooseVersion(bottleneck.__version__) < LooseVersion('1.0'):
if LooseVersion(bottleneck.__version__) < LooseVersion('1.1'):
raise ImportError('Fall back to numpy')
has_bottleneck = True
except ImportError:
Expand Down
57 changes: 32 additions & 25 deletions xarray/tests/test_dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -2627,6 +2627,18 @@ def da(request):
[0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7],
dims='time')


@pytest.fixture
def da_dask(seed=123):
    """Build a dask-backed DataArray chunked along ``time`` for rolling tests.

    The array has dims ``('a', 'time', 'x')`` and shape ``(1, 21, 1)``, is
    filled with normally distributed values drawn from a ``RandomState``
    seeded with ``seed``, and carries a daily ``time`` coordinate starting on
    2000-01-01. The test is skipped when ``dask.array`` is unavailable.
    """
    pytest.importorskip('dask.array')
    random_state = np.random.RandomState(seed)
    time_index = pd.date_range('2000-01-01', freq='1D', periods=21)
    data = random_state.normal(size=(1, 21, 1))
    array = DataArray(data, dims=('a', 'time', 'x'))
    # split the 21 time steps into chunks of 7
    chunked = array.chunk({'time': 7})
    chunked['time'] = time_index
    return chunked


def test_rolling_iter(da):

rolling_obj = da.rolling(time=7)
Expand All @@ -2646,8 +2658,6 @@ def test_rolling_doc(da):


def test_rolling_properties(da):
pytest.importorskip('bottleneck', minversion='1.0')

rolling_obj = da.rolling(time=4)

assert rolling_obj.obj.get_axis_num('time') == 1
Expand All @@ -2669,19 +2679,7 @@ def test_rolling_properties(da):
@pytest.mark.parametrize('center', (True, False, None))
@pytest.mark.parametrize('min_periods', (1, None))
def test_rolling_wrapped_bottleneck(da, name, center, min_periods):
pytest.importorskip('bottleneck')
import bottleneck as bn

# skip if median and min_periods bottleneck version < 1.1
if ((min_periods == 1) and
(name == 'median') and
(LooseVersion(bn.__version__) < LooseVersion('1.1'))):
pytest.skip('rolling median accepts min_periods for bottleneck 1.1')

# skip if var and bottleneck version < 1.1
if ((name == 'median') and
(LooseVersion(bn.__version__) < LooseVersion('1.1'))):
pytest.skip('rolling median accepts min_periods for bottleneck 1.1')
bn = pytest.importorskip('bottleneck', minversion="1.1")

# Test all bottleneck functions
rolling_obj = da.rolling(time=7, min_periods=min_periods)
Expand All @@ -2698,14 +2696,22 @@ def test_rolling_wrapped_bottleneck(da, name, center, min_periods):
assert_equal(actual, da['time'])


def test_rolling_invalid_args(da):
pytest.importorskip('bottleneck', minversion="1.0")
import bottleneck as bn
if LooseVersion(bn.__version__) >= LooseVersion('1.1'):
pytest.skip('rolling median accepts min_periods for bottleneck 1.1')
with pytest.raises(ValueError) as exception:
da.rolling(time=7, min_periods=1).median()
assert 'Rolling.median does not' in str(exception)
@pytest.mark.parametrize('name', ('sum', 'mean', 'std', 'min', 'max',
                                  'median'))
@pytest.mark.parametrize('center', (True, False, None))
@pytest.mark.parametrize('min_periods', (1, None))
def test_rolling_wrapped_bottleneck_dask(da_dask, name, center, min_periods):
    """Rolling reductions on dask-backed data match the in-memory result."""
    pytest.importorskip('dask.array')
    # ``center`` is forwarded to both rolling objects so that the
    # parametrization over it actually exercises centered windows.
    rolling_kwargs = dict(time=7, min_periods=min_periods, center=center)

    # dask version
    rolling_obj = da_dask.rolling(**rolling_kwargs)
    actual = getattr(rolling_obj, name)().load()

    # numpy version
    rolling_obj = da_dask.load().rolling(**rolling_kwargs)
    expected = getattr(rolling_obj, name)()

    # using all-close because rolling over ghost cells introduces some
    # precision errors
    assert_allclose(actual, expected)


@pytest.mark.parametrize('center', (True, False))
Expand All @@ -2718,8 +2724,8 @@ def test_rolling_pandas_compat(da, center, window, min_periods):
if min_periods is not None and window < min_periods:
min_periods = window

s_rolling = pd.rolling_mean(s, window, center=center,
min_periods=min_periods)
s_rolling = s.rolling(window, center=center,
min_periods=min_periods).mean()
da_rolling = da.rolling(index=window, center=center,
min_periods=min_periods).mean()
# pandas does some fancy stuff in the last position,
Expand All @@ -2729,6 +2735,7 @@ def test_rolling_pandas_compat(da, center, window, min_periods):
np.testing.assert_allclose(s_rolling.index,
da_rolling['index'])


@pytest.mark.parametrize('da', (1, 2), indirect=True)
@pytest.mark.parametrize('center', (True, False))
@pytest.mark.parametrize('min_periods', (None, 1, 2, 3))
Expand Down
25 changes: 3 additions & 22 deletions xarray/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3511,8 +3511,6 @@ def ds(request):


def test_rolling_properties(ds):
pytest.importorskip('bottleneck', minversion='1.0')

# catching invalid args
with pytest.raises(ValueError) as exception:
ds.rolling(time=7, x=2)
Expand All @@ -3527,18 +3525,14 @@ def test_rolling_properties(ds):
ds.rolling(time2=2)
assert 'time2' in str(exception)


@pytest.mark.parametrize('name',
('sum', 'mean', 'std', 'var', 'min', 'max', 'median'))
@pytest.mark.parametrize('center', (True, False, None))
@pytest.mark.parametrize('min_periods', (1, None))
@pytest.mark.parametrize('key', ('z1', 'z2'))
def test_rolling_wrapped_bottleneck(ds, name, center, min_periods, key):
pytest.importorskip('bottleneck')
import bottleneck as bn

# skip if median and min_periods
if (min_periods == 1) and (name == 'median'):
pytest.skip()
bn = pytest.importorskip('bottleneck', minversion='1.1')

# Test all bottleneck functions
rolling_obj = ds.rolling(time=7, min_periods=min_periods)
Expand All @@ -3558,16 +3552,6 @@ def test_rolling_wrapped_bottleneck(ds, name, center, min_periods, key):
assert_equal(actual, ds['time'])


def test_rolling_invalid_args(ds):
pytest.importorskip('bottleneck', minversion="1.0")
import bottleneck as bn
if LooseVersion(bn.__version__) >= LooseVersion('1.1'):
pytest.skip('rolling median accepts min_periods for bottleneck 1.1')
with pytest.raises(ValueError) as exception:
da.rolling(time=7, min_periods=1).median()
assert 'Rolling.median does not' in str(exception)


@pytest.mark.parametrize('center', (True, False))
@pytest.mark.parametrize('min_periods', (None, 1, 2, 3))
@pytest.mark.parametrize('window', (1, 2, 3, 4))
Expand Down Expand Up @@ -3603,11 +3587,8 @@ def test_rolling_reduce(ds, center, min_periods, window, name):
if min_periods is not None and window < min_periods:
min_periods = window

# std with window == 1 seems unstable in bottleneck
if name == 'std' and window == 1:
window = 2
if name == 'median':
min_periods = None
pytest.skip('std with window == 1 is unstable in bottleneck')

rolling_obj = ds.rolling(time=window, center=center,
min_periods=min_periods)
Expand Down