Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement Series.factorize() #1972

Merged
merged 19 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion databricks/koalas/missing/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class MissingPandasLikeSeries(object):
convert_dtypes = _unsupported_function("convert_dtypes")
cov = _unsupported_function("cov")
ewm = _unsupported_function("ewm")
factorize = _unsupported_function("factorize")
first = _unsupported_function("first")
infer_objects = _unsupported_function("infer_objects")
interpolate = _unsupported_function("interpolate")
Expand Down
129 changes: 129 additions & 0 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from collections.abc import Mapping
from distutils.version import LooseVersion
from functools import partial, wraps, reduce
from itertools import chain
from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar, Union, cast

import matplotlib
Expand Down Expand Up @@ -1913,6 +1914,134 @@ def _fillna(self, value=None, method=None, axis=None, limit=None, part_cols=()):
)
)._kser_for(self._column_label)

def factorize(
    self, sort: bool = True, na_sentinel: Optional[int] = -1
) -> Tuple["Series", pd.Index]:
    """
    Encode the object as an enumerated type or categorical variable.

    This method is useful for obtaining a numeric representation of an
    array when all that matters is identifying distinct values.

    Parameters
    ----------
    sort : bool, default True
        Only ``True`` is supported for now; uniques are returned in sorted
        order.
    na_sentinel : int or None, default -1
        Value to mark "not found". If None, will not drop the NaN
        from the uniques of the values.

    Returns
    -------
    codes : Series
        A Series that's an indexer into `uniques`.
        ``uniques.take(codes)`` will have the same values as `values`.
    uniques : Index
        The unique valid values.

        .. note ::

           Even if there's a missing value in `values`, `uniques` will
           *not* contain an entry for it.

    Examples
    --------
    >>> kser = ks.Series(['b', None, 'a', 'c', 'b'])
    >>> codes, uniques = kser.factorize()
    >>> codes
    0    1
    1   -1
    2    0
    3    2
    4    1
    dtype: int32
    >>> uniques
    Index(['a', 'b', 'c'], dtype='object')

    >>> codes, uniques = kser.factorize(na_sentinel=None)
    >>> codes
    0    1
    1    3
    2    0
    3    2
    4    1
    dtype: int32
    >>> uniques
    Index(['a', 'b', 'c', None], dtype='object')

    >>> codes, uniques = kser.factorize(na_sentinel=-2)
    >>> codes
    0    1
    1   -2
    2    0
    3    2
    4    1
    dtype: int32
    >>> uniques
    Index(['a', 'b', 'c'], dtype='object')
    """
    assert (na_sentinel is None) or isinstance(na_sentinel, int)
    # Only the sorted-uniques path is implemented, matching pandas'
    # `factorize(sort=True)` semantics.
    assert sort is True

    uniq_sdf = self._internal.spark_frame.select(self.spark.column).distinct()

    # Collect the distinct values to the driver. `compute.max_rows` caps how
    # many uniques we are willing to collect; when it is None, collect all
    # (per the review resolution: "toPandas() without limits").
    max_compute_count = get_option("compute.max_rows")
    if max_compute_count is not None:
        uniq_pdf = uniq_sdf.limit(max_compute_count + 1).toPandas()
        if len(uniq_pdf) > max_compute_count:
            raise ValueError(
                "Current Series has more than {0} unique values. "
                "Please set 'compute.max_rows' by using 'databricks.koalas.config.set_option' "
                "to more than {0} rows. Note that, before changing the "
                "'compute.max_rows', this operation is considerably expensive.".format(
                    max_compute_count
                )
            )
    else:
        # No configured limit: collect every distinct value.
        uniq_pdf = uniq_sdf.toPandas()

    # Sort uniques with NaN/None placed last so code assignment is stable.
    uniques_list = first_series(uniq_pdf).tolist()
    uniques_list = sorted(uniques_list, key=lambda x: (pd.isna(x), x))

    # Map each non-NA unique value to its integer code. When na_sentinel is
    # None, the NaN entry (if any, sorted last) gets its own positional code.
    unique_to_code = {}
    # Bind na_sentinel_code unconditionally: otherwise `na_sentinel=None` on
    # NaN-free data would leave it unbound and `F.lit(na_sentinel_code)`
    # below would raise NameError.
    na_sentinel_code = na_sentinel
    code = 0
    for unique in uniques_list:
        if pd.isna(unique):
            if na_sentinel is None:
                na_sentinel_code = code
        else:
            unique_to_code[unique] = code
        code += 1

    # Flatten (value, code) pairs into the flat argument list create_map needs.
    kvs = list(
        chain(*([(F.lit(unique), F.lit(code)) for unique, code in unique_to_code.items()]))
    )

    map_scol = F.create_map(kvs)
    # Nulls bypass the map lookup and go straight to the sentinel code.
    null_scol = F.when(self.spark.column.isNull(), F.lit(na_sentinel_code))
    mapped_scol = map_scol.getItem(self.spark.column)

    new_col = verify_temp_column_name(self.to_frame(), "__new_col__")
    internal = self._internal.with_new_columns(
        [null_scol.otherwise(mapped_scol).alias(new_col)]
    )

    codes = first_series(DataFrame(internal))

    if na_sentinel is not None:
        # Drop the NaN from the uniques of the values.
        uniques_list = [x for x in uniques_list if not pd.isna(x)]

    uniques = pd.Index(uniques_list)

    return codes, uniques

