Skip to content

Commit

Permalink
Implement Series.reindex()
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasG0 committed Aug 31, 2020
1 parent 4345791 commit 2ac9745
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 58 deletions.
59 changes: 17 additions & 42 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,19 @@
from functools import partial, reduce
import sys
from itertools import zip_longest
from typing import Any, Optional, List, Tuple, Union, Generic, TypeVar, Iterable, Dict, Callable
from typing import (
Any,
Optional,
List,
Tuple,
Type,
Union,
Generic,
TypeVar,
Iterable,
Dict,
Callable,
)

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -457,6 +469,10 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, copy=False):

self._internal_frame = internal

@property
def _constructor(self) -> Type["DataFrame"]:
    """Return the class used to build results anchored to this frame.

    Concrete implementation of ``Frame._constructor``; shared base-class
    helpers use it so that operations on a DataFrame yield a DataFrame.
    """
    return DataFrame

@property
def _ksers(self):
""" Return a dict of column label -> Series which anchors `self`. """
Expand Down Expand Up @@ -7813,47 +7829,6 @@ def reindex(
else:
return df

def _reindex_index(self, index, fill_value):
    """Conform this frame to a new set of index labels.

    Mimics pandas' reindex-along-the-index by right-outer-joining the
    current Spark frame against a one-column frame built from ``index``:
    labels absent from the original frame come out with nulls (or
    ``fill_value``) in every data column.

    Parameters
    ----------
    index : iterable
        The requested index labels.
    fill_value : scalar or None
        Value substituted for data columns of labels that had no matching
        row; ``None`` leaves them as nulls.

    Returns
    -------
    DataFrame
        A new DataFrame over the joined Spark frame.
    """
    # When axis is index, we can mimic pandas' by a right outer join.
    assert (
        len(self._internal.index_spark_column_names) <= 1
    ), "Index should be single column or not set."

    index_column = self._internal.index_spark_column_names[0]

    # Build a one-column Spark frame of the requested labels, aliased to
    # the current index column name so it can act as the join key.
    kser = ks.Series(list(index))
    labels = kser._internal.spark_frame.select(kser.spark.column.alias(index_column))
    frame = self._internal.resolved_copy.spark_frame.drop(NATURAL_ORDER_COLUMN_NAME)

    if fill_value is not None:
        # Rename the frame's index column so both join sides can be
        # referenced unambiguously in the null check below.
        frame_index_column = verify_temp_column_name(frame, "__frame_index_column__")
        frame = frame.withColumnRenamed(index_column, frame_index_column)

        # Carry the fill value alongside every requested label; it is only
        # substituted where the original frame had no matching row.
        temp_fill_value = verify_temp_column_name(frame, "__fill_value__")
        labels = labels.withColumn(temp_fill_value, F.lit(fill_value))

        frame_index_scol = scol_for(frame, frame_index_column)
        labels_index_scol = scol_for(labels, index_column)

        joined_df = frame.join(labels, on=[frame_index_scol == labels_index_scol], how="right")
        # A null frame-side index with a non-null labels-side index marks a
        # label that was missing from the original frame -> use fill_value.
        # The select also drops the temporary fill-value column.
        joined_df = joined_df.select(
            labels_index_scol,
            *[
                F.when(
                    frame_index_scol.isNull() & labels_index_scol.isNotNull(),
                    scol_for(joined_df, temp_fill_value),
                )
                .otherwise(scol_for(joined_df, col))
                .alias(col)
                for col in self._internal.data_spark_column_names
            ]
        )
    else:
        # No fill value: the right outer join leaves nulls for missing
        # labels, matching pandas' NaN default.
        joined_df = frame.join(labels, on=index_column, how="right")

    internal = self._internal.with_new_sdf(joined_df)
    return DataFrame(internal)

def _reindex_columns(self, columns, fill_value):
level = self._internal.column_labels_level
if level > 1:
Expand Down
49 changes: 48 additions & 1 deletion databricks/koalas/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from collections.abc import Iterable
from distutils.version import LooseVersion
from functools import reduce
from typing import Optional, Union, List
from typing import Optional, Union, List, Type
import warnings

import numpy as np
Expand All @@ -41,6 +41,7 @@
scol_for,
validate_arguments_and_invoke_function,
validate_axis,
verify_temp_column_name,
)
from databricks.koalas.window import Rolling, Expanding

