Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/cluster-upstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ services:
command: dask-scheduler
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0"
# TODO: remove pandas constraint once Dask images are updated
EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pandas>=1.5.0"
ports:
- "8786:8786"
dask-worker:
Expand All @@ -16,6 +17,7 @@ services:
command: dask-worker dask-scheduler:8786
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0"
# TODO: remove pandas constraint once Dask images are updated
EXTRA_CONDA_PACKAGES: "dask/label/dev::dask cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0 pandas>=1.5.0"
volumes:
- /tmp:/tmp
3 changes: 2 additions & 1 deletion .github/cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
command: dask-worker dask-scheduler:8786
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0"
# TODO: remove pandas constraint once Dask images are updated
EXTRA_CONDA_PACKAGES: "cloudpickle>=2.1.0 pyarrow>=6.0.1 libstdcxx-ng>=12.1.0 pandas>=1.5.0"
volumes:
- /tmp:/tmp
1 change: 1 addition & 0 deletions dask_sql/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

# Feature flags gating version-dependent code paths elsewhere in dask-sql.
# Each flag is True when the installed pandas / prompt-toolkit is new enough.
FLOAT_NAN_IMPLEMENTED = _pandas_version >= parseVersion("1.2.0")
INT_NAN_IMPLEMENTED = _pandas_version >= parseVersion("1.0.0")
# pandas 1.5.0 added a ``step`` argument to BaseIndexer.get_window_bounds;
# used in dask_sql/physical/rel/logical/window.py to pick the right signature.
INDEXER_WINDOW_STEP_IMPLEMENTED = _pandas_version >= parseVersion("1.5.0")

# TODO: remove if prompt-toolkit min version gets bumped
PIPE_INPUT_CONTEXT_MANAGER = _prompt_toolkit_version >= parseVersion("3.0.29")
26 changes: 25 additions & 1 deletion dask_sql/physical/rel/logical/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd
from pandas.api.indexers import BaseIndexer

from dask_sql._compat import INDEXER_WINDOW_STEP_IMPLEMENTED
from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.java import org
from dask_sql.physical.rel.base import BaseRelPlugin
Expand Down Expand Up @@ -121,7 +122,7 @@ class Indexer(BaseIndexer):
def __init__(self, start: int, end: int):
    """Window indexer with fixed ``start``/``end`` bounds.

    ``start`` and ``end`` are forwarded to ``BaseIndexer``, which stores
    every keyword argument as an instance attribute for use in
    ``get_window_bounds``.
    """
    # BUG FIX: the original called ``super().__init__(self, start=..., end=...)``,
    # passing ``self`` as an explicit positional argument — BaseIndexer.__init__
    # binds that to its ``index_array`` parameter, storing the indexer object
    # itself as ``index_array``.  ``super()`` already supplies ``self``.
    super().__init__(start=start, end=end)

def get_window_bounds(
def _get_window_bounds(
self,
num_values: int = 0,
min_periods: Optional[int] = None,
Expand Down Expand Up @@ -150,6 +151,29 @@ def get_window_bounds(
)
return start, end

# pandas 1.5.0 added a ``step`` parameter to BaseIndexer.get_window_bounds
# (see INDEXER_WINDOW_STEP_IMPLEMENTED in dask_sql._compat).  Define a
# ``get_window_bounds`` whose signature matches the installed pandas version
# and delegate to the shared ``_get_window_bounds`` implementation.
# NOTE(review): ``step`` is accepted but silently dropped — presumably only
# step=None (i.e. every row) is ever requested here; confirm against callers.
if INDEXER_WINDOW_STEP_IMPLEMENTED:

    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: Optional[int] = None,
        center: Optional[bool] = None,
        closed: Optional[str] = None,
        step: Optional[int] = None,
    ) -> Tuple[np.ndarray, np.ndarray]:
        # pandas >= 1.5 signature: forward everything except ``step``.
        return self._get_window_bounds(num_values, min_periods, center, closed)

else:

    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: Optional[int] = None,
        center: Optional[bool] = None,
        closed: Optional[str] = None,
    ) -> Tuple[np.ndarray, np.ndarray]:
        # pre-1.5 signature (no ``step`` argument).
        return self._get_window_bounds(num_values, min_periods, center, closed)


def map_on_each_group(
partitioned_group: pd.DataFrame,
Expand Down