diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index ff02bfc1f0..5d31702f56 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -85,6 +85,7 @@ from databricks.koalas.utils import ( align_diff_frames, column_labels_level, + combine_frames, default_session, is_name_like_tuple, is_name_like_value, @@ -7247,6 +7248,95 @@ def append( return cast(DataFrame, concat([self, other], ignore_index=ignore_index)) + def combine_first(self, other: "DataFrame") -> "DataFrame": + """ + Update null elements with value in the same location in `other`. + + Combine two DataFrame objects by filling null values in one DataFrame + with non-null values from other DataFrame. The row and column indexes + of the resulting DataFrame will be the union of the two. + + Parameters + ---------- + other : DataFrame + Provided DataFrame to use to fill null values. + + Returns + ------- + DataFrame + + See Also + -------- + DataFrame.combine : Perform series-wise operation on two DataFrames + using a given function. + + Examples + -------- + >>> from databricks.koalas.config import set_option, reset_option + >>> set_option("compute.ops_on_diff_frames", True) + >>> df1 = ks.DataFrame({'A': [None, 0], 'B': [None, 4]}) + >>> df2 = ks.DataFrame({'A': [1, 1], 'B': [3, 3]}) + >>> df1.combine_first(df2) + A B + 0 1.0 3.0 + 1 0.0 4.0 + + Null values still persist if the location of that null value + does not exist in `other` + + >>> df1 = ks.DataFrame({'A': [None, 0], 'B': [4, None]}) + >>> df2 = ks.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2]) + >>> df1.combine_first(df2) + A B C + 0 NaN 4.0 NaN + 1 0.0 3.0 1.0 + 2 NaN 3.0 1.0 + + >>> reset_option("compute.ops_on_diff_frames") + """ + if not isinstance(other, DataFrame): + raise ValueError("`combine_first` only allows `DataFrame` for parameter `other`") + + if same_anchor(self, other): + return self + + update_columns = set(self._internal.column_labels).intersection( + set(other._internal.column_labels) + ) + final_spark_columns = [] + + combined_df = combine_frames(self, other) + column_labels = combined_df._internal.column_labels + updated_sdf = combined_df._internal.spark_frame + + for column_label in column_labels: + if column_label[1:] in update_columns: + if column_label[0] == "this": + column_name = self._internal.spark_column_name_for(column_label[1:]) + final_spark_columns.append(column_name) + + old_col = scol_for(updated_sdf, "__this_" + column_name) + new_col = scol_for(updated_sdf, ("__that_" + column_name)) + cond = F.when(old_col.isNull(), new_col).otherwise(old_col).alias(column_name) + spark_columns = updated_sdf.columns + updated_sdf = updated_sdf.select(*spark_columns, cond) + + else: + if column_label[0] == "this": + column_name = self._internal.spark_column_name_for(column_label[1:]) + col = scol_for(updated_sdf, "__this_" + column_name) + else: + column_name = other._internal.spark_column_name_for(column_label[1:]) + col = scol_for(updated_sdf, "__that_" + column_name) + + spark_columns = updated_sdf.columns + updated_sdf = updated_sdf.select(*spark_columns, col.alias(column_name)) + final_spark_columns.append(column_name) + + updated_sdf = updated_sdf.select(*final_spark_columns) + + return DataFrame(updated_sdf) + # TODO: add 'filter_func' and 'errors' parameter def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True) -> None: """ diff --git a/databricks/koalas/missing/frame.py b/databricks/koalas/missing/frame.py index 8425ce18f3..68a45e2a22 100644 --- a/databricks/koalas/missing/frame.py +++ b/databricks/koalas/missing/frame.py @@ -42,7 +42,6 @@ class _MissingPandasLikeDataFrame(object): between_time = _unsupported_function("between_time") boxplot = _unsupported_function("boxplot") combine = _unsupported_function("combine") - combine_first = _unsupported_function("combine_first") compare = _unsupported_function("compare") convert_dtypes = _unsupported_function("convert_dtypes") corrwith = _unsupported_function("corrwith") diff --git a/databricks/koalas/tests/test_dataframe.py b/databricks/koalas/tests/test_dataframe.py index 351f98dbb2..89ce9a3858 100644 --- a/databricks/koalas/tests/test_dataframe.py +++ b/databricks/koalas/tests/test_dataframe.py @@ -2352,6 +2352,28 @@ def test_replace(self): kdf.replace({("X", "B"): {0: 100, 4: 400}}), pdf.replace({("X", "B"): {0: 100, 4: 400}}) ) + def test_combine_first(self): + pdf1 = pd.DataFrame({"A": [None, 0], "B": [4, None]}) + pdf2 = pd.DataFrame({"C": [3, 3], "B": [1, 1]}) + + df1 = ks.from_pandas(pdf1) + df2 = ks.from_pandas(pdf2) + + with option_context("compute.ops_on_diff_frames", True): + res = df1.combine_first(df2) + self.assert_eq(res, pdf1.combine_first(pdf2)) + + midx_pdf1 = pd.DataFrame({"A": [None, 0], "B": [None, 4]}) + midx_pdf1.columns = pd.MultiIndex.from_tuples([("A", "willow"), ("B", "pine")]) + midx_pdf2 = pd.DataFrame({"B": [3, 3], "C": [1, 1]}) + midx_pdf2.columns = pd.MultiIndex.from_tuples([("B", "pine"), ("C", "oak")]) + + midx_df1 = ks.from_pandas(midx_pdf1) + midx_df2 = ks.from_pandas(midx_pdf2) + with option_context("compute.ops_on_diff_frames", True): + midx_res = midx_df1.combine_first(midx_df2) + self.assert_eq(midx_res, midx_pdf1.combine_first(midx_pdf2)) + def test_update(self): # check base function def get_data(left_columns=None, right_columns=None):