Expand All @@ -50,6 +51,11 @@ class Frame(object, metaclass=ABCMeta):
The base class for both DataFrame and Series.
"""

@property
@abstractmethod
def _constructor(self) -> Type["Frame"]:
    """Return the concrete class (DataFrame or Series) used by shared
    base-class helpers to build results of the same type as ``self``."""
    pass

@property
@abstractmethod
def _internal(self) -> InternalFrame:
Expand Down Expand Up @@ -2459,6 +2465,47 @@ def ffill(self, axis=None, inplace=False, limit=None):
"""
return self.fillna(method="ffill", axis=axis, inplace=inplace, limit=limit)

def _reindex_index(self, index, fill_value):
    """Conform this Frame (DataFrame or Series) to a new set of index labels.

    Mimics pandas' reindex-along-the-index by right-outer-joining the
    current Spark frame against a one-column frame built from ``index``:
    labels absent from the original frame come out with nulls (or
    ``fill_value``) in every data column.

    Parameters
    ----------
    index : iterable
        The requested index labels.
    fill_value : scalar or None
        Value substituted for data columns of labels that had no matching
        row; ``None`` leaves them as nulls.

    Returns
    -------
    Frame
        A new object of the same concrete type as ``self`` (via
        ``self._constructor``) over the joined Spark frame.
    """
    # When axis is index, we can mimic pandas' by a right outer join.
    assert (
        len(self._internal.index_spark_column_names) <= 1
    ), "Index should be single column or not set."

    index_column = self._internal.index_spark_column_names[0]

    # Build a one-column Spark frame of the requested labels, aliased to
    # the current index column name so it can act as the join key.
    kser = ks.Series(list(index))
    labels = kser._internal.spark_frame.select(kser.spark.column.alias(index_column))
    frame = self._internal.resolved_copy.spark_frame.drop(NATURAL_ORDER_COLUMN_NAME)

    if fill_value is not None:
        # Rename the frame's index column so both join sides can be
        # referenced unambiguously in the null check below.
        frame_index_column = verify_temp_column_name(frame, "__frame_index_column__")
        frame = frame.withColumnRenamed(index_column, frame_index_column)

        # Carry the fill value alongside every requested label; it is only
        # substituted where the original frame had no matching row.
        temp_fill_value = verify_temp_column_name(frame, "__fill_value__")
        labels = labels.withColumn(temp_fill_value, F.lit(fill_value))

        frame_index_scol = scol_for(frame, frame_index_column)
        labels_index_scol = scol_for(labels, index_column)

        joined_df = frame.join(labels, on=[frame_index_scol == labels_index_scol], how="right")
        # A null frame-side index with a non-null labels-side index marks a
        # label that was missing from the original frame -> use fill_value.
        # The select also drops the temporary fill-value column.
        joined_df = joined_df.select(
            labels_index_scol,
            *[
                F.when(
                    frame_index_scol.isNull() & labels_index_scol.isNotNull(),
                    scol_for(joined_df, temp_fill_value),
                )
                .otherwise(scol_for(joined_df, col))
                .alias(col)
                for col in self._internal.data_spark_column_names
            ]
        )
    else:
        # No fill value: the right outer join leaves nulls for missing
        # labels, matching pandas' NaN default.
        joined_df = frame.join(labels, on=index_column, how="right")

    internal = self._internal.with_new_sdf(joined_df)
    return self._constructor(internal)

