Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added combine first function #1950

Closed
90 changes: 90 additions & 0 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
from databricks.koalas.utils import (
align_diff_frames,
column_labels_level,
combine_frames,
default_session,
is_name_like_tuple,
is_name_like_value,
Expand Down Expand Up @@ -7247,6 +7248,95 @@ def append(

return cast(DataFrame, concat([self, other], ignore_index=ignore_index))

def combine_first(self, other: "DataFrame") -> "DataFrame":
itholic marked this conversation as resolved.
Show resolved Hide resolved
"""
Update null elements with value in the same location in `other`.

Combine two DataFrame objects by filling null values in one DataFrame
with non-null values from other DataFrame. The row and column indexes
of the resulting DataFrame will be the union of the two.

Parameters
----------
other : DataFrame
Provided DataFrame to use to fill null values.

Returns
-------
DataFrame

See Also
--------
DataFrame.combine : Perform series-wise operation on two DataFrames
using a given function.

Examples
--------
>>> from databricks.koalas.config import set_option, reset_option
>>> set_option("compute.ops_on_diff_frames", True)
>>> df1 = ks.DataFrame({'A': [None, 0], 'B': [None, 4]})
>>> df2 = ks.DataFrame({'A': [1, 1], 'B': [3, 3]})
>>> df1.combine_first(df2)
A B
0 1.0 3.0
1 0.0 4.0

Null values still persist if the location of that null value
does not exist in `other`

>>> df1 = ks.DataFrame({'A': [None, 0], 'B': [4, None]})
>>> df2 = ks.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2])
>>> df1.combine_first(df2)
A B C
0 NaN 4.0 NaN
1 0.0 3.0 1.0
2 NaN 3.0 1.0

>>> reset_option("compute.ops_on_diff_frames")
"""
if not isinstance(other, DataFrame):
raise ValueError("`combine_first` only allows `DataFrame` for parameter `other`")

if same_anchor(self, other):
return self

itholic marked this conversation as resolved.
Show resolved Hide resolved
update_columns = set(self._internal.column_labels).intersection(
set(other._internal.column_labels)
)
final_spark_columns = []

combined_df = combine_frames(self, other)
column_labels = combined_df._internal.column_labels
updated_sdf = combined_df._internal.spark_frame

for column_label in column_labels:
if column_label[1:] in update_columns:
if column_label[0] == "this":
column_name = self._internal.spark_column_name_for(column_label[1:])
final_spark_columns.append(column_name)

old_col = scol_for(updated_sdf, "__this_" + column_name)
new_col = scol_for(updated_sdf, ("__that_" + column_name))
cond = F.when(old_col.isNull(), new_col).otherwise(old_col).alias(column_name)
spark_columns = updated_sdf.columns
updated_sdf = updated_sdf.select(*spark_columns, cond)

else:
if column_label[0] == "this":
column_name = self._internal.spark_column_name_for(column_label[1:])
col = scol_for(updated_sdf, "__this_" + column_name)
else:
column_name = other._internal.spark_column_name_for(column_label[1:])
col = scol_for(updated_sdf, "__that_" + column_name)

spark_columns = updated_sdf.columns
updated_sdf = updated_sdf.select(*spark_columns, col.alias(column_name))
final_spark_columns.append(column_name)

updated_sdf = updated_sdf.select(*final_spark_columns)

return DataFrame(updated_sdf)

# TODO: add 'filter_func' and 'errors' parameter
def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True) -> None:
"""
Expand Down
1 change: 0 additions & 1 deletion databricks/koalas/missing/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class _MissingPandasLikeDataFrame(object):
between_time = _unsupported_function("between_time")
boxplot = _unsupported_function("boxplot")
combine = _unsupported_function("combine")
combine_first = _unsupported_function("combine_first")
compare = _unsupported_function("compare")
convert_dtypes = _unsupported_function("convert_dtypes")
corrwith = _unsupported_function("corrwith")
Expand Down
22 changes: 22 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2352,6 +2352,28 @@ def test_replace(self):
kdf.replace({("X", "B"): {0: 100, 4: 400}}), pdf.replace({("X", "B"): {0: 100, 4: 400}})
)

def test_combine_first(self):
pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]})
pdf2 = pd.DataFrame({"C": [3, 3], "B": [1, 1]})

df1 = ks.from_pandas(pdf1)
df2 = ks.from_pandas(pdf2)

with option_context("compute.ops_on_diff_frames", True):
res = df1.combine_first(df2)
self.assert_eq(res, pdf1.combine_first(pdf2))

midx_pdf1 = pd.DataFrame({"A": [None, 0], "B": [None, 4]})
midx_pdf1.columns = pd.MultiIndex.from_tuples([("A", "willow"), ("B", "pine")])
midx_pdf2 = pd.DataFrame({"B": [3, 3], "C": [1, 1]})
midx_pdf2.columns = pd.MultiIndex.from_tuples([("B", "pine"), ("C", "oak")])

midx_df1 = ks.from_pandas(midx_pdf1)
midx_df2 = ks.from_pandas(midx_pdf2)
with option_context("compute.ops_on_diff_frames", True):
midx_res = midx_df1.combine_first(midx_df2)
self.assert_eq(midx_res, midx_pdf1.combine_first(midx_pdf2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have more examples for exception cases ??


def test_update(self):
# check base function
def get_data(left_columns=None, right_columns=None):
Expand Down