added combine first function #1950

Closed

Conversation

@AishwaryaKalloli commented on Dec 3, 2020:

#1929
This is an initial commit; I want to know if I am going in the right direction.
Please let me know if I need to correct or improve anything.

set(self._internal.column_labels).intersection(set(other._internal.column_labels))
)

update_sdf = self.join(
@AishwaryaKalloli (Author) replied:

Sure, makes sense. Will update the code and let you know.

@AishwaryaKalloli (Author):

I have attached the results of the cases provided in the pandas docs.
The first and second dataframes in each image are df1 and df2, and the third is df1.combine_first(df2).
The results are correct, although I am having a hard time replacing df._internal.spark_column_names. Is it necessary to reset them? If so, can you point me to the function I could use to reset them?

[two screenshots attached]

@itholic (Contributor) left a comment:

I'll review in more detail after we discuss the comments below.

Thanks for the work on this!! :)

Comment on lines 7252 to 7253
if isinstance(other, ks.Series):
    other = other.to_frame()
@itholic (Contributor) commented on Dec 7, 2020:

Maybe we don't need to consider ks.Series, since pandas does not seem to support passing a Series for this parameter?

>>> pdf
     A    B
0  NaN  NaN
1  0.0  4.0

>>> pser
1    3
2    3
Name: B, dtype: int64

>>> pdf.combine_first(pser)
Traceback (most recent call last):
...
ValueError: Must specify axis=0 or 1

It is also specified in their docs:

    Parameters
    ----------
    other : DataFrame
        Provided DataFrame to use to fill null values.

@AishwaryaKalloli (Author) replied:

Right, I removed it.

databricks/koalas/frame.py (resolved)
set(self._internal.column_labels).intersection(set(other._internal.column_labels))
)

update_sdf = combine_frames(self, other)._internal.resolved_copy.spark_frame
Contributor:

I'd say we don't need resolved_copy here, because the DataFrame combined via combine_frames is newly created, so there is nothing to resolve.

@itholic (Contributor) commented on Dec 7, 2020:

How about creating combined_df first and getting update_sdf after that, so that we can also use other parts of combined_df, not only spark_frame?

like

        combined_df = combine_frames(self, other)
        update_sdf = combined_df._internal.resolved_copy.spark_frame
        ...
        spark_columns = combined_df._internal.spark_columns
        update_sdf.select(spark_columns, ...)


update_sdf = combine_frames(self, other)._internal.resolved_copy.spark_frame

for column_labels in update_columns:
@itholic (Contributor) commented on Dec 7, 2020:

nit: How about just column_label rather than column_labels, to avoid confusion, since it always refers to a single column label?

Comment on lines 7267 to 7270
update_sdf = update_sdf.withColumn(
    "__this_" + column_name, F.when(old_col.isNull(), new_col).otherwise(old_col)
)
update_sdf = update_sdf.drop("__that_" + column_labels[0])
@itholic (Contributor) commented on Dec 7, 2020:

Maybe we can simply select the necessary columns rather than using withColumn and drop?

like

spark_columns = combined_df._internal.spark_columns
# Add some code to exclude `column_labels[0]` here rather than use `drop`
spark_columns = ...
cond = F.when(old_col.isNull(), new_col).otherwise(old_col).alias("__this_" + column_name)

update_sdf = update_sdf.select(*spark_columns, cond)
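
A hypothetical completion of the suggestion above, as a sketch only: the `keep` list is illustrative, and `update_sdf`, `old_col`, `new_col`, and `column_name` are the variables from the surrounding diff.

# Exclude both temporary columns for this label, then re-add the coalesced one.
keep = [
    c for c in update_sdf.columns
    if c not in ("__this_" + column_name, "__that_" + column_name)
]
cond = F.when(old_col.isNull(), new_col).otherwise(old_col).alias("__this_" + column_name)
update_sdf = update_sdf.select(*keep, cond)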

Comment on lines 7272 to 7281
all_column_labels = []
for column in update_sdf.columns:
    if column.startswith("__this_") or column.startswith("__that_"):
        all_column_labels.append((column[7:],))

internal = InternalFrame(
    spark_frame=update_sdf,
    index_spark_column_names=list(self._internal.index_spark_column_names),
    column_labels=list(all_column_labels),
)
Contributor:

I think we can simply get the parameters for creating InternalFrame from combined_df, if we create it beforehand, with just a little modification.

@AishwaryaKalloli (Author) commented on Dec 7, 2020:

Based on what I understood from the comments, what I have tried to do is:

  1. Get all the common columns between self and other.
  2. Combine both frames and create an sdf from that.
  3. Create a list called final_spark_columns, which collects all the columns that have to be present in the final df.
  4. Loop through all the columns in the combined frame and, in final_spark_columns:
    • collect common column names only once, renamed as __newc_~ (~ is the original column name; newc for new column)
    • collect the other column names as they are.
  5. Finally, loop through all the column names in final_spark_columns and rename them (a rough sketch of these steps appears after the warning below).

I am not sure it is very efficient, though; I got the warning below. Let me know if it makes sense and what I can do to improve the code.
20/12/07 18:50:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
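
A rough sketch of steps 1–5, under the naming in the comment above; `final_spark_columns` and the `__newc_` prefix follow the description, and the coalescing condition is assumed to mirror pandas' null-filling semantics rather than taken from the PR:

common = set(self._internal.column_labels) & set(other._internal.column_labels)  # step 1
combined_df = combine_frames(self, other)                                        # step 2
sdf = combined_df._internal.spark_frame
final_spark_columns = []                                                         # step 3
for name in sdf.columns:                                                         # step 4
    if name.startswith("__this_") and (name[7:],) in common:
        # Common column: coalesce "this" over "that", collected once as __newc_~.
        this_col, that_col = sdf[name], sdf["__that_" + name[7:]]
        final_spark_columns.append(
            F.when(this_col.isNull(), that_col).otherwise(this_col).alias("__newc_" + name[7:])
        )
    elif name.startswith("__that_") and (name[7:],) in common:
        continue  # already handled through its "__this_" counterpart
    else:
        final_spark_columns.append(sdf[name])  # index and non-common columns kept as-is
sdf = sdf.select(*final_spark_columns)  # step 5: select and rename in one pass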

@itholic (Contributor) commented on Dec 8, 2020:

Thanks, @AishwaryaKalloli . Let me take a look at the changes soon! :)

@AishwaryaKalloli (Author):

Sure, thanks :)

@itholic (Contributor) left a comment:

I think we can refer to the implementation concept of Series.combine_first().

def combine_first(self, other) -> "Series":
    """
    Combine Series values, choosing the calling Series's values first.

    Parameters
    ----------
    other : Series
        The value(s) to be combined with the `Series`.

    Returns
    -------
    Series
        The result of combining the Series with the other object.

    See Also
    --------
    Series.combine : Perform elementwise operation on two Series
        using a given function.

    Notes
    -----
    Result index will be the union of the two indexes.

    Examples
    --------
    >>> s1 = ks.Series([1, np.nan])
    >>> s2 = ks.Series([3, 4])
    >>> with ks.option_context("compute.ops_on_diff_frames", True):
    ...     s1.combine_first(s2)
    0    1.0
    1    4.0
    dtype: float64
    """
    if not isinstance(other, ks.Series):
        raise ValueError("`combine_first` only allows `Series` for parameter `other`")
    if same_anchor(self, other):
        this = self.spark.column
        that = other.spark.column
        combined = self._kdf
    else:
        combined = combine_frames(self._kdf, other._kdf)
        this = combined["this"]._internal.spark_column_for(self._column_label)
        that = combined["that"]._internal.spark_column_for(other._column_label)
    # If `self` has missing value, use value of `other`
    cond = F.when(this.isNull(), that).otherwise(this)
    # If `self` and `other` come from same frame, the anchor should be kept
    if same_anchor(self, other):
        return self._with_new_scol(cond)
    index_scols = combined._internal.index_spark_columns
    sdf = combined._internal.spark_frame.select(
        *index_scols, cond.alias(self._internal.data_spark_column_names[0])
    ).distinct()
    internal = self._internal.with_new_sdf(sdf)
    return first_series(DataFrame(internal))

databricks/koalas/frame.py (resolved)

combined_df = combine_frames(self, other)
column_labels = combined_df._internal.column_labels
updated_sdf = combined_df._internal.resolved_copy.spark_frame
Contributor:

I think maybe we don't need resolved_copy here.

spark_columns = combined_df._internal.spark_columns

for column_label in column_labels:
    if (column_label[1],) in update_columns:
Contributor:

This doesn't seem valid for MultiIndex columns?

Could you try with the self and other below?

self = pd.DataFrame({'A': [None, 0], 'B': [None, 4]}, columns=pd.MultiIndex.from_tuples([('A', 'hello'), ('B', 'hi')]))
other = pd.DataFrame({'B': [3, 3], 'C': [1, 1]}, columns=pd.MultiIndex.from_tuples([('B', 'hi'), ('C', 'okay')]))
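
One hedged way to actually run this case against the branch (`self` and `other` are the pandas frames defined just above, converted to Koalas first):

kdf1, kdf2 = ks.from_pandas(self), ks.from_pandas(other)
with ks.option_context("compute.ops_on_diff_frames", True):
    print(kdf1.combine_first(kdf2).sort_index())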

@AishwaryaKalloli (Author) commented on Dec 18, 2020:

Right, would it make sense to use something like the following?

for column_label in column_labels:
    if column_label[1:] in update_columns:
        if column_label[0] == "this":
            column_name = self._internal.spark_column_name_for(column_label[1:])

@AishwaryaKalloli (Author):

Thanks, I'll take a look at the implementation in Series and update accordingly.

@shril (Contributor) commented on Dec 19, 2020:

You are facing a pycodestyle test failure.
Please try running the following command before committing the code:

./dev/pytest -k test_dataframe.py

The failing tests are mostly style tests; you can rectify them locally.

midx_df2 = ks.from_pandas(midx_pdf2)
with option_context("compute.ops_on_diff_frames", True):
    midx_res = midx_df1.combine_first(midx_df2)
self.assert_eq(midx_res, midx_pdf1.combine_first(midx_pdf2))
Contributor:

Can we have more examples for exception cases?
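
For example, a minimal exception-case test might look like the sketch below; the ValueError message is an assumption patterned on Series.combine_first, not the PR's actual wording:

kdf = ks.from_pandas(midx_pdf1)
with self.assertRaisesRegex(ValueError, "`combine_first` only allows `DataFrame`"):
    kdf.combine_first(ks.Series([1, 2]))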

@xinrong-meng (Contributor):

Hi @AishwaryaKalloli, since Koalas has been ported to Spark as the pandas API on Spark, would you like to migrate this PR to the Spark repository? Here is the ticket: https://issues.apache.org/jira/browse/SPARK-36399. Otherwise, I can do that for you next week.

@xinrong-meng (Contributor):

I am porting this now.

ueshin pushed a commit to apache/spark that referenced this pull request Sep 1, 2021
### What changes were proposed in this pull request?
Implement `DataFrame.combine_first`.

The PR is based on databricks/koalas#1950. Thanks AishwaryaKalloli for the prototype.

### Why are the changes needed?
Updating null elements with the value in the same location in another DataFrame is a common use case.
It is supported in pandas; we should support it as well.

### Does this PR introduce _any_ user-facing change?
Yes. `DataFrame.combine_first` can be used.

```py
>>> ps.set_option("compute.ops_on_diff_frames", True)
>>> df1 = ps.DataFrame({'A': [None, 0], 'B': [None, 4]})
>>> df2 = ps.DataFrame({'A': [1, 1], 'B': [3, 3]})
>>> df1.combine_first(df2).sort_index()
     A    B
0  1.0  3.0
1  0.0  4.0

# Null values still persist if the location of that null value does not exist in other

>>> df1 = ps.DataFrame({'A': [None, 0], 'B': [4, None]})
>>> df2 = ps.DataFrame({'B': [3, 3], 'C': [1, 1]}, index=[1, 2])
>>> df1.combine_first(df2).sort_index()
     A    B    C
0  NaN  4.0  NaN
1  0.0  3.0  1.0
2  NaN  3.0  1.0
>>> ps.reset_option("compute.ops_on_diff_frames")
```

### How was this patch tested?
Unit tests.

Closes #33714 from xinrong-databricks/df_combine_first.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
@xinrong-meng (Contributor):

Hi @AishwaryaKalloli, I would like to close this PR since it has been ported to Spark.