@property
def at(self):
return AtIndexer(self)
Expand Down
1 change: 0 additions & 1 deletion databricks/koalas/missing/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class MissingPandasLikeSeries(object):
infer_objects = _unsupported_function("infer_objects")
interpolate = _unsupported_function("interpolate")
last = _unsupported_function("last")
reindex = _unsupported_function("reindex")
reindex_like = _unsupported_function("reindex_like")
rename_axis = _unsupported_function("rename_axis")
reorder_levels = _unsupported_function("reorder_levels")
Expand Down
158 changes: 144 additions & 14 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from collections.abc import Iterable
from distutils.version import LooseVersion
from functools import partial, wraps, reduce
from typing import Any, Generic, List, Optional, Tuple, TypeVar, Union
from typing import Any, Generic, List, Optional, Tuple, Type, TypeVar, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -349,26 +349,33 @@ def __init__(self, data=None, index=None, dtype=None, name=None, copy=False, fas
super(Series, self).__init__(data)
self._column_label = index
else:
if isinstance(data, pd.Series):
assert index is None
assert dtype is None
assert name is None
assert not copy
assert not fastpath
s = data
if isinstance(data, InternalFrame):
internal = data
else:
s = pd.Series(
data=data, index=index, dtype=dtype, name=name, copy=copy, fastpath=fastpath
)
internal = InternalFrame.from_pandas(pd.DataFrame(s))
if s.name is None:
internal = internal.copy(column_labels=[None])
if isinstance(data, pd.Series):
assert index is None
assert dtype is None
assert name is None
assert not copy
assert not fastpath
s = data
else:
s = pd.Series(
data=data, index=index, dtype=dtype, name=name, copy=copy, fastpath=fastpath
)
internal = InternalFrame.from_pandas(pd.DataFrame(s))
if s.name is None:
internal = internal.copy(column_labels=[None])
anchor = DataFrame(internal)

super(Series, self).__init__(anchor)
self._column_label = anchor._internal.column_labels[0]
anchor._kseries = {self._column_label: self}

@property
def _constructor(self) -> Type["Series"]:
    """Return the class used to build results anchored to this series.

    Concrete implementation of ``Frame._constructor``; shared base-class
    helpers use it so that operations on a Series yield a Series.
    """
    return Series

@property
def _internal(self) -> InternalFrame:
return self._kdf._internal.select_column(self._column_label)
Expand Down Expand Up @@ -1540,6 +1547,129 @@ def drop_duplicates(self, keep="first", inplace=False):
else:
return first_series(kdf)

def reindex(
    self,
    index: Optional[Any] = None,
    copy: Optional[bool] = True,
    fill_value: Optional[Any] = None,
) -> "Series":
    """
    Conform Series to new index with optional filling logic, placing
    NA/NaN in locations having no value in the previous index. A new object
    is produced unless the new index is equivalent to the current one and
    ``copy=False``.

    Parameters
    ----------
    index: array-like, optional
        New labels / index to conform to, should be specified using keywords.
        Preferably an Index object to avoid duplicating data
    copy : bool, default True
        Return a new object, even if the passed indexes are the same.
    fill_value : scalar, default np.NaN
        Value to use for missing values. Defaults to NaN, but can be any
        "compatible" value.

    Returns
    -------
    Series with changed index.

    See Also
    --------
    Series.reset_index : Remove row labels or move them to new columns.

    Examples
    --------

    Create a series with some fictional data.

    >>> index = ['Firefox', 'Chrome', 'Safari', 'IE10', 'Konqueror']
    >>> ser = ks.Series([200, 200, 404, 404, 301],
    ...                 index=index, name='http_status')
    >>> ser
    Firefox      200
    Chrome       200
    Safari       404
    IE10         404
    Konqueror    301
    Name: http_status, dtype: int64

    Create a new index and reindex the dataframe. By default
    values in the new index that do not have corresponding
    records in the dataframe are assigned ``NaN``.

    >>> new_index = ['Safari', 'Iceweasel', 'Comodo Dragon', 'IE10',
    ...              'Chrome']
    >>> ser.reindex(new_index).sort_index()
    Chrome           200.0
    Comodo Dragon      NaN
    IE10             404.0
    Iceweasel          NaN
    Safari           404.0
    Name: http_status, dtype: float64

    We can fill in the missing values by passing a value to
    the keyword ``fill_value``.

    >>> ser.reindex(new_index, fill_value=0, copy=False).sort_index()
    Chrome           200
    Comodo Dragon      0
    IE10             404
    Iceweasel          0
    Safari           404
    Name: http_status, dtype: int64

    To further illustrate the filling functionality in
    ``reindex``, we will create a series with a
    monotonically increasing index (for example, a sequence
    of dates).

    >>> date_index = pd.date_range('1/1/2010', periods=6, freq='D')
    >>> ser2 = ks.Series([100, 101, np.nan, 100, 89, 88],
    ...                  name='prices', index=date_index)
    >>> ser2.sort_index()
    2010-01-01    100.0
    2010-01-02    101.0
    2010-01-03      NaN
    2010-01-04    100.0
    2010-01-05     89.0
    2010-01-06     88.0
    Name: prices, dtype: float64

    Suppose we decide to expand the dataframe to cover a wider
    date range.

    >>> date_index2 = pd.date_range('12/29/2009', periods=10, freq='D')
    >>> ser2.reindex(date_index2).sort_index()
    2009-12-29      NaN
    2009-12-30      NaN
    2009-12-31      NaN
    2010-01-01    100.0
    2010-01-02    101.0
    2010-01-03    NaN
    2010-01-04    100.0
    2010-01-05     89.0
    2010-01-06     88.0
    2010-01-07      NaN
    Name: prices, dtype: float64
    """
    # pandas raises the same TypeError for a scalar/non-collection index.
    if index is not None and not is_list_like(index):
        raise TypeError(
            "Index must be called with a collection of some kind, "
            "%s was passed" % type(index)
        )

    ser = self

    if index is not None:
        # Delegate to the shared Frame._reindex_index join-based helper.
        ser = ser._reindex_index(index, fill_value)

    # Copy: pandas guarantees a new object when copy=True, even if the
    # requested index matched and no reindex work was done.
    if copy and ser is self:
        return ser.copy()
    else:
        return ser

def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
"""Fill NA/NaN values.
Expand Down
23 changes: 23 additions & 0 deletions databricks/koalas/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,29 @@ def test_drop_duplicates(self):
self.assert_eq(kser.sort_index(), pser.sort_index())
self.assert_eq(kdf, pdf)

def test_reindex(self):
    labels = ["A", "B", "C", "D", "E"]
    pser = pd.Series([1.0, 2.0, 3.0, 4.0, None], index=labels, name="x")
    kser = ks.from_pandas(pser)

    # Round-trip sanity check before exercising reindex.
    self.assert_eq(pser, kser)

    # Subset of existing labels, then a mix of existing and new labels;
    # both must agree with pandas (NaN for the missing ones).
    for new_labels in (["A", "B"], ["A", "B", "2", "3"]):
        self.assert_eq(
            pser.reindex(new_labels).sort_index(), kser.reindex(new_labels).sort_index(),
        )

    # fill_value replaces the NaNs that missing labels would produce.
    self.assert_eq(
        pser.reindex(["A", "E", "2"], fill_value=0).sort_index(),
        kser.reindex(["A", "E", "2"], fill_value=0).sort_index(),
    )

    # A non-collection index is rejected, matching pandas.
    self.assertRaises(TypeError, lambda: kser.reindex(index=123))

def test_fillna(self):
pdf = pd.DataFrame({"x": [np.nan, 2, 3, 4, np.nan, 6], "y": [np.nan, 2, 3, 4, np.nan, 6]})
kdf = ks.from_pandas(pdf)
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Reindexing / Selection / Label manipulation
Series.idxmin
Series.isin
Series.rename
Series.reindex
Series.reset_index
Series.sample
Series.take
Expand Down

0 comments on commit 2ac9745

Please sign in to comment.