def dropna(self, axis=0, inplace=False, **kwargs) -> Optional["Series"]:
"""
Return a new Series with missing values removed.
Expand Down
108 changes: 108 additions & 0 deletions databricks/koalas/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2283,6 +2283,114 @@ def test_first_valid_index(self):
kser = ks.from_pandas(pser)
self.assert_eq(pser.first_valid_index(), kser.first_valid_index())

def test_factorize(self):
    """Check Series.factorize against pandas over names, indexes, NaNs,
    and the ``na_sentinel`` variations."""
    # Basic string values.
    pser = pd.Series(["a", "b", "a", "b"])
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    # Numeric values through an arithmetic expression.
    pser = pd.Series([5, 1, 5, 1])
    kser = ks.from_pandas(pser)
    pcodes, puniques = (pser + 1).factorize(sort=True)
    kcodes, kuniques = (kser + 1).factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    # Named Series.
    pser = pd.Series(["a", "b", "a", "b"], name="ser")
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    # Custom index.
    pser = pd.Series(["a", "b", "a", "b"], index=["w", "x", "y", "z"])
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    # MultiIndex.
    pser = pd.Series(
        ["a", "b", "a", "b"], index=pd.MultiIndex.from_arrays([[4, 3, 2, 1], [1, 2, 3, 4]])
    )
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    #
    # Deals with None and np.nan
    #
    pser = pd.Series(["a", "b", "a", np.nan])
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    pser = pd.Series([1, None, 3, 2, 1])
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    pser = pd.Series(["a", None, "a"])
    kser = ks.from_pandas(pser)
    pcodes, puniques = pser.factorize(sort=True)
    kcodes, kuniques = kser.factorize()
    self.assert_eq(pcodes.tolist(), kcodes.to_list())
    self.assert_eq(puniques, kuniques)

    # All-missing Series: koalas diverges from pandas here, so the expected
    # values are hard-coded rather than taken from pandas.
    pser = pd.Series([None, np.nan])
    kser = ks.from_pandas(pser)
    kcodes, kuniques = kser.factorize()
    # pandas: [-1, -1]
    self.assert_eq(["-1", "-1"], kcodes.to_list())
    # pandas: Float64Index([], dtype='float64')
    self.assert_eq(pd.Index([]), kuniques)

    pser = pd.Series([np.nan, np.nan])
    kser = ks.from_pandas(pser)
    kcodes, kuniques = kser.factorize()
    # pandas: [-1, -1]
    self.assert_eq(["-1", "-1"], kcodes.to_list())
    # pandas: Float64Index([], dtype='float64')
    self.assert_eq(pd.Index([]), kuniques)

    #
    # Deals with na_sentinel
    #
    # pandas < 1.1.2 mishandles custom na_sentinel values, so expected codes
    # are hard-coded for those versions instead of taken from pandas.
    is_lower_pandas_version = LooseVersion(pd.__version__) < LooseVersion("1.1.2")

    pser = pd.Series(["a", "b", "a", np.nan, None])
    kser = ks.from_pandas(pser)

    pcodes, puniques = pser.factorize(sort=True, na_sentinel=-2)
    kcodes, kuniques = kser.factorize(na_sentinel=-2)
    self.assert_eq(
        [0, 1, 0, -2, -2] if is_lower_pandas_version else pcodes.tolist(), kcodes.to_list()
    )
    self.assert_eq(puniques, kuniques)

    pcodes, puniques = pser.factorize(sort=True, na_sentinel=2)
    kcodes, kuniques = kser.factorize(na_sentinel=2)
    self.assert_eq(
        [0, 1, 0, 2, 2] if is_lower_pandas_version else pcodes.tolist(), kcodes.to_list()
    )
    self.assert_eq(puniques, kuniques)

    # na_sentinel=None only exists in pandas >= 1.1.2.
    if not is_lower_pandas_version:
        pcodes, puniques = pser.factorize(sort=True, na_sentinel=None)
        kcodes, kuniques = kser.factorize(na_sentinel=None)
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        # puniques is Index(['a', 'b', nan], dtype='object')
        self.assert_eq(ks.Index(["a", "b", None]), kuniques)

def test_pad(self):
pser = pd.Series([np.nan, 2, 3, 4, np.nan, 6], name="x")
kser = ks.from_pandas(pser)
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ Reshaping, sorting, transposing
Series.explode
Series.repeat
Series.squeeze
Series.factorize

Combining / joining / merging
-----------------------------
Expand Down