diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 888621adfce..3fb6b8dc6d1 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -10,9 +10,9 @@ import numpy as np import ray -from .utils import _inherit_docstrings, _reindex_helper from .concat import concat from .index_metadata import _IndexMetadata +from .utils import _inherit_docstrings, _reindex_helper @_inherit_docstrings(pandas.core.groupby.DataFrameGroupBy, @@ -27,40 +27,21 @@ def __init__(self, df, by, axis, level, as_index, sort, group_keys, self._index = df.index self._axis = axis + self._df = df + self._by = by + self._level = level + self._as_index = as_index + self._sort = sort + self._group_keys = group_keys + self._squeeze = squeeze + self._row_metadata = df._row_metadata self._col_metadata = df._col_metadata if axis == 0: - partitions = [column for column in df._block_partitions.T] - self._index_grouped = \ - pandas.Series(self._index, index=self._index) \ - .groupby(by=by, sort=sort) + self._partitions = df._block_partitions.T else: - partitions = [row for row in df._block_partitions] - self._index_grouped = \ - pandas.Series(self._columns, index=self._columns) \ - .groupby(by=by, sort=sort) - - self._keys_and_values = [(k, v) - for k, v in self._index_grouped] - - if len(self) > 1: - self._grouped_partitions = \ - list(zip(*(groupby._submit(args=(by, - axis, - level, - as_index, - sort, - group_keys, - squeeze) - + tuple(part.tolist()), - num_return_vals=len(self)) - for part in partitions))) - else: - if axis == 0: - self._grouped_partitions = [df._col_partitions] - else: - self._grouped_partitions = [df._row_partitions] + self._partitions = df._block_partitions def __getattr__(self, key): """Afer regular attribute access, looks up the name in the columns @@ -81,48 +62,97 @@ def __getattr__(self, key): "github.com/ray-project/ray.") raise e + _index_grouped_cache = None + + @property + def _index_grouped(self): + if self._index_grouped_cache is None: + if self._axis == 0: + self._index_grouped_cache = pandas.Series( + np.zeros(len(self._index), dtype=np.uint8), + index=self._index).groupby(by=self._by, sort=self._sort) + else: + self._index_grouped_cache = pandas.Series( + np.zeros(len(self._columns), dtype=np.uint8), + index=self._columns).groupby(by=self._by, sort=self._sort) + + return self._index_grouped_cache + + _keys_and_values_cache = None + + @property + def _keys_and_values(self): + if self._keys_and_values_cache is None: + self._keys_and_values_cache = \ + [(k, v) for k, v in self._index_grouped] + return self._keys_and_values_cache + + @property + def _grouped_partitions(self): + + # It is expensive to put this multiple times, so let's just put it once + remote_by = ray.put(self._by) + + if len(self._index_grouped) > 1: + return zip(*(groupby._submit(args=(remote_by, + self._axis, + self._level, + self._as_index, + self._sort, + self._group_keys, + self._squeeze) + + tuple(part.tolist()), + num_return_vals=len( + self._index_grouped)) + for part in self._partitions)) + elif self._axis == 0: + return [self._df._col_partitions] + else: + return [self._df._row_partitions] + @property def _iter(self): from .dataframe import DataFrame if self._axis == 0: - return [(self._keys_and_values[i][0], + return ((self._keys_and_values[i][0], DataFrame(col_partitions=part, columns=self._columns, index=self._keys_and_values[i][1].index, col_metadata=self._col_metadata)) - for i, part in enumerate(self._grouped_partitions)] + for i, part in enumerate(self._grouped_partitions)) else: - return [(self._keys_and_values[i][0], + return ((self._keys_and_values[i][0], DataFrame(row_partitions=part, columns=self._keys_and_values[i][1].index, index=self._index, row_metadata=self._row_metadata)) - for i, part in enumerate(self._grouped_partitions)] + for i, part in enumerate(self._grouped_partitions)) @property def ngroups(self): return len(self) def skew(self, **kwargs): - return self._apply_agg_function(lambda df: df.skew(axis=self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _skew_remote.remote(df, self._axis, kwargs)) def ffill(self, limit=None): return self._apply_df_function(lambda df: df.ffill(axis=self._axis, limit=limit)) def sem(self, ddof=1): - return self._apply_agg_function(lambda df: df.sem(axis=self._axis, - ddof=ddof)) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def mean(self, *args, **kwargs): - return self._apply_agg_function(lambda df: df.mean(axis=self._axis, - *args, - **kwargs)) + return self._apply_agg_function( + lambda df: _mean_remote.remote(df, self._axis, kwargs, *args)) def any(self): - return self._apply_agg_function(lambda df: df.any(axis=self._axis)) + return self._apply_agg_function( + lambda df: _any_remote.remote(df, self._axis)) @property def plot(self): @@ -151,27 +181,13 @@ def groups(self): return {k: pandas.Index(v) for k, v in self._keys_and_values} def min(self, **kwargs): - return self._apply_agg_function(lambda df: df.min(axis=self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _min_remote.remote(df, self._axis, kwargs)) def idxmax(self): - def idxmax_helper(df, index): - result = df.idxmax(axis=self._axis) - result = result.apply(lambda v: index[v]) - return result - - results = [idxmax_helper(g[1], i[1]) - for g, i in zip(self._iter, self._index_grouped)] - - new_df = concat(results, axis=1) - if self._axis == 0: - new_df = new_df.T - new_df.columns = self._columns - new_df.index = [k for k, v in self._iter] - else: - new_df.columns = [k for k, v in self._iter] - new_df.index = self._index - return new_df + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") @property def ndim(self): @@ -197,8 +213,9 @@ def indices(self): return dict(self._keys_and_values) def pct_change(self): - return self._apply_agg_function( - lambda df: df.pct_change(axis=self._axis)) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def filter(self, func, dropna=True, *args, **kwargs): raise NotImplementedError( @@ -256,11 +273,12 @@ def apply_helper(df): def dtypes(self): if self._axis == 1: raise ValueError("Cannot call dtypes on groupby with axis=1") - return self._apply_agg_function(lambda df: df.dtypes) + return self._apply_agg_function(lambda df: _dtypes_remote.remote(df)) def first(self, **kwargs): - return self._apply_agg_function(lambda df: df.first(offset=0, - **kwargs)) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def backfill(self, limit=None): return self.bfill(limit) @@ -280,33 +298,17 @@ def bfill(self, limit=None): limit=limit)) def idxmin(self): - def idxmin_helper(df, index): - result = df.idxmin(axis=self._axis) - result = result.apply(lambda v: index[v]) - return result - - results = [idxmin_helper(g[1], i[1]) - for g, i in zip(self._iter, self._index_grouped)] - - new_df = concat(results, axis=1) - if self._axis == 0: - new_df = new_df.T - new_df.columns = self._columns - new_df.index = [k for k, v in self._iter] - else: - new_df.columns = [k for k, v in self._iter] - new_df.index = self._index - return new_df + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def prod(self, **kwargs): - return self._apply_agg_function(lambda df: df.prod(axis=self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _prod_remote.remote(df, self._axis, kwargs)) def std(self, ddof=1, *args, **kwargs): - return self._apply_agg_function(lambda df: df.std(axis=self._axis, - ddof=ddof, - *args, - **kwargs)) + return self._apply_agg_function( + lambda df: _std_remote.remote(df, self._axis, ddof, kwargs, *args)) def aggregate(self, arg, *args, **kwargs): if self._axis != 0: @@ -319,17 +321,18 @@ def aggregate(self, arg, *args, **kwargs): "This requires Multi-level index to be implemented. " "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - return self._apply_agg_function(lambda df: df.agg(arg, - axis=self._axis, - *args, - **kwargs)) + return self._apply_agg_function( + lambda df: _agg_remote.remote(df, self._axis, arg, kwargs, *args)) def last(self, **kwargs): - return self._apply_df_function(lambda df: df.last(offset=0, - **kwargs)) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def mad(self): - return self._apply_agg_function(lambda df: df.mad()) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def rank(self): return self._apply_df_function(lambda df: df.rank(axis=self._axis)) @@ -346,14 +349,12 @@ def pad(self, limit=None): "github.com/ray-project/ray.") def max(self, **kwargs): - return self._apply_agg_function(lambda df: df.max(axis=self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _max_remote.remote(df, self._axis, kwargs)) def var(self, ddof=1, *args, **kwargs): - return self._apply_agg_function(lambda df: df.var(ddof=ddof, - axis=self._axis, - *args, - **kwargs)) + return self._apply_agg_function( + lambda df: _var_remote.remote(df, self._axis, ddof, kwargs, *args)) def get_group(self, name, obj=None): raise NotImplementedError( @@ -361,17 +362,18 @@ def get_group(self, name, obj=None): "github.com/ray-project/ray.") def __len__(self): - return len(self._keys_and_values) + return len(self._index_grouped) - def all(self): - return self._apply_agg_function(lambda df: df.all()) + def all(self, **kwargs): + return self._apply_agg_function( + lambda df: _all_remote.remote(df, kwargs)) def size(self): - return self._apply_agg_function(lambda df: df.size) + return self._apply_agg_function(lambda df: _size_remote.remote(df)) def sum(self, **kwargs): - return self._apply_agg_function(lambda df: - df.sum(axis=self._axis, **kwargs)) + return self._apply_agg_function( + lambda df: _sum_remote.remote(df, self._axis, kwargs)) def __unicode__(self): raise NotImplementedError( @@ -393,8 +395,8 @@ def ngroup(self, ascending=True): return self._index_grouped.ngroup(ascending) def nunique(self, dropna=True): - return self._apply_agg_function(lambda df: df.nunique(dropna=dropna, - axis=self._axis)) + return self._apply_agg_function( + lambda df: _nunique_remote.remote(df, self._axis, dropna)) def resample(self, rule, *args, **kwargs): raise NotImplementedError( @@ -402,28 +404,13 @@ def resample(self, rule, *args, **kwargs): "github.com/ray-project/ray.") def median(self, **kwargs): - return self._apply_agg_function(lambda df: df.median(axis=self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _median_remote.remote(df, self._axis, kwargs)) def head(self, n=5): - result = [v.head(n) for k, v in self._iter] - new_df = concat(result, axis=self._axis) - - if self._axis == 0: - index_head = [v[:n] for k, v in self._keys_and_values] - flattened_index = {i for j in index_head for i in j} - sorted_index = [i for i in self._index if i in flattened_index] - new_df._block_partitions = np.array([_reindex_helper._submit( - args=tuple([new_df.index, sorted_index, 1, - len(new_df._block_partitions)] + block.tolist()), - num_return_vals=len(new_df._block_partitions)) - for block in new_df._block_partitions.T]).T - new_df.index = sorted_index - new_df._row_metadata = \ - _IndexMetadata(new_df._block_partitions[:, 0], - index=new_df.index, axis=0) - - return new_df + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def cumprod(self, axis=0, *args, **kwargs): return self._apply_df_function(lambda df: df.cumprod(axis, @@ -437,7 +424,9 @@ def agg(self, arg, *args, **kwargs): return self.aggregate(arg, *args, **kwargs) def cov(self): - return self._apply_agg_function(lambda df: df.cov()) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def transform(self, func, *args, **kwargs): return self._apply_df_function(lambda df: df.transform(func, @@ -445,15 +434,17 @@ def transform(self, func, *args, **kwargs): **kwargs)) def corr(self, **kwargs): - return self._apply_agg_function(lambda df: df.corr(**kwargs)) + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def fillna(self, **kwargs): return self._apply_df_function(lambda df: df.fillna(axis=self._axis, **kwargs)) def count(self, **kwargs): - return self._apply_agg_function(lambda df: df.count(self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _count_remote.remote(df, self._axis, kwargs)) def pipe(self, func, *args, **kwargs): return com._pipe(self, func, *args, **kwargs) @@ -464,24 +455,9 @@ def cumcount(self, ascending=True): "github.com/ray-project/ray.") def tail(self, n=5): - result = [v.tail(n) for k, v in self._iter] - new_df = concat(result, axis=self._axis) - - if self._axis == 0: - index_tail = [v[-n:] for k, v in self._keys_and_values] - flattened_index = {i for j in index_tail for i in j} - sorted_index = [i for i in self._index if i in flattened_index] - new_df._block_partitions = np.array([_reindex_helper._submit( - args=tuple([new_df.index, sorted_index, 1, - len(new_df._block_partitions)] + block.tolist()), - num_return_vals=len(new_df._block_partitions)) - for block in new_df._block_partitions.T]).T - new_df.index = sorted_index - new_df._row_metadata = \ - _IndexMetadata(new_df._block_partitions[:, 0], - index=new_df.index, axis=0) - - return new_df + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") # expanding and rolling are unique cases and need to likely be handled # separately. They do not appear to be commonly used. @@ -507,9 +483,8 @@ def quantile(self, q=0.5, **kwargs): "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - return self._apply_agg_function(lambda df: df.quantile(q=q, - axis=self._axis, - **kwargs)) + return self._apply_agg_function( + lambda df: _quantile_remote.remote(df, self._axis, q, kwargs)) def diff(self): raise NotImplementedError( @@ -519,20 +494,30 @@ def diff(self): def take(self, **kwargs): return self._apply_df_function(lambda df: df.take(**kwargs)) - def _apply_agg_function(self, f): + def _apply_agg_function(self, f, index=None): + """Perform aggregation and combine stages based on a given function. + + Args: + f: The function to apply to each group. f must be a remote + function. + + Returns: + A new combined DataFrame with the result of all groups. + """ assert callable(f), "\'{0}\' object is not callable".format(type(f)) - result = [f(v) for k, v in self._iter] - new_df = concat(result, axis=1) + blocks = np.array([[f(part) for part in group_of_parts] + for group_of_parts in self._grouped_partitions]) + from .dataframe import DataFrame if self._axis == 0: - new_df = new_df.T - new_df.columns = self._columns - new_df.index = [k for k, v in self._iter] + return DataFrame(block_partitions=blocks, columns=self._columns, + index=index if index is not None + else [k for k, _ in self._index_grouped]) else: - new_df.columns = [k for k, v in self._iter] - new_df.index = self._index - return new_df + return DataFrame(block_partitions=blocks.T, index=self._index, + columns=index if index is not None + else [k for k, _ in self._index_grouped]) def _apply_df_function(self, f, concat_axis=None): assert callable(f), "\'{0}\' object is not callable".format(type(f)) @@ -579,3 +564,144 @@ def groupby(by, axis, level, as_index, sort, group_keys, squeeze, *df): sort=sort, group_keys=group_keys, squeeze=squeeze)] + + +@ray.remote +def _sum_remote(df, axis, kwargs): + result = pandas.DataFrame(df.sum(axis=axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _skew_remote(df, axis, kwargs): + result = pandas.DataFrame(df.skew(axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _mean_remote(df, axis, kwargs, *args): + result = pandas.DataFrame(df.mean(axis, *args, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _any_remote(df, axis): + result = pandas.DataFrame(df.any(axis)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _min_remote(df, axis, kwargs): + result = pandas.DataFrame(df.min(axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _dtypes_remote(df): + return pandas.DataFrame(df.dtypes).T + + +@ray.remote +def _prod_remote(df, axis, kwargs): + result = pandas.DataFrame(df.prod(axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _std_remote(df, axis, ddof, kwargs, *args): + result = pandas.DataFrame(df.std(axis=axis, ddof=ddof, *args, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _max_remote(df, axis, kwargs): + result = pandas.DataFrame(df.max(axis=axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _var_remote(df, axis, ddof, kwargs, *args): + result = pandas.DataFrame(df.var(axis=axis, ddof=ddof, *args, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _all_remote(df, kwargs): + return pandas.DataFrame(df.all(**kwargs)).T + + +@ray.remote +def _size_remote(df): + return pandas.DataFrame(df.size).T + + +@ray.remote +def _nunique_remote(df, axis, dropna): + result = pandas.DataFrame(df.nunique(axis=axis, dropna=dropna)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _median_remote(df, axis, kwargs): + result = pandas.DataFrame(df.median(axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _count_remote(df, axis, kwargs): + result = pandas.DataFrame(df.count(axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _quantile_remote(df, axis, q, kwargs): + result = pandas.DataFrame(df.quantile(q=q, axis=axis, **kwargs)) + if axis == 0: + return result.T + else: + return result + + +@ray.remote +def _agg_remote(df, axis, arg, kwargs, *args): + result = pandas.DataFrame(df.agg(arg, axis=axis, *args, **kwargs)) + if axis == 0: + return result.T + else: + return result diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 492e7f13ea2..608d8f45c55 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -208,7 +208,7 @@ def test_large_row_groupby(): ray_df = from_pandas(pandas_df, 2) - by = pandas_df['A'].tolist() + by = [str(i) for i in pandas_df['A'].tolist()] n = 4 ray_groupby = ray_df.groupby(by=by) @@ -388,7 +388,8 @@ def test_min(ray_groupby, pandas_groupby): @pytest.fixture def test_idxmax(ray_groupby, pandas_groupby): - ray_df_equals_pandas(ray_groupby.idxmax(), pandas_groupby.idxmax()) + with pytest.raises(NotImplementedError): + ray_df_equals_pandas(ray_groupby.idxmax(), pandas_groupby.idxmax()) @pytest.fixture @@ -451,7 +452,8 @@ def test_bfill(ray_groupby, pandas_groupby): @pytest.fixture def test_idxmin(ray_groupby, pandas_groupby): - ray_df_equals_pandas(ray_groupby.idxmin(), pandas_groupby.idxmin()) + with pytest.raises(NotImplementedError): + ray_df_equals_pandas(ray_groupby.idxmin(), pandas_groupby.idxmin()) @pytest.fixture @@ -529,7 +531,8 @@ def test_median(ray_groupby, pandas_groupby): @pytest.fixture def test_head(ray_groupby, pandas_groupby, n): - ray_df_equals_pandas(ray_groupby.head(n=n), pandas_groupby.head(n=n)) + with pytest.raises(NotImplementedError): + ray_df_equals_pandas(ray_groupby.head(n=n), pandas_groupby.head(n=n)) @pytest.fixture @@ -575,7 +578,8 @@ def test_pipe(ray_groupby, pandas_groupby, func): @pytest.fixture def test_tail(ray_groupby, pandas_groupby, n): - ray_df_equals_pandas(ray_groupby.tail(n=n), pandas_groupby.tail(n=n)) + with pytest.raises(NotImplementedError): + ray_df_equals_pandas(ray_groupby.tail(n=n), pandas_groupby.tail(n=n)) @pytest.fixture