diff --git a/.github/cluster-upstream.yml b/.github/cluster-upstream.yml index b76fd0f3c..e23851616 100644 --- a/.github/cluster-upstream.yml +++ b/.github/cluster-upstream.yml @@ -7,7 +7,8 @@ services: command: dask-scheduler environment: USE_MAMBA: "true" - EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0" + # TODO: remove pandas constraint once Dask images are updated + EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pandas>=1.5.0" ports: - "8786:8786" dask-worker: @@ -16,6 +17,7 @@ services: command: dask-worker dask-scheduler:8786 environment: USE_MAMBA: "true" - EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0" + # TODO: remove pandas constraint once Dask images are updated + EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0 pandas>=1.5.0" volumes: - /tmp:/tmp diff --git a/.github/cluster.yml b/.github/cluster.yml index 29fbde22c..ab55fb078 100644 --- a/.github/cluster.yml +++ b/.github/cluster.yml @@ -13,6 +13,7 @@ services: command: dask-worker dask-scheduler:8786 environment: USE_MAMBA: "true" - EXTRA_CONDA_PACKAGES: "cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0" + # TODO: remove pandas constraint once Dask images are updated + EXTRA_CONDA_PACKAGES: "cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0 pandas>=1.5.0" volumes: - /tmp:/tmp diff --git a/dask_sql/_compat.py b/dask_sql/_compat.py index ee6d88c2f..eb0b7c8b4 100644 --- a/dask_sql/_compat.py +++ b/dask_sql/_compat.py @@ -7,6 +7,7 @@ FLOAT_NAN_IMPLEMENTED = _pandas_version >= parseVersion("1.2.0") INT_NAN_IMPLEMENTED = _pandas_version >= parseVersion("1.0.0") +INDEXER_WINDOW_STEP_IMPLEMENTED = _pandas_version >= parseVersion("1.5.0") # TODO: remove if prompt-toolkit min version gets bumped PIPE_INPUT_CONTEXT_MANAGER = _prompt_toolkit_version >= parseVersion("3.0.29") diff --git a/dask_sql/physical/rel/logical/window.py b/dask_sql/physical/rel/logical/window.py index e8541a41c..c61a95ad1 100644 --- a/dask_sql/physical/rel/logical/window.py +++ b/dask_sql/physical/rel/logical/window.py @@ -8,6 +8,7 @@ import pandas as pd from pandas.api.indexers import BaseIndexer +from dask_sql._compat import INDEXER_WINDOW_STEP_IMPLEMENTED from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.java import org from dask_sql.physical.rel.base import BaseRelPlugin @@ -121,7 +122,7 @@ class Indexer(BaseIndexer): def __init__(self, start: int, end: int): super().__init__(self, start=start, end=end) - def get_window_bounds( + def _get_window_bounds( self, num_values: int = 0, min_periods: Optional[int] = None, @@ -150,6 +151,29 @@ def get_window_bounds( ) return start, end + if INDEXER_WINDOW_STEP_IMPLEMENTED: + + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + step: Optional[int] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + return self._get_window_bounds(num_values, min_periods, center, closed) + + else: + + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + return self._get_window_bounds(num_values, min_periods, center, closed) + def map_on_each_group( partitioned_group: pd.DataFrame,