diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py
index e3222b3e0f82..10671a4a31bb 100644
--- a/python/ray/dataframe/dataframe.py
+++ b/python/ray/dataframe/dataframe.py
@@ -4407,9 +4407,34 @@ def unstack(self, level=-1, fill_value=None):
 
     def update(self, other, join='left', overwrite=True, filter_func=None,
                raise_conflict=False):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Modify DataFrame in place using non-NA values from other.
+
+        Args:
+            other: DataFrame, or object coercible into a DataFrame
+            join: {'left'}, default 'left'
+            overwrite: If True then overwrite values for common keys in frame
+            filter_func: Can choose to replace values other than NA.
+            raise_conflict: If True, will raise an error if the DataFrame and
+                other both contain data in the same place.
+
+        Returns:
+            None
+        """
+        if raise_conflict:
+            raise NotImplementedError(
+                "raise_conflict parameter not yet supported. "
+                "To contribute to Pandas on Ray, please visit "
+                "github.com/ray-project/ray.")
+
+        if not isinstance(other, DataFrame):
+            other = DataFrame(other)
+
+        def update_helper(x, y):
+            x.update(y, join, overwrite, filter_func, False)
+            return x
+
+        self._inter_df_op_helper(update_helper, other, join, 0, None,
+                                 inplace=True)
 
     def var(self, axis=None, skipna=None, level=None, ddof=1,
             numeric_only=None, **kwargs):
@@ -4963,36 +4988,40 @@ def _operator_helper(self, func, other, axis, level, *args):
         if isinstance(other, DataFrame):
             return self._inter_df_op_helper(
                 lambda x, y: func(x, y, axis, level, *args),
-                other, axis, level)
+                other, "outer", axis, level)
         else:
             return self._single_df_op_helper(
                 lambda df: func(df, other, axis, level, *args),
                 other, axis, level)
 
-    def _inter_df_op_helper(self, func, other, axis, level):
+    def _inter_df_op_helper(self, func, other, how, axis, level,
+                            inplace=False):
         if level is not None:
             raise NotImplementedError("Mutlilevel index not yet supported "
                                       "in Pandas on Ray")
         axis = pd.DataFrame()._get_axis_number(axis)
 
-        # Adding two DataFrames causes an outer join.
-        if isinstance(other, DataFrame):
-            new_column_index = self.columns.join(other.columns, how="outer")
-            new_index = self.index.join(other.index, how="outer")
-            copartitions = self._copartition(other, new_index)
-
-            new_blocks = \
-                np.array([_co_op_helper._submit(
-                    args=tuple([func, self.columns, other.columns,
-                                len(part[0]), None] +
-                               np.concatenate(part).tolist()),
-                    num_return_vals=len(part[0]))
-                    for part in copartitions])
+        new_column_index = self.columns.join(other.columns, how=how)
+        new_index = self.index.join(other.index, how=how)
+        copartitions = self._copartition(other, new_index)
 
+        new_blocks = \
+            np.array([_co_op_helper._submit(
+                args=tuple([func, self.columns, other.columns,
+                            len(part[0]), None] +
+                           np.concatenate(part).tolist()),
+                num_return_vals=len(part[0]))
+                for part in copartitions])
+
+        if not inplace:
             # TODO join the Index Metadata objects together for performance.
             return DataFrame(block_partitions=new_blocks,
                              columns=new_column_index,
                              index=new_index)
+        else:
+            self._update_inplace(block_partitions=new_blocks,
+                                 columns=new_column_index,
+                                 index=new_index)
 
     def _single_df_op_helper(self, func, other, axis, level):
         if level is not None:
diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py
index 7660b5366447..3037e609a6c1 100644
--- a/python/ray/dataframe/test/test_dataframe.py
+++ b/python/ray/dataframe/test/test_dataframe.py
@@ -3009,10 +3009,21 @@ def test_unstack():
 
 
 def test_update():
-    ray_df = create_test_dataframe()
+    df = rdf.DataFrame([[1.5, np.nan, 3.],
+                        [1.5, np.nan, 3.],
+                        [1.5, np.nan, 3],
+                        [1.5, np.nan, 3]])
 
-    with pytest.raises(NotImplementedError):
-        ray_df.update(None)
+    other = rdf.DataFrame([[3.6, 2., np.nan],
+                           [np.nan, np.nan, 7]], index=[1, 3])
+
+    df.update(other)
+
+    expected = rdf.DataFrame([[1.5, np.nan, 3],
+                              [3.6, 2, 3],
+                              [1.5, np.nan, 3],
+                              [1.5, np.nan, 7.]])
+    assert ray_df_equals(df, expected)
 
 
 @pytest.fixture