Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 47 additions & 18 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4407,9 +4407,34 @@ def unstack(self, level=-1, fill_value=None):

def update(self, other, join='left', overwrite=True, filter_func=None,
raise_conflict=False):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Modify DataFrame in place using non-NA values from other.

Args:
other: DataFrame, or object coercible into a DataFrame
join: {'left'}, default 'left'
overwrite: If True then overwrite values for common keys in frame
filter_func: Can choose to replace values other than NA.
raise_conflict: If True, will raise an error if the DataFrame and
other both contain data in the same place.

Returns:
None
"""
if raise_conflict:
raise NotImplementedError(
"raise_conflict parameter not yet supported. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

if not isinstance(other, DataFrame):
other = DataFrame(other)

def update_helper(x, y):
x.update(y, join, overwrite, filter_func, False)
return x

self._inter_df_op_helper(update_helper, other, join, 0, None,
inplace=True)

def var(self, axis=None, skipna=None, level=None, ddof=1,
numeric_only=None, **kwargs):
Expand Down Expand Up @@ -4963,36 +4988,40 @@ def _operator_helper(self, func, other, axis, level, *args):
if isinstance(other, DataFrame):
return self._inter_df_op_helper(
lambda x, y: func(x, y, axis, level, *args),
other, axis, level)
other, "outer", axis, level)
else:
return self._single_df_op_helper(
lambda df: func(df, other, axis, level, *args),
other, axis, level)

def _inter_df_op_helper(self, func, other, axis, level):
def _inter_df_op_helper(self, func, other, how, axis, level,
inplace=False):
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")
axis = pd.DataFrame()._get_axis_number(axis)

# Adding two DataFrames causes an outer join.
if isinstance(other, DataFrame):
new_column_index = self.columns.join(other.columns, how="outer")
new_index = self.index.join(other.index, how="outer")
copartitions = self._copartition(other, new_index)

new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([func, self.columns, other.columns,
len(part[0]), None] +
np.concatenate(part).tolist()),
num_return_vals=len(part[0]))
for part in copartitions])
new_column_index = self.columns.join(other.columns, how=how)
new_index = self.index.join(other.index, how=how)
copartitions = self._copartition(other, new_index)

new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([func, self.columns, other.columns,
len(part[0]), None] +
np.concatenate(part).tolist()),
num_return_vals=len(part[0]))
for part in copartitions])

if not inplace:
# TODO join the Index Metadata objects together for performance.
return DataFrame(block_partitions=new_blocks,
columns=new_column_index,
index=new_index)
else:
self._update_inplace(block_partitions=new_blocks,
columns=new_column_index,
index=new_index)

def _single_df_op_helper(self, func, other, axis, level):
if level is not None:
Expand Down
17 changes: 14 additions & 3 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3009,10 +3009,21 @@ def test_unstack():


def test_update():
ray_df = create_test_dataframe()
df = rdf.DataFrame([[1.5, np.nan, 3.],
[1.5, np.nan, 3.],
[1.5, np.nan, 3],
[1.5, np.nan, 3]])

with pytest.raises(NotImplementedError):
ray_df.update(None)
other = rdf.DataFrame([[3.6, 2., np.nan],
[np.nan, np.nan, 7]], index=[1, 3])

df.update(other)

expected = rdf.DataFrame([[1.5, np.nan, 3],
[3.6, 2, 3],
[1.5, np.nan, 3],
[1.5, np.nan, 7.]])
assert ray_df_equals(df, expected)


@pytest.fixture
Expand Down