Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,16 +827,7 @@ def _aggregate(self, arg, *args, **kwargs):
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
elif is_list_like(arg):
from .concat import concat

x = [self._aggregate(func, *args, **kwargs)
for func in arg]

new_dfs = [x[i] if not isinstance(x[i], pd.Series)
else pd.DataFrame(x[i], columns=[arg[i]]).T
for i in range(len(x))]

return concat(new_dfs)
return self.apply(arg, axis=_axis, args=args, **kwargs)
elif callable(arg):
self._callable_function(arg, _axis, *args, **kwargs)
else:
Expand Down Expand Up @@ -1047,24 +1038,58 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
"""
axis = pd.DataFrame()._get_axis_number(axis)

if is_list_like(func) and not all([isinstance(obj, str)
for obj in func]):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

if axis == 0 and is_list_like(func):
return self.aggregate(func, axis, *args, **kwds)
if isinstance(func, compat.string_types):
if axis == 1:
kwds['axis'] = axis
return getattr(self, func)(*args, **kwds)
elif isinstance(func, dict):
if axis == 1:
raise TypeError(
"(\"'dict' object is not callable\", "
"'occurred at index {0}'".format(self.index[0]))
if len(self.columns) != len(set(self.columns)):
warnings.warn(
'duplicate column names not supported with apply().',
FutureWarning, stacklevel=2)
has_list = list in map(type, func.values())
part_ind_tuples = [(self._col_metadata[key], key) for key in func]

if has_list:
# if input dict has a list, the function to apply must wrap
# single functions in lists as well to get the desired output
# format
result = [_deploy_func.remote(
lambda df: df.iloc[:, ind].apply(
func[key] if is_list_like(func[key])
else [func[key]]),
self._col_partitions[part])
for (part, ind), key in part_ind_tuples]
return pd.concat(ray.get(result), axis=1)
else:
result = [_deploy_func.remote(
lambda df: df.iloc[:, ind].apply(func[key]),
self._col_partitions[part])
for (part, ind), key in part_ind_tuples]
return pd.Series(ray.get(result), index=func.keys())

elif is_list_like(func):
if axis == 1:
raise TypeError(
"(\"'list' object is not callable\", "
"'occurred at index {0}'".format(self.index[0]))
# TODO: some checking on functions that return Series or Dataframe
new_cols = _map_partitions(lambda df: df.apply(func),
self._col_partitions)

# resolve function names for the DataFrame index
new_index = [f_name if isinstance(f_name, compat.string_types)
else f_name.__name__ for f_name in func]
return DataFrame(col_partitions=new_cols,
columns=self.columns,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add col_metadata=self._col_metadata after #1965

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @osalpekar. Rebase and make this change, then looks great!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebased and made change in commit below!

index=new_index,
col_metadata=self._col_metadata)
elif callable(func):
return self._callable_function(func, axis=axis, *args, **kwds)
else:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

def as_blocks(self, copy=True):
raise NotImplementedError(
Expand Down
82 changes: 35 additions & 47 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,25 +302,22 @@ def test_int_dataframe():
test_apply(ray_df, pandas_df, func, 1)
test_aggregate(ray_df, pandas_df, func, 1)
else:
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)

func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)

test_transform(ray_df, pandas_df)
Expand Down Expand Up @@ -464,25 +461,22 @@ def test_float_dataframe():
test_apply(ray_df, pandas_df, func, 1)
test_aggregate(ray_df, pandas_df, func, 1)
else:
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)

func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)

test_transform(ray_df, pandas_df)
Expand Down Expand Up @@ -632,17 +626,14 @@ def test_mixed_dtype_dataframe():
test_agg(ray_df, pandas_df, func, 0)

func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)

test_transform(ray_df, pandas_df)
Expand Down Expand Up @@ -782,25 +773,22 @@ def test_nan_dataframe():
test_apply(ray_df, pandas_df, func, 1)
test_aggregate(ray_df, pandas_df, func, 1)
else:
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)

func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)

test_transform(ray_df, pandas_df)
Expand Down Expand Up @@ -855,8 +843,8 @@ def test_comparison_inter_ops(op):
ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})

ray_df_equals_pandas(getattr(ray_df, op)(ray_df2),
getattr(pandas_df, op)(pandas_df2))
ray_df_equals_pandas(getattr(ray_df2, op)(ray_df2),
getattr(pandas_df2, op)(pandas_df2))


@pytest.fixture
Expand Down