diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 0668c3569581..e9763fb7018e 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -827,16 +827,7 @@ def _aggregate(self, arg, *args, **kwargs): "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") elif is_list_like(arg): - from .concat import concat - - x = [self._aggregate(func, *args, **kwargs) - for func in arg] - - new_dfs = [x[i] if not isinstance(x[i], pd.Series) - else pd.DataFrame(x[i], columns=[arg[i]]).T - for i in range(len(x))] - - return concat(new_dfs) + return self.apply(arg, axis=_axis, args=args, **kwargs) elif callable(arg): self._callable_function(arg, _axis, *args, **kwargs) else: @@ -1047,24 +1038,58 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ axis = pd.DataFrame()._get_axis_number(axis) - if is_list_like(func) and not all([isinstance(obj, str) - for obj in func]): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - - if axis == 0 and is_list_like(func): - return self.aggregate(func, axis, *args, **kwds) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) + elif isinstance(func, dict): + if axis == 1: + raise TypeError( + "(\"'dict' object is not callable\", " + "'occurred at index {0}'".format(self.index[0])) + if len(self.columns) != len(set(self.columns)): + warnings.warn( + 'duplicate column names not supported with apply().', + FutureWarning, stacklevel=2) + has_list = list in map(type, func.values()) + part_ind_tuples = [(self._col_metadata[key], key) for key in func] + + if has_list: + # if input dict has a list, the function to apply must wrap + # single functions in lists as well to get the desired output + # format + result = [_deploy_func.remote( + lambda df: df.iloc[:, ind].apply( + func[key] if is_list_like(func[key]) + else [func[key]]), + self._col_partitions[part]) + for (part, ind), key in part_ind_tuples] + return pd.concat(ray.get(result), axis=1) + else: + result = [_deploy_func.remote( + lambda df: df.iloc[:, ind].apply(func[key]), + self._col_partitions[part]) + for (part, ind), key in part_ind_tuples] + return pd.Series(ray.get(result), index=func.keys()) + + elif is_list_like(func): + if axis == 1: + raise TypeError( + "(\"'list' object is not callable\", " + "'occurred at index {0}'".format(self.index[0])) + # TODO: some checking on functions that return Series or Dataframe + new_cols = _map_partitions(lambda df: df.apply(func), + self._col_partitions) + + # resolve function names for the DataFrame index + new_index = [f_name if isinstance(f_name, compat.string_types) + else f_name.__name__ for f_name in func] + return DataFrame(col_partitions=new_cols, + columns=self.columns, + index=new_index, + col_metadata=self._col_metadata) elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) - else: - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") def as_blocks(self, copy=True): raise NotImplementedError( diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index b6d6414c6008..47468a68086b 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -302,25 +302,22 @@ def test_int_dataframe(): test_apply(ray_df, pandas_df, func, 1) test_aggregate(ray_df, pandas_df, func, 1) else: - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -464,25 +461,22 @@ def test_float_dataframe(): test_apply(ray_df, pandas_df, func, 1) test_aggregate(ray_df, pandas_df, func, 1) else: - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -632,17 +626,14 @@ def test_mixed_dtype_dataframe(): test_agg(ray_df, pandas_df, func, 0) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -782,25 +773,22 @@ def test_nan_dataframe(): test_apply(ray_df, pandas_df, func, 1) test_aggregate(ray_df, pandas_df, func, 1) else: - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -855,8 +843,8 @@ def test_comparison_inter_ops(op): ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) - ray_df_equals_pandas(getattr(ray_df, op)(ray_df2), - getattr(pandas_df, op)(pandas_df2)) + ray_df_equals_pandas(getattr(ray_df2, op)(ray_df2), + getattr(pandas_df2, op)(pandas_df2)) @pytest.fixture