Implement Series.factorize() (#1972)
ref #1929
```
        >>> kser = ks.Series(['b', None, 'a', 'c', 'b'])
        >>> codes, uniques = kser.factorize()
        >>> codes
        0    1
        1   -1
        2    0
        3    2
        4    1
        dtype: int64
        >>> uniques
        Index(['a', 'b', 'c'], dtype='object')

        >>> codes, uniques = kser.factorize(na_sentinel=None)
        >>> codes
        0    1
        1    3
        2    0
        3    2
        4    1
        dtype: int64
        >>> uniques
        Index(['a', 'b', 'c', None], dtype='object')

        >>> codes, uniques = kser.factorize(na_sentinel=-2)
        >>> codes
        0    1
        1   -2
        2    0
        3    2
        4    1
        dtype: int64
        >>> uniques
        Index(['a', 'b', 'c'], dtype='object')
```
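
As a quick illustration of the contract (a minimal sketch, not part of the commit; it assumes a running Spark session with `databricks.koalas` importable): each code indexes into `uniques`, with the sentinel marking missing values, so the original values can be rebuilt from the pair.

```
import databricks.koalas as ks

kser = ks.Series(['b', None, 'a', 'c', 'b'])
codes, uniques = kser.factorize()

# Each code is a position in `uniques`; -1 (the default na_sentinel) marks missing.
restored = [uniques[c] if c != -1 else None for c in codes.to_list()]
print(restored)  # ['b', None, 'a', 'c', 'b']
```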
xinrong-meng authored Jan 13, 2021
1 parent 3cde582 commit ce2d260
Showing 4 changed files with 241 additions and 1 deletion.
1 change: 0 additions & 1 deletion databricks/koalas/missing/series.py
@@ -44,7 +44,6 @@ class MissingPandasLikeSeries(object):
    convert_dtypes = _unsupported_function("convert_dtypes")
    cov = _unsupported_function("cov")
    ewm = _unsupported_function("ewm")
    factorize = _unsupported_function("factorize")
    first = _unsupported_function("first")
    infer_objects = _unsupported_function("infer_objects")
    interpolate = _unsupported_function("interpolate")
133 changes: 133 additions & 0 deletions databricks/koalas/series.py
@@ -24,6 +24,7 @@
from collections.abc import Mapping
from distutils.version import LooseVersion
from functools import partial, wraps, reduce
from itertools import chain
from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar, Union, cast

import matplotlib
@@ -1901,6 +1902,138 @@ def _fillna(self, value=None, method=None, axis=None, limit=None, part_cols=()):
            )
        )._kser_for(self._column_label)

    def factorize(
        self, sort: bool = True, na_sentinel: Optional[int] = -1
    ) -> Tuple["Series", pd.Index]:
        """
        Encode the object as an enumerated type or categorical variable.

        This method is useful for obtaining a numeric representation of an
        array when all that matters is identifying distinct values.

        Parameters
        ----------
        sort : bool, default True
            Only True is currently supported.
        na_sentinel : int or None, default -1
            Value to mark "not found". If None, will not drop the NaN
            from the uniques of the values.

        Returns
        -------
        codes : Series
            A Series that's an indexer into `uniques`.
            ``uniques.take(codes)`` will have the same values as `values`.
        uniques : pd.Index
            The unique valid values.

            .. note::

               Even if there's a missing value in `values`, `uniques` will
               *not* contain an entry for it.

        Examples
        --------
        >>> kser = ks.Series(['b', None, 'a', 'c', 'b'])
        >>> codes, uniques = kser.factorize()
        >>> codes
        0    1
        1   -1
        2    0
        3    2
        4    1
        dtype: int32
        >>> uniques
        Index(['a', 'b', 'c'], dtype='object')

        >>> codes, uniques = kser.factorize(na_sentinel=None)
        >>> codes
        0    1
        1    3
        2    0
        3    2
        4    1
        dtype: int32
        >>> uniques
        Index(['a', 'b', 'c', None], dtype='object')

        >>> codes, uniques = kser.factorize(na_sentinel=-2)
        >>> codes
        0    1
        1   -2
        2    0
        3    2
        4    1
        dtype: int32
        >>> uniques
        Index(['a', 'b', 'c'], dtype='object')
        """
        assert (na_sentinel is None) or isinstance(na_sentinel, int)
        assert sort is True
        uniq_sdf = self._internal.spark_frame.select(self.spark.column).distinct()

        # Checks the number of uniques and constructs the sorted `uniques_list`
        max_compute_count = get_option("compute.max_rows")
        if max_compute_count is not None:
            uniq_pdf = uniq_sdf.limit(max_compute_count + 1).toPandas()
            if len(uniq_pdf) > max_compute_count:
                raise ValueError(
                    "Current Series has more than {0} unique values. "
                    "Please set 'compute.max_rows' by using 'databricks.koalas.config.set_option' "
                    "to more than {0} rows. Note that this operation is considerably "
                    "expensive, so consider that before raising 'compute.max_rows'.".format(
                        max_compute_count
                    )
                )
        else:
            uniq_pdf = uniq_sdf.toPandas()
        # pandas converts both NaN and null from Spark to np.nan, so de-duplication is required
        uniq_series = first_series(uniq_pdf).drop_duplicates()
        uniques_list = uniq_series.tolist()
        uniques_list = sorted(uniques_list, key=lambda x: (pd.isna(x), x))  # missing values sort last

        # Constructs `unique_to_code`, mapping each non-NA unique value to its code
        unique_to_code = {}
        if na_sentinel is not None:
            na_sentinel_code = na_sentinel
        code = 0
        for unique in uniques_list:
            if pd.isna(unique):
                if na_sentinel is None:
                    na_sentinel_code = code
            else:
                unique_to_code[unique] = code
            code += 1

        kvs = list(
            chain(*([(F.lit(unique), F.lit(code)) for unique, code in unique_to_code.items()]))
        )

        if len(kvs) == 0:  # uniques are all missing values
            new_scol = F.lit(na_sentinel_code)
        else:
            scol = self.spark.column
            if isinstance(self.spark.data_type, (FloatType, DoubleType)):
                cond = scol.isNull() | F.isnan(scol)
            else:
                cond = scol.isNull()
            map_scol = F.create_map(kvs)

            null_scol = F.when(cond, F.lit(na_sentinel_code))
            new_scol = null_scol.otherwise(map_scol.getItem(scol))

        internal = self._internal.with_new_columns(
            [new_scol.alias(self._internal.data_spark_column_names[0])]
        )

        codes = first_series(DataFrame(internal))

        if na_sentinel is not None:
            # Drops the NaN from the uniques of the values
            uniques_list = [x for x in uniques_list if not pd.isna(x)]

        uniques = pd.Index(uniques_list)

        return codes, uniques

    def dropna(self, axis=0, inplace=False, **kwargs) -> Optional["Series"]:
        """
        Return a new Series with missing values removed.
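The heart of the new method is the `F.create_map` lookup: literal (value, code) pairs are interleaved into a map column, each row's value is looked up with `getItem`, and nulls (plus NaNs, for float columns) are short-circuited to the sentinel with `F.when`. Below is a minimal standalone PySpark sketch of just that technique; the `spark` session, the example frame, and the hard-coded `unique_to_code` dict are illustrative assumptions, not part of the commit.

```
from itertools import chain

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("b",), (None,), ("a",), ("c",), ("b",)], ["value"])

# Interleave (key, value) literals: ['a', 0, 'b', 1, 'c', 2] -> one map column
unique_to_code = {"a": 0, "b": 1, "c": 2}
kvs = list(chain(*[(F.lit(u), F.lit(c)) for u, c in unique_to_code.items()]))
map_scol = F.create_map(kvs)

# Nulls never match a map key, so they are handled first via the sentinel.
codes = sdf.select(
    F.when(F.col("value").isNull(), F.lit(-1))
    .otherwise(map_scol.getItem(F.col("value")))
    .alias("code")
)
codes.show()  # rows: 1, -1, 0, 2, 1
```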
107 changes: 107 additions & 0 deletions databricks/koalas/tests/test_series.py
@@ -2306,6 +2306,113 @@ def test_first_valid_index(self):
        kser = ks.from_pandas(pser)
        self.assert_eq(pser.first_valid_index(), kser.first_valid_index())

    def test_factorize(self):
        pser = pd.Series(["a", "b", "a", "b"])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize(sort=True)
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pser = pd.Series([5, 1, 5, 1])
        kser = ks.from_pandas(pser)
        pcodes, puniques = (pser + 1).factorize(sort=True)
        kcodes, kuniques = (kser + 1).factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pser = pd.Series(["a", "b", "a", "b"], name="ser", index=["w", "x", "y", "z"])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize(sort=True)
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pser = pd.Series(
            ["a", "b", "a", "b"], index=pd.MultiIndex.from_arrays([[4, 3, 2, 1], [1, 2, 3, 4]])
        )
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize(sort=True)
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        #
        # Deals with None and np.nan
        #
        pser = pd.Series(["a", "b", "a", np.nan])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize(sort=True)
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pser = pd.Series([1, None, 3, 2, 1])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize(sort=True)
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pser = pd.Series(["a", None, "a"])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize(sort=True)
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pser = pd.Series([None, np.nan])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize()
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes, kcodes.to_list())
        # pandas: Float64Index([], dtype='float64')
        self.assert_eq(pd.Index([]), kuniques)

        pser = pd.Series([np.nan, np.nan])
        kser = ks.from_pandas(pser)
        pcodes, puniques = pser.factorize()
        kcodes, kuniques = kser.factorize()
        self.assert_eq(pcodes, kcodes.to_list())
        # pandas: Float64Index([], dtype='float64')
        self.assert_eq(pd.Index([]), kuniques)

        #
        # Deals with na_sentinel
        #
        # pandas >= 1.1.2 supports na_sentinel=None
        # pandas >= 0.24 supports na_sentinel values other than -1
        #
        pd_below_1_1_2 = LooseVersion(pd.__version__) < LooseVersion("1.1.2")
        pd_below_0_24 = LooseVersion(pd.__version__) < LooseVersion("0.24")

        pser = pd.Series(["a", "b", "a", np.nan, None])
        kser = ks.from_pandas(pser)

        pcodes, puniques = pser.factorize(sort=True, na_sentinel=-2)
        kcodes, kuniques = kser.factorize(na_sentinel=-2)
        self.assert_eq([0, 1, 0, -2, -2] if pd_below_0_24 else pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        pcodes, puniques = pser.factorize(sort=True, na_sentinel=2)
        kcodes, kuniques = kser.factorize(na_sentinel=2)
        self.assert_eq([0, 1, 0, 2, 2] if pd_below_0_24 else pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

        if not pd_below_1_1_2:
            pcodes, puniques = pser.factorize(sort=True, na_sentinel=None)
            kcodes, kuniques = kser.factorize(na_sentinel=None)
            self.assert_eq(pcodes.tolist(), kcodes.to_list())
            # puniques is Index(['a', 'b', nan], dtype='object')
            self.assert_eq(ks.Index(["a", "b", None]), kuniques)

        kser = ks.Series([1, 2, np.nan, 4, 5])  # Arrow takes np.nan as null
        kser.loc[3] = np.nan  # Spark takes np.nan as NaN
        kcodes, kuniques = kser.factorize(na_sentinel=None)
        pcodes, puniques = kser.to_pandas().factorize(sort=True, na_sentinel=None)
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

    def test_pad(self):
        pser = pd.Series([np.nan, 2, 3, 4, np.nan, 6], name="x")
        kser = ks.from_pandas(pser)
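The last case in `test_factorize` above pins down a Spark subtlety: values that arrive through Arrow conversion become SQL nulls, while an assignment executed inside Spark yields a genuine floating-point NaN, and Spark distinguishes the two. That is why the implementation builds its missing-value condition as `isNull() | isnan()` for float and double columns. A minimal sketch of the distinction, again assuming a SparkSession named `spark`:

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0,), (float("nan"),), (None,)], ["v"])

sdf.select(
    "v",
    F.isnan("v").alias("is_nan"),          # true only for NaN
    F.col("v").isNull().alias("is_null"),  # true only for null
).show()
# 1.0  -> is_nan=false, is_null=false
# NaN  -> is_nan=true,  is_null=false
# null -> is_nan=false, is_null=true
```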
1 change: 1 addition & 0 deletions docs/source/reference/series.rst
@@ -213,6 +213,7 @@ Reshaping, sorting, transposing
   Series.explode
   Series.repeat
   Series.squeeze
   Series.factorize

Combining / joining / merging
-----------------------------