Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Index.map functionality #2136

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5ed61f2
Initial Index.map impl
awdavidson Apr 1, 2021
e2ac4f0
Initial Index.map impl
awdavidson Apr 1, 2021
ae9c3f3
Reformat
awdavidson Apr 1, 2021
1a55284
Add pd.Series compatibility
awdavidson Apr 2, 2021
13706cc
Avoid collects
awdavidson Apr 2, 2021
f48da2e
Update impl
awdavidson Apr 4, 2021
1f794f7
Clean up impl and add docs
awdavidson Apr 5, 2021
18e37c0
reformat
awdavidson Apr 5, 2021
3499aa0
reformat
awdavidson Apr 5, 2021
a949400
reformat
awdavidson Apr 5, 2021
af72e24
Reformat
awdavidson Apr 6, 2021
97e03d3
Reformat
awdavidson Apr 6, 2021
7c1c678
Fix comment
awdavidson Apr 6, 2021
ced7d97
Remove unused import
awdavidson Apr 6, 2021
694650a
Update
awdavidson Apr 6, 2021
f845001
Merge branch 'master' of github.com:databricks/koalas into feature/im…
awdavidson Apr 7, 2021
136bd4c
Add categorical mapping
awdavidson Apr 10, 2021
06b250a
Reformat
awdavidson Apr 11, 2021
0c74b95
Remove print statement
awdavidson Apr 11, 2021
0bd6b3e
Remove unused import
awdavidson Apr 11, 2021
e52a4ca
Final tweaks
awdavidson Apr 11, 2021
289b573
Remove unused import
awdavidson Apr 11, 2021
f0697ee
minor cast tweaks
awdavidson Apr 11, 2021
8d206d9
Reformat
awdavidson Apr 11, 2021
ea8fc7f
Fix docstring
awdavidson Apr 12, 2021
7f78833
Fix docstring
awdavidson Apr 12, 2021
a6cd83e
Fix docstring
awdavidson Apr 12, 2021
b57224d
Fix docstring
awdavidson Apr 12, 2021
fe338c8
Fix docstring
awdavidson Apr 12, 2021
491a57a
reformat
awdavidson Apr 12, 2021
3568d0a
reformat
awdavidson Apr 12, 2021
e7957bd
fix docstring
awdavidson Apr 12, 2021
751e77d
Fix docstring
awdavidson Apr 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion databricks/koalas/indexes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#

from functools import partial
from typing import Any, List, Optional, Tuple, Union
from typing import Any, Callable, List, Optional, Tuple, Union
import warnings

import pandas as pd
Expand Down Expand Up @@ -507,6 +507,29 @@ def to_numpy(self, dtype=None, copy=False) -> np.ndarray:
result = result.copy()
return result

def map(
    self,
    mapper: Union[dict, Callable[[Any], Any], pd.Series],
    return_type: ks.typedef.Dtype = str,
    na_action: Any = None,
) -> "Index":
    """
    Map values using an input mapping or function.

    Parameters
    ----------
    mapper : dict, function, or pd.Series
        Mapping correspondence. Keys (or the pd.Series index) are matched
        against the current index values; unmatched values fall back to the
        original value cast to ``return_type``.  A ks.Series mapper is not
        yet supported and raises ``NotImplementedError``.
    return_type : Dtype, default str
        Data type of the mapped values.  This is required because the
        transformation is executed by a pandas UDF, whose Spark return type
        must be declared up front.
        NOTE(review): the default ``str`` is not strictly a ``Dtype``
        (``Dtype = Union[np.dtype, ExtensionDtype]``) — kept for
        backward compatibility; confirm before tightening the annotation.
    na_action : {None, 'ignore'}, default None
        Only ``None`` is currently supported; any other value raises
        ``NotImplementedError``.

    Returns
    -------
    Index
        Index with values transformed by ``mapper``.

    Examples
    --------
    >>> kidx = ks.Index([1, 2, 3])
    >>> kidx.map({1: "one", 2: "two", 3: "three"})
    Index(['one', 'two', 'three'], dtype='object')
    """
    # Imported locally to avoid a circular import between indexes.base
    # and indexes.extension.
    from databricks.koalas.indexes.extension import MapExtension

    return MapExtension(index=self, na_action=na_action).map(mapper, return_type)

