Implement expanding.groupby.count in Series and Frame
HyukjinKwon committed Nov 5, 2019
1 parent e4b7bed commit 3d4113d
Showing 4 changed files with 185 additions and 19 deletions.
23 changes: 21 additions & 2 deletions databricks/koalas/groupby.py
@@ -1651,8 +1651,27 @@ def nunique(self, dropna=True):
    def rolling(self, *args, **kwargs):
        return RollingGroupby(self)

    def expanding(self, *args, **kwargs):
        return ExpandingGroupby(self)
    def expanding(self, min_periods=1):
        """
        Return an expanding grouper, providing expanding
        functionality per group.

        .. note:: 'min_periods' in Koalas works as a fixed window size, unlike pandas.
            Also unlike pandas, NA is counted as a period. This might change
            in the near future.

        Parameters
        ----------
        min_periods : int, default 1
            Minimum number of observations in the window required to have a value
            (otherwise the result is NA).

        See Also
        --------
        Series.groupby
        DataFrame.groupby
        """
        return ExpandingGroupby(self, self._groupkeys, min_periods=min_periods)

    def _reduce_for_stat_function(self, sfun, only_numeric):
        groupkeys = self._groupkeys
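For orientation, a minimal usage sketch of the new API (variable names here are illustrative, not from the commit); the expected output follows the doctest added to ExpandingGroupby.count() in window.py below:

    import databricks.koalas as ks

    s = ks.Series([2, 3, float("nan"), 10])
    s.name = "col"

    # Group by the series itself; each non-NA group key gets an expanding
    # window that grows from the start of the group to the current row.
    # The NaN group key is dropped, mirroring pandas.
    s.groupby(s).expanding(min_periods=1).count().sort_index()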
8 changes: 0 additions & 8 deletions databricks/koalas/missing/window.py
@@ -67,14 +67,10 @@ class _MissingPandasLikeRolling(object):
    count = unsupported_function_rolling("count")
    cov = unsupported_function_rolling("cov")
    kurt = unsupported_function_rolling("kurt")
    max = unsupported_function_rolling("max")
    mean = unsupported_function_rolling("mean")
    median = unsupported_function_rolling("median")
    min = unsupported_function_rolling("min")
    quantile = unsupported_function_rolling("quantile")
    skew = unsupported_function_rolling("skew")
    std = unsupported_function_rolling("std")
    sum = unsupported_function_rolling("sum")
    validate = unsupported_function_rolling("validate")
    var = unsupported_function_rolling("var")

@@ -112,14 +108,10 @@ class _MissingPandasLikeRollingGroupby(object):
    count = unsupported_function_rolling("count")
    cov = unsupported_function_rolling("cov")
    kurt = unsupported_function_rolling("kurt")
    max = unsupported_function_rolling("max")
    mean = unsupported_function_rolling("mean")
    median = unsupported_function_rolling("median")
    min = unsupported_function_rolling("min")
    quantile = unsupported_function_rolling("quantile")
    skew = unsupported_function_rolling("skew")
    std = unsupported_function_rolling("std")
    sum = unsupported_function_rolling("sum")
    validate = unsupported_function_rolling("validate")
    var = unsupported_function_rolling("var")

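These deletions are what expose the real implementations: __getattr__ on the window classes checks the missing-function class first and returns a raising stub while a name is still listed there (see the __getattr__ bodies in window.py below). A rough sketch of such a stub factory, assuming a simplified signature (the real unsupported_function_rolling helper carries more metadata):

    def unsupported_function_rolling(method_name):
        # Build a stand-in that raises as soon as the unsupported API is
        # called, so the error names the exact missing method.
        def unsupported_function(*args, **kwargs):
            raise NotImplementedError(
                "The method `Rolling.%s()` is not implemented yet." % method_name)
        return unsupported_function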
34 changes: 34 additions & 0 deletions databricks/koalas/tests/test_expanding.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pandas as pd

import databricks.koalas as ks
from databricks.koalas.testing.utils import ReusedSQLTestCase, TestUtils
@@ -57,3 +58,36 @@ def test_expanding_mean(self):

    def test_expanding_sum(self):
        self._test_expanding_func("sum")

    def _test_groupby_expanding_func(self, f):
        kser = ks.Series([1, 2, 3])
        pser = kser.to_pandas()
        self.assert_eq(
            repr(getattr(kser.groupby(kser).expanding(2), f)()),
            repr(getattr(pser.groupby(pser).expanding(2), f)()))

        # Multiindex
        kser = ks.Series(
            [1, 2, 3],
            index=pd.MultiIndex.from_tuples([('a', 'x'), ('a', 'y'), ('b', 'z')]))
        pser = kser.to_pandas()
        self.assert_eq(
            repr(getattr(kser.groupby(kser).expanding(2), f)()),
            repr(getattr(pser.groupby(pser).expanding(2), f)()))

        kdf = ks.DataFrame({'a': [1, 2, 3, 2], 'b': [4.0, 2.0, 3.0, 1.0]})
        pdf = kdf.to_pandas()
        self.assert_eq(
            repr(getattr(kdf.groupby(kdf.a).expanding(2), f)()),
            repr(getattr(pdf.groupby(pdf.a).expanding(2), f)()))

        # Multiindex column; group by the first column, selected by tuple.
        kdf = ks.DataFrame({'a': [1, 2, 3, 2], 'b': [4.0, 2.0, 3.0, 1.0]})
        kdf.columns = pd.MultiIndex.from_tuples([('a', 'x'), ('a', 'y')])
        pdf = kdf.to_pandas()
        self.assert_eq(
            repr(getattr(kdf.groupby(kdf[('a', 'x')]).expanding(2), f)()),
            repr(getattr(pdf.groupby(pdf[('a', 'x')]).expanding(2), f)()))

    def test_groupby_expanding_count(self):
        self._test_groupby_expanding_func("count")
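The helper's comparison can be reproduced interactively; a minimal sketch (exact reprs elided here, since they depend on the installed pandas version):

    import databricks.koalas as ks

    kser = ks.Series([1, 2, 3])
    pser = kser.to_pandas()

    # assert_eq in the tests above compares exactly these two reprs.
    print(repr(kser.groupby(kser).expanding(2).count()))
    print(repr(pser.groupby(pser).expanding(2).count()))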
139 changes: 130 additions & 9 deletions databricks/koalas/window.py
@@ -15,7 +15,10 @@
#
from functools import partial
from typing import Any
from functools import reduce

from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
from databricks.koalas.utils import name_like_string
from pyspark.sql import Window
from pyspark.sql import functions as F
from databricks.koalas.missing.window import _MissingPandasLikeRolling, \
@@ -57,19 +60,17 @@ def __getattr__(self, item: str) -> Any:
class Expanding(_RollingAndExpanding):
    def __init__(self, kdf_or_kser, min_periods=1):
        from databricks.koalas import DataFrame, Series
        from databricks.koalas.groupby import SeriesGroupBy, DataFrameGroupBy

        if min_periods < 0:
            raise ValueError("min_periods must be >= 0")
        self._min_periods = min_periods
        self.kdf_or_kser = kdf_or_kser
        if not isinstance(kdf_or_kser, (DataFrame, Series, DataFrameGroupBy, SeriesGroupBy)):
        if not isinstance(kdf_or_kser, (DataFrame, Series)):
            raise TypeError(
                "kdf_or_kser must be a series or dataframe; however, got: %s" % type(kdf_or_kser))
        if isinstance(kdf_or_kser, (DataFrame, Series)):
            index_scols = kdf_or_kser._internal.index_scols
            self._window = Window.orderBy(index_scols).rowsBetween(
                Window.unboundedPreceding, Window.currentRow)
        index_scols = kdf_or_kser._internal.index_scols
        self._window = Window.orderBy(index_scols).rowsBetween(
            Window.unboundedPreceding, Window.currentRow)

    def __getattr__(self, item: str) -> Any:
        if hasattr(_MissingPandasLikeExpanding, item):
@@ -86,7 +87,7 @@ def __repr__(self):

    def _apply_as_series_or_frame(self, func):
        """
        Decorator that can wraps a function that handles Spark column in order
        Wraps a function that handles a Spark column in order
        to support it in both Koalas Series and DataFrame.

        Note that the given `func` name should be the same as the API's method name.
@@ -97,7 +98,6 @@ def _apply_as_series_or_frame(self, func):
            kser = self.kdf_or_kser
            return kser._with_new_scol(func(kser._scol)).rename(kser.name)
        elif isinstance(self.kdf_or_kser, DataFrame):
            # TODO: deduplicate with other APIs in expanding.
            kdf = self.kdf_or_kser
            applied = []
            for column in kdf.columns:
@@ -348,6 +348,31 @@ def mean(scol):


class ExpandingGroupby(Expanding):
    def __init__(self, groupby, groupkeys, min_periods=1):
        from databricks.koalas.groupby import SeriesGroupBy
        from databricks.koalas.groupby import DataFrameGroupBy

        if isinstance(groupby, SeriesGroupBy):
            kdf = groupby._ks.to_frame()
        elif isinstance(groupby, DataFrameGroupBy):
            kdf = groupby._kdf
        else:
            raise TypeError(
                "groupby must be a SeriesGroupBy or DataFrameGroupBy; "
                "however, got: %s" % type(groupby))

        super(ExpandingGroupby, self).__init__(kdf, min_periods)
        self._groupby = groupby
        # NOTE THAT this code intentionally uses `F.col` instead of `scol` in the
        # given series. This is because, in the case of a series, we convert it
        # into a DataFrame. So, if the given `groupkeys` are series, they would
        # end up being different series.
        self._window = self._window.partitionBy(
            *[F.col(name_like_string(ser.name)) for ser in groupkeys])
        self._groupkeys = groupkeys
        # The current implementation reuses the DataFrameGroupBy implementation
        # for Series as well.
        self.kdf = self.kdf_or_kser

    def __getattr__(self, item: str) -> Any:
        if hasattr(_MissingPandasLikeExpandingGroupby, item):
            property_or_func = getattr(_MissingPandasLikeExpandingGroupby, item)
@@ -357,8 +382,104 @@ def __getattr__(self, item: str) -> Any:
            return partial(property_or_func, self)
        raise AttributeError(item)

    def _apply_as_series_or_frame(self, func):
        """
        Wraps a function that handles a Spark column in order
        to support it in both Koalas Series and DataFrame.

        Note that the given `func` name should be the same as the API's method name.
        """
        from databricks.koalas import DataFrame
        from databricks.koalas.series import _col
        from databricks.koalas.groupby import SeriesGroupBy

        kdf = self.kdf
        sdf = self.kdf._sdf

        # Here we need to include the grouped keys as an index, and shift the previous index.
        #   [index_column0, index_column1] -> [grouped key, index_column0, index_column1]
        new_index_scols = []
        new_index_map = []
        for groupkey in self._groupkeys:
            new_index_scols.append(
                # NOTE THAT this code intentionally uses `F.col` instead of `scol` in the
                # given series. This is because, in the case of a series, we convert it
                # into a DataFrame. So, if the given `groupkeys` are series, they would
                # end up being different series.
                F.col(
                    name_like_string(groupkey.name)
                ).alias(
                    SPARK_INDEX_NAME_FORMAT(len(new_index_scols))
                ))
            new_index_map.append(
                (SPARK_INDEX_NAME_FORMAT(len(new_index_map)),
                 groupkey._internal.column_index[0]))

        for new_index_scol, index_map in zip(kdf._internal.index_scols, kdf._internal.index_map):
            new_index_scols.append(
                new_index_scol.alias(SPARK_INDEX_NAME_FORMAT(len(new_index_scols))))
            _, name = index_map
            new_index_map.append((SPARK_INDEX_NAME_FORMAT(len(new_index_map)), name))

        applied = []
        for column in kdf.columns:
            applied.append(
                kdf[column]._with_new_scol(
                    func(kdf[column]._scol)
                ).rename(kdf[column].name))

        # pandas seems to filter out rows whose grouped key is NA.
        cond = self._groupkeys[0]._scol.isNotNull()
        for c in self._groupkeys:
            cond = cond | c._scol.isNotNull()
        sdf = sdf.select(new_index_scols + [c._scol for c in applied]).filter(cond)

        internal = _InternalFrame(sdf=sdf,
                                  data_columns=[c._internal.data_columns[0] for c in applied],
                                  index_map=new_index_map)

        ret = DataFrame(internal)
        if isinstance(self._groupby, SeriesGroupBy):
            return _col(ret)
        else:
            return ret

    def count(self):
        raise NotImplementedError("groupby.expanding().count() is currently not implemented yet.")
        """
        The expanding count of any non-NaN observations inside the window.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the expanding
            calculation.

        See Also
        --------
        Series.expanding : Calling object with Series data.
        DataFrame.expanding : Calling object with DataFrames.
        DataFrame.count : Count of the full DataFrame.

        Examples
        --------
        >>> s = ks.Series([2, 3, float("nan"), 10])
        >>> s.name = "col"
        >>> s.groupby(s).expanding().count().sort_index()  # doctest: +NORMALIZE_WHITESPACE
        col
        2.0   0    1.0
        3.0   1    1.0
        10.0  3    1.0
        Name: col, dtype: float64

        >>> df = s.to_frame()
        >>> df.groupby(df.col).expanding().count().sort_index()  # doctest: +NORMALIZE_WHITESPACE
                 col
        col
        2.0  0   1.0
        3.0  1   1.0
        10.0 3   1.0
        """
        return super(ExpandingGroupby, self).count()

    def sum(self):
        raise NotImplementedError("groupby.expanding().sum() is currently not implemented yet.")
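The core of ExpandingGroupby is the window set up in __init__: the expanding frame inherited from Expanding is additionally partitioned by the group-key columns, so each aggregation grows only within its own group. A standalone PySpark sketch of that idea (data and column names invented for illustration):

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(2.0, 0), (3.0, 1), (10.0, 3)], ["key", "idx"])

    # Partition by the group key, order by the original index, and let the
    # frame grow from the partition start to the current row -- the same
    # shape as self._window in ExpandingGroupby.
    w = (Window.partitionBy("key")
               .orderBy("idx")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))

    sdf.withColumn("expanding_count", F.count("key").over(w)).show()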
