
Implement expanding.groupby.count in Series and Frame #991

Merged: 1 commit, Nov 6, 2019
23 changes: 21 additions & 2 deletions databricks/koalas/groupby.py
@@ -1651,8 +1651,27 @@ def nunique(self, dropna=True):
def rolling(self, *args, **kwargs):
return RollingGroupby(self)

def expanding(self, *args, **kwargs):
return ExpandingGroupby(self)
def expanding(self, min_periods=1):
"""
Return an expanding grouper, providing expanding
functionality per group.
.. note:: 'min_periods' in Koalas works as a fixed window size unlike pandas.
Unlike pandas, NA is also counted as the period. This might be changed
in the near future.
Parameters
----------
min_periods : int, default 1
Minimum number of observations in window required to have a value
(otherwise result is NA).
See Also
--------
Series.groupby
DataFrame.groupby
"""
return ExpandingGroupby(self, self._groupkeys, min_periods=min_periods)

def _reduce_for_stat_function(self, sfun, only_numeric):
groupkeys = self._groupkeys
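For reference, a minimal usage sketch of the API added above (not part of the diff; assumes a running Koalas/Spark session):

    import databricks.koalas as ks

    kser = ks.Series([1, 1, 2, 2, 2])
    # One expanding window per group; sort_index() because Spark does not
    # guarantee row order. Per the note in the docstring, Koalas treats
    # min_periods as a fixed window size, so shorter windows yield NA.
    kser.groupby(kser).expanding(min_periods=2).count().sort_index()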
8 changes: 0 additions & 8 deletions databricks/koalas/missing/window.py
@@ -67,14 +67,10 @@ class _MissingPandasLikeRolling(object):
count = unsupported_function_rolling("count")
cov = unsupported_function_rolling("cov")
kurt = unsupported_function_rolling("kurt")
max = unsupported_function_rolling("max")
mean = unsupported_function_rolling("mean")
median = unsupported_function_rolling("median")
min = unsupported_function_rolling("min")
quantile = unsupported_function_rolling("quantile")
skew = unsupported_function_rolling("skew")
std = unsupported_function_rolling("std")
sum = unsupported_function_rolling("sum")
validate = unsupported_function_rolling("validate")
var = unsupported_function_rolling("var")

@@ -112,14 +108,10 @@ class _MissingPandasLikeRollingGroupby(object):
count = unsupported_function_rolling("count")
cov = unsupported_function_rolling("cov")
kurt = unsupported_function_rolling("kurt")
max = unsupported_function_rolling("max")
mean = unsupported_function_rolling("mean")
median = unsupported_function_rolling("median")
min = unsupported_function_rolling("min")
quantile = unsupported_function_rolling("quantile")
skew = unsupported_function_rolling("skew")
std = unsupported_function_rolling("std")
sum = unsupported_function_rolling("sum")
validate = unsupported_function_rolling("validate")
var = unsupported_function_rolling("var")

34 changes: 34 additions & 0 deletions databricks/koalas/tests/test_expanding.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pandas as pd

import databricks.koalas as ks
from databricks.koalas.testing.utils import ReusedSQLTestCase, TestUtils
@@ -57,3 +58,36 @@ def test_expanding_mean(self):

def test_expanding_sum(self):
self._test_expanding_func("sum")

def _test_groupby_expanding_func(self, f):
kser = ks.Series([1, 2, 3])
pser = kser.to_pandas()
self.assert_eq(
repr(getattr(kser.groupby(kser).expanding(2), f)()),
repr(getattr(pser.groupby(pser).expanding(2), f)()))

# Multiindex
kser = ks.Series(
[1, 2, 3],
index=pd.MultiIndex.from_tuples([('a', 'x'), ('a', 'y'), ('b', 'z')]))
pser = kser.to_pandas()
self.assert_eq(
repr(getattr(kser.groupby(kser).expanding(2), f)()),
repr(getattr(pser.groupby(pser).expanding(2), f)()))

kdf = ks.DataFrame({'a': [1, 2, 3, 2], 'b': [4.0, 2.0, 3.0, 1.0]})
pdf = kdf.to_pandas()
self.assert_eq(
repr(getattr(kdf.groupby(kdf.a).expanding(2), f)()),
repr(getattr(pdf.groupby(pdf.a).expanding(2), f)()))

# Multiindex column
kdf = ks.DataFrame({'a': [1, 2, 3, 2], 'b': [4.0, 2.0, 3.0, 1.0]})
kdf.columns = pd.MultiIndex.from_tuples([('a', 'x'), ('a', 'y')])
pdf = kdf.to_pandas()
self.assert_eq(
repr(getattr(kdf.groupby(("a", "x")).expanding(2), f)()),
repr(getattr(pdf.groupby(("a", "x")).expanding(2), f)()))

def test_groupby_expanding_count(self):
self._test_groupby_expanding_func("count")
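For a plain-pandas picture of what the helper above compares against (independent of this diff; older pandas special-cased count, so exact values vary by version):

    import pandas as pd

    pser = pd.Series([1, 2, 3])
    # Grouping a series by itself puts each value in its own group, so every
    # expanding window holds a single observation; with min_periods=2,
    # recent pandas versions return NaN for such windows.
    print(pser.groupby(pser).expanding(2).count())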
139 changes: 130 additions & 9 deletions databricks/koalas/window.py
@@ -15,7 +15,10 @@
#
from functools import partial
from typing import Any
from functools import reduce

from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
from databricks.koalas.utils import name_like_string
from pyspark.sql import Window
from pyspark.sql import functions as F
from databricks.koalas.missing.window import _MissingPandasLikeRolling, \
@@ -57,19 +60,17 @@ def __getattr__(self, item: str) -> Any:
class Expanding(_RollingAndExpanding):
def __init__(self, kdf_or_kser, min_periods=1):
from databricks.koalas import DataFrame, Series
from databricks.koalas.groupby import SeriesGroupBy, DataFrameGroupBy

if min_periods < 0:
raise ValueError("min_periods must be >= 0")
self._min_periods = min_periods
self.kdf_or_kser = kdf_or_kser
if not isinstance(kdf_or_kser, (DataFrame, Series, DataFrameGroupBy, SeriesGroupBy)):
if not isinstance(kdf_or_kser, (DataFrame, Series)):
raise TypeError(
"kdf_or_kser must be a series or dataframe; however, got: %s" % type(kdf_or_kser))
if isinstance(kdf_or_kser, (DataFrame, Series)):
index_scols = kdf_or_kser._internal.index_scols
self._window = Window.orderBy(index_scols).rowsBetween(
Window.unboundedPreceding, Window.currentRow)
index_scols = kdf_or_kser._internal.index_scols
self._window = Window.orderBy(index_scols).rowsBetween(
Window.unboundedPreceding, Window.currentRow)

def __getattr__(self, item: str) -> Any:
if hasattr(_MissingPandasLikeExpanding, item):
@@ -86,7 +87,7 @@ def __repr__(self):

def _apply_as_series_or_frame(self, func):
"""
Decorator that can wraps a function that handles Spark column in order
Wraps a function that handles a Spark column in order
to support it in both Koalas Series and DataFrame.

Note that the given `func` name should be the same as the API's method name.
@@ -97,7 +98,6 @@ def _apply_as_series_or_frame(self, func):
kser = self.kdf_or_kser
return kser._with_new_scol(func(kser._scol)).rename(kser.name)
elif isinstance(self.kdf_or_kser, DataFrame):
# TODO: deduplicate with other APIs in expanding.
kdf = self.kdf_or_kser
applied = []
for column in kdf.columns:
@@ -348,6 +348,31 @@ def mean(scol):


class ExpandingGroupby(Expanding):
def __init__(self, groupby, groupkeys, min_periods=1):
from databricks.koalas.groupby import SeriesGroupBy
from databricks.koalas.groupby import DataFrameGroupBy

if isinstance(groupby, SeriesGroupBy):
kdf = groupby._ks.to_frame()
elif isinstance(groupby, DataFrameGroupBy):
kdf = groupby._kdf
else:
raise TypeError(
"groupby must be a SeriesGroupBy or DataFrameGroupBy; "
"however, got: %s" % type(groupby))

super(ExpandingGroupby, self).__init__(kdf, min_periods)
self._groupby = groupby
# NOTE: this code intentionally uses `F.col` instead of the `scol` of the
# given series. This is because, for a Series, we convert it into a
# DataFrame here, so if the given `groupkeys` are Series, they end up
# referring to different series.
self._window = self._window.partitionBy(
*[F.col(name_like_string(ser.name)) for ser in groupkeys])
self._groupkeys = groupkeys
# Current implementation reuses DataFrameGroupBy implementations for Series as well.
self.kdf = self.kdf_or_kser
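The partitionBy call above is what turns the plain expanding window into a per-group one. A standalone PySpark sketch of the same idea, with hypothetical column names key and v (this is not code from the PR):

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["key", "v"])

    # Expanding per group: partition by the group key, order rows, and let
    # the frame grow from the partition start to the current row.
    w = (Window.partitionBy("key")
         .orderBy("v")
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    sdf.select("key", "v", F.count("v").over(w).alias("expanding_count")).show()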

def __getattr__(self, item: str) -> Any:
if hasattr(_MissingPandasLikeExpandingGroupby, item):
property_or_func = getattr(_MissingPandasLikeExpandingGroupby, item)
@@ -357,8 +382,104 @@ def __getattr__(self, item: str) -> Any:
return partial(property_or_func, self)
raise AttributeError(item)

def _apply_as_series_or_frame(self, func):
"""
Wraps a function that handles a Spark column in order
to support it in both Koalas Series and DataFrame.

Note that the given `func` name should be the same as the API's method name.
"""
from databricks.koalas import DataFrame
from databricks.koalas.series import _col
from databricks.koalas.groupby import SeriesGroupBy

kdf = self.kdf
sdf = self.kdf._sdf

# Here we need to include the grouped key as an index, and shift the previous index.
# [index_column0, index_column1] -> [grouped key, index_column0, index_column1]
new_index_scols = []
new_index_map = []
for groupkey in self._groupkeys:
new_index_scols.append(
# NOTE: this code intentionally uses `F.col` instead of the `scol` of the
# given series. This is because, for a Series, we convert it into a
# DataFrame here, so if the given `groupkeys` are Series, they end up
# referring to different series.
F.col(
name_like_string(groupkey.name)
).alias(
SPARK_INDEX_NAME_FORMAT(len(new_index_scols))
))
new_index_map.append(
(SPARK_INDEX_NAME_FORMAT(len(new_index_map)),
groupkey._internal.column_index[0]))

for new_index_scol, index_map in zip(kdf._internal.index_scols, kdf._internal.index_map):
new_index_scols.append(
new_index_scol.alias(SPARK_INDEX_NAME_FORMAT(len(new_index_scols))))
_, name = index_map
new_index_map.append((SPARK_INDEX_NAME_FORMAT(len(new_index_map)), name))

applied = []
for column in kdf.columns:
applied.append(
kdf[column]._with_new_scol(
func(kdf[column]._scol)
).rename(kdf[column].name))

# pandas seems to filter out rows whose group key is NA; keep only rows
# where every group key is non-null.
cond = self._groupkeys[0]._scol.isNotNull()
for c in self._groupkeys[1:]:
cond = cond & c._scol.isNotNull()
sdf = sdf.select(new_index_scols + [c._scol for c in applied]).filter(cond)

internal = _InternalFrame(sdf=sdf,
data_columns=[c._internal.data_columns[0] for c in applied],
index_map=new_index_map)
Review comment (Collaborator): Don't we need to preserve column_index?

Reply (Member Author): let me take a look and fix.


ret = DataFrame(internal)
if isinstance(self._groupby, SeriesGroupBy):
return _col(ret)
else:
return ret
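To make the index remapping above concrete, a pandas-only sketch (the assumption being that pandas produces the same two-level index this method reconstructs on the Spark side):

    import pandas as pd

    s = pd.Series([2, 3, 10], index=[0, 1, 3], name="col")
    out = s.groupby(s).expanding().count()
    # The group key becomes the outermost index level and the original index
    # shifts inward, mirroring new_index_scols / new_index_map above.
    print(out.index.nlevels)  # 2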

def count(self):
raise NotImplementedError("groupby.expanding().count() is currently not implemented yet.")
"""
The expanding count of any non-NaN observations inside the window.
Returns
-------
Series or DataFrame
Returned object type is determined by the caller of the expanding
calculation.
See Also
--------
Series.expanding : Calling object with Series data.
DataFrame.expanding : Calling object with DataFrames.
DataFrame.count : Count of the full DataFrame.
Examples
--------
>>> s = ks.Series([2, 3, float("nan"), 10])
>>> s.name = "col"
>>> s.groupby(s).expanding().count().sort_index() # doctest: +NORMALIZE_WHITESPACE
col
2.0 0 1.0
3.0 1 1.0
10.0 3 1.0
Name: col, dtype: float64
>>> df = s.to_frame()
>>> df.groupby(df.col).expanding().count().sort_index() # doctest: +NORMALIZE_WHITESPACE
col
col
2.0 0 1.0
3.0 1 1.0
10.0 3 1.0
"""
return super(ExpandingGroupby, self).count()

def sum(self):
raise NotImplementedError("groupby.expanding().sum() is currently not implemented yet.")