@property
def values(self) -> np.ndarray:
"""
Expand Down
144 changes: 144 additions & 0 deletions databricks/koalas/indexes/extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#
# Copyright (C) 2019 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Any, Callable, Union

import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

import databricks.koalas as ks
from databricks.koalas.indexes.base import Index
from databricks.koalas.internal import SPARK_DEFAULT_INDEX_NAME
from databricks.koalas.typedef.typehints import Dtype, as_spark_type


# TODO: Implement na_action functionality similar to pandas.
# NB: Passing return_type into the class causes serialization errors;
# instead it is passed at method level.
class MapExtension:
    """
    Executes ``Index.map`` by translating the given mapper into a pandas UDF,
    so the transformation runs on executors without collecting to the driver.

    Parameters
    ----------
    index : Index
        The Koalas Index whose values are to be mapped.
    na_action : Any
        Must be ``None``; anything else raises ``NotImplementedError``.
    """

    def __init__(self, index, na_action: Any):
        self._index = index
        if na_action is not None:
            raise NotImplementedError("Currently do not support na_action functionality")
        else:
            self._na_action = na_action

    def map(
        self, mapper: Union[dict, Callable[[Any], Any], pd.Series], return_type: Dtype
    ) -> Index:
        """
        Single callable/entry point to map Index values.

        Dispatches on the mapper's type to the matching private helper.

        Parameters
        ----------
        mapper: dict, function or pd.Series
        return_type: Dtype

        Returns
        -------
        ks.Index
        """
        if isinstance(mapper, dict):
            idx = self._map_dict(mapper, return_type)
        elif isinstance(mapper, pd.Series):
            idx = self._map_series(mapper, return_type)
        elif isinstance(mapper, ks.Series):
            raise NotImplementedError("Currently do not support input of ks.Series in Index.map")
        else:
            idx = self._map_lambda(mapper, return_type)
        return idx

    def _map_dict(self, mapper: dict, return_type: Dtype) -> Index:
        """
        Map an Index when the mapper argument is a dict.

        Parameters
        ----------
        mapper: dict
            Key-value pairs that are used to instruct mapping from index value to new value
        return_type: Dtype
            Data type of returned value

        Returns
        -------
        ks.Index

        .. note:: Default return value for missing elements is the index's original value

        """

        @pandas_udf(as_spark_type(return_type), PandasUDFType.SCALAR)
        def pyspark_mapper(col):
            # Missing keys fall back to the original value cast to return_type.
            return col.apply(lambda i: mapper.get(i, return_type(i)))

        return self._index._with_new_scol(pyspark_mapper(SPARK_DEFAULT_INDEX_NAME))

    def _map_series(self, mapper: pd.Series, return_type: Dtype) -> Index:
        """
        Map an Index when the mapper argument is a pandas.Series.

        Parameters
        ----------
        mapper: pandas.Series
            Series of (index, value) that is used to instruct mapping from index value to new value
        return_type: Dtype
            Data type of returned value

        Returns
        -------
        ks.Index

        .. note:: Default return value for missing elements is the index's original value

        """
        # TODO: clean up, maybe move somewhere else
        def _get_or_else(i):
            try:
                return mapper.loc[i]
            except KeyError:
                # Label absent from the mapper: keep the original value,
                # cast to the declared return type.
                return return_type(i)

        @pandas_udf(as_spark_type(return_type), PandasUDFType.SCALAR)
        def pyspark_mapper(col):
            return col.apply(_get_or_else)

        return self._index._with_new_scol(pyspark_mapper(SPARK_DEFAULT_INDEX_NAME))

    def _map_lambda(self, mapper: Callable[[Any], Any], return_type: Dtype) -> Index:
        """
        Map an Index when the mapper argument is a generic callable.

        Parameters
        ----------
        mapper: Callable[[Any], Any]
            Generic lambda function that is applied to index
        return_type: Dtype
            Data type of returned value

        Returns
        -------
        ks.Index

        """

        @pandas_udf(as_spark_type(return_type), PandasUDFType.SCALAR)
        def pyspark_mapper(col):
            return col.apply(mapper)

        return self._index._with_new_scol(pyspark_mapper(SPARK_DEFAULT_INDEX_NAME))
3 changes: 0 additions & 3 deletions databricks/koalas/missing/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class MissingPandasLikeIndex(object):
is_ = _unsupported_function("is_")
is_lexsorted_for_tuple = _unsupported_function("is_lexsorted_for_tuple")
join = _unsupported_function("join")
map = _unsupported_function("map")
putmask = _unsupported_function("putmask")
ravel = _unsupported_function("ravel")
reindex = _unsupported_function("reindex")
Expand Down Expand Up @@ -132,7 +131,6 @@ class MissingPandasLikeCategoricalIndex(MissingPandasLikeIndex):
set_categories = _unsupported_function("set_categories", cls="CategoricalIndex")
as_ordered = _unsupported_function("as_ordered", cls="CategoricalIndex")
as_unordered = _unsupported_function("as_unordered", cls="CategoricalIndex")
map = _unsupported_function("map", cls="CategoricalIndex")


class MissingPandasLikeMultiIndex(object):
Expand Down Expand Up @@ -161,7 +159,6 @@ class MissingPandasLikeMultiIndex(object):
is_lexsorted = _unsupported_function("is_lexsorted")
is_lexsorted_for_tuple = _unsupported_function("is_lexsorted_for_tuple")
join = _unsupported_function("join")
map = _unsupported_function("map")
putmask = _unsupported_function("putmask")
ravel = _unsupported_function("ravel")
reindex = _unsupported_function("reindex")
Expand Down
41 changes: 41 additions & 0 deletions databricks/koalas/tests/indexes/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import numpy as np
import pandas as pd
from pandas.tseries.offsets import DateOffset
import pyspark

import databricks.koalas as ks
Expand Down Expand Up @@ -65,6 +66,46 @@ def test_index_basic(self):
self.assert_eq(kdf.index, pdf.index)
self.assert_eq(type(kdf.index).__name__, type(pdf.index).__name__)

def test_map(self):
    """Exercise Index.map with pd.Series, dict, and callable mappers.

    Missing keys/labels are expected to fall back to the original index
    value (stringified by the default return_type=str).
    """
    kser = ks.Series([1, 2, 3], index=[1, 2, 3])

    # A ks.Series mapper is explicitly unsupported and must raise.
    with self.assertRaisesRegex(
        NotImplementedError, "Currently do not support input of ks.Series in Index.map"
    ):
        kser.index.map(ks.Series(["one", "two", "three"], index=[1, 2, 3]))

    # Apply series
    self.assert_eq(
        kser.index.map(pd.Series(["one", "2", "three"], index=[1, 2, 3])),
        ks.Index(["one", "2", "three"]),
    )
    # Partial mapper: index value 3 has no label, so it stays as "3".
    self.assert_eq(
        kser.index.map(pd.Series(["one", "2"], index=[1, 2])), ks.Index(["one", "2", "3"]),
    )

    # Apply dict
    self.assert_eq(
        kser.index.map({1: "one", 2: "two", 3: "three"}), ks.Index(["one", "two", "three"])
    )
    self.assert_eq(kser.index.map({1: "one", 2: "two"}), ks.Index(["one", "two", "3"]))
    # Non-default return_type: unmatched value 3 is cast via int().
    self.assert_eq(kser.index.map({1: 10, 2: 20}, return_type=int), ks.Index([10, 20, 3]))

    # Apply lambda
    # NOTE: the lambda parameter shadows the builtin `id` (harmless here).
    self.assert_eq(kser.index.map(lambda id: id + 1, return_type=int), ks.Index([2, 3, 4]))
    self.assert_eq(
        kser.index.map(lambda id: id + 1.1, return_type=float), ks.Index([2.1, 3.1, 4.1])
    )
    self.assert_eq(
        kser.index.map(lambda id: "{id} + 1".format(id=id), str),
        ks.Index(["1 + 1", "2 + 1", "3 + 1"]),
    )

    # Datetime index: shifting every value by one day via DateOffset.
    kser = ks.Series([1, 2, 3, 4], index=pd.date_range("2018-04-09", periods=4, freq="2D"))
    self.assert_eq(
        kser.index.map(lambda id: id + DateOffset(days=1), return_type=datetime),
        ks.Series([1, 2, 3, 4], index=pd.date_range("2018-04-10", periods=4, freq="2D")).index,
    )
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add tests with CategoricalIndex?
You can put the tests in test_categorical.py.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ueshin I am looking at adding tests to CategoricalIndex but have some unexpected behaviour. I wonder whether you can help explain.

The current implementation for Index.map leverages _with_new_scol to avoid collect anything to the driver. When you have a CategoricalIndex such as ks.CategoricalIndex(["a", "b", "c"]) the returned spark_frame is

+-----------------+-----------------+
|__index_level_0__|__natural_order__|
+-----------------+-----------------+
|                0|                0|
|                1|       8589934592|
|                2|      17179869184|
+-----------------+-----------------+

I was expecting

+-----------------+-----------------+
|__index_level_0__|__natural_order__|
+-----------------+-----------------+
|                a|                0|
|                b|       8589934592|
|                c|      17179869184|
+-----------------+-----------------+

Is my expectation incorrect?

This seems to be caused by https://github.com/databricks/koalas/blob/master/databricks/koalas/frame.py#L510

If you have a pdf = pd.DataFrame(index=pd.CategoricalIndex(["a", "b", "c"])) InternalFrame.from_pandas(pdf).spark_frame returns

+-----------------+-----------------+
|__index_level_0__|__natural_order__|
+-----------------+-----------------+
|                0|                0|
|                1|       8589934592|
|                2|      17179869184|
+-----------------+-----------------+

Where as if you have pdf = pd.DataFrame(index=pd.Index(["a", "b", "c"]) InternalFrame.from_pandas(pdf).spark_frame returns

+-----------------+-----------------+
|__index_level_0__|__natural_order__|
+-----------------+-----------------+
|                a|                0|
|                b|       8589934592|
|                c|      17179869184|
+-----------------+-----------------+

Apologies if this is a silly question. Thanks in advance

Copy link
Collaborator

@ueshin ueshin Apr 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior is expected: so far Koalas manages only a Categorical's codes in Spark, while its categories live in the metadata (index_dtypes or data_dtypes).


def test_index_from_series(self):
pser = pd.Series([1, 2, 3], name="a", index=[10, 20, 30])
kser = ks.from_pandas(pser)
Expand Down
89 changes: 89 additions & 0 deletions databricks/koalas/tests/indexes/test_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Copyright (C) 2019 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from datetime import datetime

import pandas as pd
from pandas.tseries.offsets import DateOffset

import databricks.koalas as ks
from databricks.koalas.indexes.extension import MapExtension
from databricks.koalas.testing.utils import ReusedSQLTestCase, TestUtils


class MapExtensionTest(ReusedSQLTestCase, TestUtils):
    """Unit tests for MapExtension's dispatch and per-mapper-type helpers."""

    @property
    def kidx(self):
        # Fresh small integer Index for each test.
        return ks.Index([1, 2, 3])

    def test_na_action(self):
        # Any non-None na_action is rejected at construction time.
        with self.assertRaisesRegex(
            NotImplementedError, "Currently do not support na_action functionality"
        ):
            MapExtension(self.kidx, "ignore")

    def test_map_dict(self):
        self.assert_eq(
            MapExtension(self.kidx, None)._map_dict({1: "one", 2: "two", 3: "three"}, str),
            ks.Index(["one", "two", "three"]),
        )
        # Missing key 3 falls back to the original value cast to str.
        self.assert_eq(
            MapExtension(self.kidx, None)._map_dict({1: "one", 2: "two"}, str),
            ks.Index(["one", "two", "3"]),
        )
        # Missing key with int return_type keeps 3 as an int.
        self.assert_eq(
            MapExtension(self.kidx, None)._map_dict({1: 10, 2: 20}, int), ks.Index([10, 20, 3])
        )

    def test_map_series(self):
        self.assert_eq(
            MapExtension(self.kidx, None)._map_series(
                pd.Series(["one", "2", "three"], index=[1, 2, 3]), str
            ),
            ks.Index(["one", "2", "three"]),
        )
        # Label 3 absent from the mapper Series -> original value as "3".
        self.assert_eq(
            MapExtension(self.kidx, None)._map_series(pd.Series(["one", "2"], index=[1, 2]), str),
            ks.Index(["one", "2", "3"]),
        )

    def test_map_lambda(self):
        # NOTE: the lambda parameter shadows the builtin `id` (harmless here).
        self.assert_eq(
            MapExtension(self.kidx, None)._map_lambda(lambda id: id + 1, int), ks.Index([2, 3, 4])
        )
        self.assert_eq(
            MapExtension(self.kidx, None)._map_lambda(lambda id: id + 1.1, float),
            ks.Index([2.1, 3.1, 4.1]),
        )
        self.assert_eq(
            MapExtension(self.kidx, None)._map_lambda(lambda id: "{id} + 1".format(id=id), str),
            ks.Index(["1 + 1", "2 + 1", "3 + 1"]),
        )
        # Datetime index shifted by one day via DateOffset.
        kser = ks.Series([1, 2, 3, 4], index=pd.date_range("2018-04-09", periods=4, freq="2D"))
        self.assert_eq(
            MapExtension(kser.index, None)._map_lambda(
                lambda id: id + DateOffset(days=1), datetime
            ),
            ks.Series([1, 2, 3, 4], index=pd.date_range("2018-04-10", periods=4, freq="2D")).index,
        )

    def test_map(self):
        # The public dispatcher rejects ks.Series mappers.
        with self.assertRaisesRegex(
            NotImplementedError, "Currently do not support input of ks.Series in Index.map"
        ):
            MapExtension(self.kidx, None).map(
                ks.Series(["one", "two", "three"], index=[1, 2, 3]), str
            )