diff --git a/databricks/koalas/missing/series.py b/databricks/koalas/missing/series.py
index 3d4c42a979..b685737b5c 100644
--- a/databricks/koalas/missing/series.py
+++ b/databricks/koalas/missing/series.py
@@ -44,7 +44,6 @@ class MissingPandasLikeSeries(object):
     convert_dtypes = _unsupported_function("convert_dtypes")
     cov = _unsupported_function("cov")
     ewm = _unsupported_function("ewm")
-    factorize = _unsupported_function("factorize")
     first = _unsupported_function("first")
     infer_objects = _unsupported_function("infer_objects")
     interpolate = _unsupported_function("interpolate")
diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py
index f07281d861..3d1bda0eed 100644
--- a/databricks/koalas/series.py
+++ b/databricks/koalas/series.py
@@ -24,6 +24,7 @@
 from collections.abc import Mapping
 from distutils.version import LooseVersion
 from functools import partial, wraps, reduce
+from itertools import chain
 from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar, Union, cast
 
 import matplotlib
@@ -1901,6 +1902,138 @@ def _fillna(self, value=None, method=None, axis=None, limit=None, part_cols=()):
             )
         )._kser_for(self._column_label)
 
+    def factorize(
+        self, sort: bool = True, na_sentinel: Optional[int] = -1
+    ) -> Tuple["Series", pd.Index]:
+        """
+        Encode the object as an enumerated type or categorical variable.
+
+        This method is useful for obtaining a numeric representation of an
+        array when all that matters is identifying distinct values.
+
+        Parameters
+        ----------
+        sort : bool, default True
+            Only ``sort=True`` is currently supported.
+        na_sentinel : int or None, default -1
+            Value to mark "not found". If None, NaN will not be dropped from
+            the uniques of the values.
+
+        Returns
+        -------
+        codes : Series
+            A Series that's an indexer into `uniques`.
+            ``uniques.take(codes)`` will have the same values as `values`.
+        uniques : pd.Index
+            The unique valid values.
+
+            .. note::
+
+               Even if there's a missing value in `values`, `uniques` will
+               *not* contain an entry for it.
+
+        Examples
+        --------
+        >>> kser = ks.Series(['b', None, 'a', 'c', 'b'])
+        >>> codes, uniques = kser.factorize()
+        >>> codes
+        0    1
+        1   -1
+        2    0
+        3    2
+        4    1
+        dtype: int32
+        >>> uniques
+        Index(['a', 'b', 'c'], dtype='object')
+
+        >>> codes, uniques = kser.factorize(na_sentinel=None)
+        >>> codes
+        0    1
+        1    3
+        2    0
+        3    2
+        4    1
+        dtype: int32
+        >>> uniques
+        Index(['a', 'b', 'c', None], dtype='object')
+
+        >>> codes, uniques = kser.factorize(na_sentinel=-2)
+        >>> codes
+        0    1
+        1   -2
+        2    0
+        3    2
+        4    1
+        dtype: int32
+        >>> uniques
+        Index(['a', 'b', 'c'], dtype='object')
+        """
+        assert (na_sentinel is None) or isinstance(na_sentinel, int)
+        assert sort is True
+        uniq_sdf = self._internal.spark_frame.select(self.spark.column).distinct()
+
+        # Check the number of uniques and construct the sorted `uniques_list`.
+        max_compute_count = get_option("compute.max_rows")
+        if max_compute_count is not None:
+            uniq_pdf = uniq_sdf.limit(max_compute_count + 1).toPandas()
+            if len(uniq_pdf) > max_compute_count:
+                raise ValueError(
+                    "Current Series has more than {0} unique values. "
+                    "Please set 'compute.max_rows' by using "
+                    "'databricks.koalas.config.set_option' to more than {0} rows. "
+                    "Note that, before increasing 'compute.max_rows', be aware that "
+                    "this operation is considerably expensive.".format(max_compute_count)
+                )
+        else:
+            uniq_pdf = uniq_sdf.toPandas()
+        # pandas maps both null and NaN in Spark to np.nan, so de-duplication is required.
+        uniq_series = first_series(uniq_pdf).drop_duplicates()
+        uniques_list = uniq_series.tolist()
+        uniques_list = sorted(uniques_list, key=lambda x: (pd.isna(x), x))
+
+        # Construct `unique_to_code`, mapping each non-NA unique value to its code.
+        unique_to_code = {}
+        if na_sentinel is not None:
+            na_sentinel_code = na_sentinel
+        code = 0
+        for unique in uniques_list:
+            if pd.isna(unique):
+                if na_sentinel is None:
+                    na_sentinel_code = code
+            else:
+                unique_to_code[unique] = code
+            code += 1
+
+        kvs = list(
+            chain(*([(F.lit(unique), F.lit(code)) for unique, code in unique_to_code.items()]))
+        )
+
+        if len(kvs) == 0:  # uniques are all missing values
+            new_scol = F.lit(na_sentinel_code)
+        else:
+            scol = self.spark.column
+            if isinstance(self.spark.data_type, (FloatType, DoubleType)):
+                cond = scol.isNull() | F.isnan(scol)
+            else:
+                cond = scol.isNull()
+            map_scol = F.create_map(kvs)
+
+            null_scol = F.when(cond, F.lit(na_sentinel_code))
+            new_scol = null_scol.otherwise(map_scol.getItem(scol))
+
+        internal = self._internal.with_new_columns(
+            [new_scol.alias(self._internal.data_spark_column_names[0])]
+        )
+
+        codes = first_series(DataFrame(internal))
+
+        if na_sentinel is not None:
+            # Drop NaN from the uniques of the values.
+            uniques_list = [x for x in uniques_list if not pd.isna(x)]
+
+        uniques = pd.Index(uniques_list)
+
+        return codes, uniques
+
     def dropna(self, axis=0, inplace=False, **kwargs) -> Optional["Series"]:
         """
         Return a new Series with missing values removed.
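The Spark-side trick above is worth calling out: rather than joining against a lookup table, the patch bakes the unique-to-code mapping into a literal map column via `F.create_map` and resolves each value with `getItem`, routing nulls (and NaNs, for float columns) to the sentinel first. A minimal standalone sketch of that technique, assuming a local `SparkSession`; the data and mapping here are illustrative, not part of the patch:

```python
from itertools import chain

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("b",), (None,), ("a",), ("c",), ("b",)], ["v"])

# Sorted uniques mapped to codes, mirroring `unique_to_code` in the patch.
unique_to_code = {"a": 0, "b": 1, "c": 2}
kvs = list(chain(*[(F.lit(u), F.lit(c)) for u, c in unique_to_code.items()]))

na_sentinel_code = -1
scol = F.col("v")
# Nulls take the sentinel; everything else is looked up in the literal map.
codes = F.when(scol.isNull(), F.lit(na_sentinel_code)).otherwise(
    F.create_map(kvs).getItem(scol)
)
sdf.select(codes.alias("codes")).show()  # rows: 1, -1, 0, 2, 1
```

Because the map is a literal expression, the whole encoding stays a single projection with no shuffle; the trade-off is that all uniques must fit on the driver, which is what the `compute.max_rows` guard enforces.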
diff --git a/databricks/koalas/tests/test_series.py b/databricks/koalas/tests/test_series.py
index 00d8cc8404..b3b62552f0 100644
--- a/databricks/koalas/tests/test_series.py
+++ b/databricks/koalas/tests/test_series.py
@@ -2306,6 +2306,113 @@ def test_first_valid_index(self):
         kser = ks.from_pandas(pser)
         self.assert_eq(pser.first_valid_index(), kser.first_valid_index())
 
+    def test_factorize(self):
+        pser = pd.Series(["a", "b", "a", "b"])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize(sort=True)
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pser = pd.Series([5, 1, 5, 1])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = (pser + 1).factorize(sort=True)
+        kcodes, kuniques = (kser + 1).factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pser = pd.Series(["a", "b", "a", "b"], name="ser", index=["w", "x", "y", "z"])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize(sort=True)
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pser = pd.Series(
+            ["a", "b", "a", "b"], index=pd.MultiIndex.from_arrays([[4, 3, 2, 1], [1, 2, 3, 4]])
+        )
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize(sort=True)
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        #
+        # Deals with None and np.nan
+        #
+        pser = pd.Series(["a", "b", "a", np.nan])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize(sort=True)
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pser = pd.Series([1, None, 3, 2, 1])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize(sort=True)
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pser = pd.Series(["a", None, "a"])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize(sort=True)
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pser = pd.Series([None, np.nan])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize()
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes, kcodes.to_list())
+        # pandas: Float64Index([], dtype='float64')
+        self.assert_eq(pd.Index([]), kuniques)
+
+        pser = pd.Series([np.nan, np.nan])
+        kser = ks.from_pandas(pser)
+        pcodes, puniques = pser.factorize()
+        kcodes, kuniques = kser.factorize()
+        self.assert_eq(pcodes, kcodes.to_list())
+        # pandas: Float64Index([], dtype='float64')
+        self.assert_eq(pd.Index([]), kuniques)
+
+        #
+        # Deals with na_sentinel
+        #
+        # pandas >= 1.1.2 supports na_sentinel=None
+        # pandas >= 0.24 supports na_sentinel values other than -1
+        #
+        pd_below_1_1_2 = LooseVersion(pd.__version__) < LooseVersion("1.1.2")
+        pd_below_0_24 = LooseVersion(pd.__version__) < LooseVersion("0.24")
+
+        pser = pd.Series(["a", "b", "a", np.nan, None])
+        kser = ks.from_pandas(pser)
+
+        pcodes, puniques = pser.factorize(sort=True, na_sentinel=-2)
+        kcodes, kuniques = kser.factorize(na_sentinel=-2)
+        self.assert_eq([0, 1, 0, -2, -2] if pd_below_0_24 else pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        pcodes, puniques = pser.factorize(sort=True, na_sentinel=2)
+        kcodes, kuniques = kser.factorize(na_sentinel=2)
+        self.assert_eq([0, 1, 0, 2, 2] if pd_below_0_24 else pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
+        if not pd_below_1_1_2:
+            pcodes, puniques = pser.factorize(sort=True, na_sentinel=None)
+            kcodes, kuniques = kser.factorize(na_sentinel=None)
+            self.assert_eq(pcodes.tolist(), kcodes.to_list())
+            # puniques is Index(['a', 'b', nan], dtype='object')
+            self.assert_eq(ks.Index(["a", "b", None]), kuniques)
+
+        kser = ks.Series([1, 2, np.nan, 4, 5])  # Arrow takes np.nan as null
+        kser.loc[3] = np.nan  # Spark takes np.nan as NaN
+        kcodes, kuniques = kser.factorize(na_sentinel=None)
+        pcodes, puniques = kser.to_pandas().factorize(sort=True, na_sentinel=None)
+        self.assert_eq(pcodes.tolist(), kcodes.to_list())
+        self.assert_eq(puniques, kuniques)
+
     def test_pad(self):
         pser = pd.Series([np.nan, 2, 3, 4, np.nan, 6], name="x")
         kser = ks.from_pandas(pser)
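One path the tests above do not exercise is the `compute.max_rows` guard: `factorize` collects the distinct values to the driver, so a series with too many uniques fails fast. A sketch of how that surfaces to users, assuming `set_option`/`reset_option` from `databricks.koalas.config` (real Koalas config helpers); the series and the limit are illustrative:

```python
import databricks.koalas as ks
from databricks.koalas.config import reset_option, set_option

kser = ks.Series(list(range(100)))  # 100 distinct values

set_option("compute.max_rows", 10)
try:
    kser.factorize()  # more than 10 uniques: raises the ValueError added above
except ValueError as e:
    print(e)
finally:
    reset_option("compute.max_rows")
```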
diff --git a/docs/source/reference/series.rst b/docs/source/reference/series.rst
index fe60eda15e..400ba5a945 100644
--- a/docs/source/reference/series.rst
+++ b/docs/source/reference/series.rst
@@ -213,6 +213,7 @@ Reshaping, sorting, transposing
    Series.explode
    Series.repeat
    Series.squeeze
+   Series.factorize
 
 Combining / joining / merging
 -----------------------------
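Finally, the round-trip property promised in the docstring, that ``uniques.take(codes)`` reproduces the values, can be checked by hand on a small series. A sketch assuming the data comfortably fits on the driver:

```python
import databricks.koalas as ks

kser = ks.Series(["b", "a", "c", "b"])
codes, uniques = kser.factorize()

# `uniques` is a pandas Index; `codes` is a Koalas Series of positions into it.
restored = uniques.take(codes.to_pandas())
assert list(restored) == list(kser.to_pandas())
```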