From 4ad5d0599d0c6c89d8d24ec9fa860e071c61f64b Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 16 Apr 2018 17:45:26 -0700 Subject: [PATCH 01/17] working for non-string functions and not lists of functions --- python/ray/dataframe/dataframe.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 0668c3569581..bb02574c5892 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1059,12 +1059,24 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) - elif callable(func): - return self._callable_function(func, axis=axis, *args, **kwds) - else: - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + else:# callable(func): + if isinstance(func, dict): + result = [] + for key in func: + part, ind = self._col_metadata[key] + def helper(df): + x = df.iloc[:, ind] + return func[key](x) + + result.append(_deploy_func.remote(helper, + self._col_partitions[part])) + + return pd.Series(ray.get(result), index=func.keys()) + + elif isinstance(func, list): + pass + else: + return self._callable_function(f, axis=axis, *args, **kwds) def as_blocks(self, copy=True): raise NotImplementedError( From 3a8c30a432ac51727e0e2d742e67622bba6b98a9 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 16 Apr 2018 23:01:42 -0700 Subject: [PATCH 02/17] works with functions as strings now as well --- python/ray/dataframe/dataframe.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index bb02574c5892..704a669c1a00 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1064,9 +1064,17 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, result = [] for key in func: part, ind = self._col_metadata[key] + + if isinstance(func[key], compat.string_types): + if axis == 1: + kwds['axis'] = axis + f = getattr(pd.core.series.Series, func[key]) + else: + f = func[key] + def helper(df): x = df.iloc[:, ind] - return func[key](x) + return f(x) result.append(_deploy_func.remote(helper, self._col_partitions[part])) From 9d4739b33aeded3e07401e52be77b7dc99a98986 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 16 Apr 2018 23:25:12 -0700 Subject: [PATCH 03/17] fixed linting errors --- python/ray/dataframe/dataframe.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 704a669c1a00..5b2a760ea0ba 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1059,7 +1059,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) - else:# callable(func): + else: if isinstance(func, dict): result = [] for key in func: @@ -1076,13 +1076,15 @@ def helper(df): x = df.iloc[:, ind] return f(x) - result.append(_deploy_func.remote(helper, - self._col_partitions[part])) + result.append(_deploy_func.remote( + helper, self._col_partitions[part])) return pd.Series(ray.get(result), index=func.keys()) elif isinstance(func, list): - pass + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") else: return self._callable_function(f, axis=axis, *args, **kwds) From 4913a261039019b8a46da1f9cb52e0b7a8a89d8b Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Tue, 17 Apr 2018 12:47:43 -0700 Subject: [PATCH 04/17] throwing a warning if the input is a dictionary --- python/ray/dataframe/dataframe.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 5b2a760ea0ba..ca3cdca47feb 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1061,6 +1061,8 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, return getattr(self, func)(*args, **kwds) else: if isinstance(func, dict): + warnings.warn("Currently not supporting functions that return " + "a DataFrame.", FutureWarning, stacklevel=2) result = [] for key in func: part, ind = self._col_metadata[key] @@ -1068,7 +1070,13 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if isinstance(func[key], compat.string_types): if axis == 1: kwds['axis'] = axis + # find the corresponding pd.Series function f = getattr(pd.core.series.Series, func[key]) + elif isinstance(func[key], list): + # not yet supporting lists of functions in the dict + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") else: f = func[key] @@ -1085,8 +1093,8 @@ def helper(df): raise NotImplementedError( "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - else: - return self._callable_function(f, axis=axis, *args, **kwds) + elif callable(func): + return self._callable_function(func, axis=axis, *args, **kwds) def as_blocks(self, copy=True): raise NotImplementedError( From a8453360b85eac362761bbcc8f8e0bdb9b762eda Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Thu, 26 Apr 2018 16:37:34 -0700 Subject: [PATCH 05/17] added dict of lists functionality --- python/ray/dataframe/dataframe.py | 99 ++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 34 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index ca3cdca47feb..998fd91fb8ab 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3,6 +3,7 @@ from __future__ import print_function import pandas as pd +# from functools import reduce from pandas.api.types import is_scalar from pandas.util._validators import validate_bool_kwarg from pandas.core.index import _ensure_index_from_sequences @@ -23,6 +24,7 @@ import numpy as np import ray import itertools +import functools import io import sys import re @@ -1047,52 +1049,81 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ axis = pd.DataFrame()._get_axis_number(axis) - if is_list_like(func) and not all([isinstance(obj, str) - for obj in func]): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + # if is_list_like(func) and not all([isinstance(obj, str) + # for obj in func]): + # raise NotImplementedError( + # "To contribute to Pandas on Ray, please visit " + # "github.com/ray-project/ray.") - if axis == 0 and is_list_like(func): - return self.aggregate(func, axis, *args, **kwds) + # if axis == 0 and is_list_like(func): + # return self.aggregate(func, axis, *args, **kwds) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) else: if isinstance(func, dict): - warnings.warn("Currently not supporting functions that return " - "a DataFrame.", FutureWarning, stacklevel=2) result = [] - for key in func: - part, ind = self._col_metadata[key] + if list not in map(type, func.values()): + for key in func: + part, ind = self._col_metadata[key] + + if isinstance(func[key], compat.string_types): + if axis == 1: + kwds['axis'] = axis + # find the corresponding pd.Series function + f = getattr(pd.core.series.Series, func[key]) + else: + f = func[key] + + def helper(df): + x = df.iloc[:, ind] + return f(x) + + result.append(_deploy_func.remote( + helper, self._col_partitions[part])) + + return pd.Series(ray.get(result), index=func.keys()) + else: + for key in func: + part, ind = self._col_metadata[key] + + if isinstance(func[key], compat.string_types): + if axis == 1: + kwds['axis'] = axis + f = [getattr(pd.core.series.Series, func[key])] + elif isinstance(func[key], list): + f = func[key] + else: + f = [func[key]] + + def helper(df): + x = df.iloc[:, ind].apply(f).to_frame() + x.columns = [key] + return x + + result.append(_deploy_func.remote(helper, + self._col_partitions[part])) + + result = ray.get(result) + return functools.reduce((lambda l,r: l.join(r, how='outer')), result) - if isinstance(func[key], compat.string_types): + elif isinstance(func, list): + rows = [] + for function in func: + if isinstance(function, compat.string_types): if axis == 1: kwds['axis'] = axis - # find the corresponding pd.Series function - f = getattr(pd.core.series.Series, func[key]) - elif isinstance(func[key], list): - # not yet supporting lists of functions in the dict - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + f = getattr(pd.core.series.Series, function) else: - f = func[key] - - def helper(df): - x = df.iloc[:, ind] - return f(x) - - result.append(_deploy_func.remote( - helper, self._col_partitions[part])) - - return pd.Series(ray.get(result), index=func.keys()) - - elif isinstance(func, list): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + f = function + rows.append(pd.concat(ray.get(_map_partitions(lambda df: f(df), + self._col_partitions)), axis=1)) + df = pd.concat(rows, axis=0) + df.columns = self.columns + df.index = [f if isinstance(f,compat.string_types) \ + else f.__name__ for f in func] + return df elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) From 5e45743676192e8448877292904238438a75a933 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Fri, 27 Apr 2018 01:23:08 -0700 Subject: [PATCH 06/17] fix minor indexing errors and lint --- python/ray/dataframe/dataframe.py | 32 +++++++++++-------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 998fd91fb8ab..f7b7b32018b8 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1049,14 +1049,6 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ axis = pd.DataFrame()._get_axis_number(axis) - # if is_list_like(func) and not all([isinstance(obj, str) - # for obj in func]): - # raise NotImplementedError( - # "To contribute to Pandas on Ray, please visit " - # "github.com/ray-project/ray.") - - # if axis == 0 and is_list_like(func): - # return self.aggregate(func, axis, *args, **kwds) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis @@ -1088,11 +1080,7 @@ def helper(df): for key in func: part, ind = self._col_metadata[key] - if isinstance(func[key], compat.string_types): - if axis == 1: - kwds['axis'] = axis - f = [getattr(pd.core.series.Series, func[key])] - elif isinstance(func[key], list): + if isinstance(func[key], list): f = func[key] else: f = [func[key]] @@ -1101,12 +1089,14 @@ def helper(df): x = df.iloc[:, ind].apply(f).to_frame() x.columns = [key] return x - + result.append(_deploy_func.remote(helper, - self._col_partitions[part])) - + self._col_partitions[part])) + result = ray.get(result) - return functools.reduce((lambda l,r: l.join(r, how='outer')), result) + return functools.reduce(lambda l, r: l.join(r, + how='outer'), + result) elif isinstance(func, list): rows = [] @@ -1117,12 +1107,12 @@ def helper(df): f = getattr(pd.core.series.Series, function) else: f = function - rows.append(pd.concat(ray.get(_map_partitions(lambda df: f(df), - self._col_partitions)), axis=1)) + rows.append(pd.concat(ray.get(_map_partitions( + lambda df: f(df), self._col_partitions)), axis=1)) df = pd.concat(rows, axis=0) df.columns = self.columns - df.index = [f if isinstance(f,compat.string_types) \ - else f.__name__ for f in func] + df.index = [f_name if isinstance(f_name, compat.string_types) + else f.__name__ for f_name in func] return df elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) From e39dcf5286816ba8a7b5eca604081d5c8f367aec Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Fri, 27 Apr 2018 01:29:49 -0700 Subject: [PATCH 07/17] removed some commented out code --- python/ray/dataframe/dataframe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index f7b7b32018b8..85b5aab39fd4 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3,7 +3,6 @@ from __future__ import print_function import pandas as pd -# from functools import reduce from pandas.api.types import is_scalar from pandas.util._validators import validate_bool_kwarg from pandas.core.index import _ensure_index_from_sequences From 94afdf403d16339aa668e1b327851b802b49efb1 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 15:17:28 -0700 Subject: [PATCH 08/17] some comments and thoughts for apply --- python/ray/dataframe/dataframe.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 85b5aab39fd4..e34cc3b7e61f 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1068,8 +1068,10 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, f = func[key] def helper(df): - x = df.iloc[:, ind] - return f(x) + x = df.iloc[:, ind].apply(f) + return x + # x = df.iloc[:, ind] + # return f(x) result.append(_deploy_func.remote( helper, self._col_partitions[part])) @@ -1097,13 +1099,14 @@ def helper(df): how='outer'), result) - elif isinstance(func, list): + # TODO: change this to is_list_like + elif isinstance(func, list) and axis==0: rows = [] for function in func: if isinstance(function, compat.string_types): if axis == 1: kwds['axis'] = axis - f = getattr(pd.core.series.Series, function) + f = getattr(pd.DataFrame, function) else: f = function rows.append(pd.concat(ray.get(_map_partitions( From acdb7a23a1823420cc9c4b3add618506bc8bb128 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 16:01:23 -0700 Subject: [PATCH 09/17] cleaned up code a little bit and added todos --- python/ray/dataframe/dataframe.py | 117 ++++++++++++++---------------- 1 file changed, 53 insertions(+), 64 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index e34cc3b7e61f..89386faa0b67 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1046,78 +1046,67 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, Returns: Series or DataFrame, depending on func. """ + # TODO: improve performance + # TODO: do axis checking + # TODO: call agg instead of reimplementing the list pary + # TODO: return ray dataframes + # TODO: try to do series and concat instead of join axis = pd.DataFrame()._get_axis_number(axis) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) - else: - if isinstance(func, dict): - result = [] - if list not in map(type, func.values()): - for key in func: - part, ind = self._col_metadata[key] - - if isinstance(func[key], compat.string_types): - if axis == 1: - kwds['axis'] = axis - # find the corresponding pd.Series function - f = getattr(pd.core.series.Series, func[key]) - else: - f = func[key] - - def helper(df): - x = df.iloc[:, ind].apply(f) - return x - # x = df.iloc[:, ind] - # return f(x) - - result.append(_deploy_func.remote( - helper, self._col_partitions[part])) - - return pd.Series(ray.get(result), index=func.keys()) + elif isinstance(func, dict): + result = [] + + has_list = list in map(type, func.values()) + for key in func: + part, ind = self._col_metadata[key] + + if not isinstance(func[key], list) and has_list: + f = [func[key]] else: - for key in func: - part, ind = self._col_metadata[key] - - if isinstance(func[key], list): - f = func[key] - else: - f = [func[key]] - - def helper(df): - x = df.iloc[:, ind].apply(f).to_frame() - x.columns = [key] - return x - - result.append(_deploy_func.remote(helper, - self._col_partitions[part])) - - result = ray.get(result) - return functools.reduce(lambda l, r: l.join(r, - how='outer'), - result) - - # TODO: change this to is_list_like - elif isinstance(func, list) and axis==0: - rows = [] - for function in func: - if isinstance(function, compat.string_types): - if axis == 1: - kwds['axis'] = axis - f = getattr(pd.DataFrame, function) + f = func[key] + + def helper(df): + x = df.iloc[:, ind].apply(f) + if has_list: + x = x.to_frame() + x.columns = [key] + return x else: - f = function - rows.append(pd.concat(ray.get(_map_partitions( - lambda df: f(df), self._col_partitions)), axis=1)) - df = pd.concat(rows, axis=0) - df.columns = self.columns - df.index = [f_name if isinstance(f_name, compat.string_types) - else f.__name__ for f_name in func] - return df - elif callable(func): - return self._callable_function(func, axis=axis, *args, **kwds) + return x + + result.append(_deploy_func.remote( + helper, self._col_partitions[part])) + + if has_list: + return functools.reduce(lambda l, r: l.join(r, + how='outer'), + ray.get(result)) + else: + return pd.Series(ray.get(result), index=func.keys()) + + # TODO: change this to is_list_like + elif isinstance(func, list) and axis==0: + rows = [] + for function in func: + if isinstance(function, compat.string_types): + if axis == 1: + kwds['axis'] = axis + f = getattr(pd.DataFrame, function) + else: + f = function + rows.append(pd.concat(ray.get(_map_partitions( + lambda df: f(df), self._col_partitions)), axis=1)) + df = pd.concat(rows, axis=0) + df.columns = self.columns + df.index = [f_name if isinstance(f_name, compat.string_types) + else f.__name__ for f_name in func] + return df + elif callable(func): + return self._callable_function(func, axis=axis, *args, **kwds) def as_blocks(self, copy=True): raise NotImplementedError( From 317abd88d51ee6d3bd5a5f11e3c3919be2997bf4 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 17:46:34 -0700 Subject: [PATCH 10/17] improved performance --- python/ray/dataframe/dataframe.py | 59 ++++++++++++++++++------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 89386faa0b67..3894efcd92b9 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1061,35 +1061,46 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, result = [] has_list = list in map(type, func.values()) - for key in func: - part, ind = self._col_metadata[key] + part_ind_tuples = [(self._col_metadata[key], key) for key in func] + + if has_list: + # tup[1] is the key of the dict + # tup[0][0] is partition index + # tup[0][1] is the index within the partition + result = [_deploy_func.remote( + lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]] + if is_list_like(func[tup[1]]) + else [func[tup[1]]]), + self._col_partitions[tup[0][0]]) + for tup in part_ind_tuples] + return pd.concat(ray.get(result), axis=1) + else: + result = [_deploy_func.remote( + lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]), + self._col_partitions[tup[0][0]]) + for tup in part_ind_tuples] + return pd.Series(ray.get(result), index = func.keys()) - if not isinstance(func[key], list) and has_list: - f = [func[key]] - else: - f = func[key] - - def helper(df): - x = df.iloc[:, ind].apply(f) - if has_list: - x = x.to_frame() - x.columns = [key] - return x - else: - return x - result.append(_deploy_func.remote( - helper, self._col_partitions[part])) + # for key in func: + # part, ind = self._col_metadata[key] + + # if not is_list_like(func[key]) and has_list: + # f = [func[key]] + # else: + # f = func[key] + + # result.append(_deploy_func.remote( + # lambda df: df.iloc[:, ind].apply(f), + # self._col_partitions[part])) - if has_list: - return functools.reduce(lambda l, r: l.join(r, - how='outer'), - ray.get(result)) - else: - return pd.Series(ray.get(result), index=func.keys()) + # if has_list: + # return pd.concat(ray.get(result), axis=1) + # else: + # return pd.Series(ray.get(result), index=func.keys()) # TODO: change this to is_list_like - elif isinstance(func, list) and axis==0: + elif is_list_like(func) and axis==0: rows = [] for function in func: if isinstance(function, compat.string_types): From 279655131d2d3cbd968dbe0626cb04e78961ddcd Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 18:04:14 -0700 Subject: [PATCH 11/17] error checking and code cleanup and comments --- python/ray/dataframe/dataframe.py | 36 ++++++++++--------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 3894efcd92b9..11b2f9baa3b1 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1050,7 +1050,6 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, # TODO: do axis checking # TODO: call agg instead of reimplementing the list pary # TODO: return ray dataframes - # TODO: try to do series and concat instead of join axis = pd.DataFrame()._get_axis_number(axis) if isinstance(func, compat.string_types): @@ -1058,15 +1057,18 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, kwds['axis'] = axis return getattr(self, func)(*args, **kwds) elif isinstance(func, dict): + if axis == 1: + raise TypeError("(\"'dict' object is not callable\", " + "'occurred at index {0}'".format(self.index[0])) result = [] has_list = list in map(type, func.values()) part_ind_tuples = [(self._col_metadata[key], key) for key in func] + # tup[1] is the key of the dict + # tup[0][0] is partition index + # tup[0][1] is the index within the partition if has_list: - # tup[1] is the key of the dict - # tup[0][0] is partition index - # tup[0][1] is the index within the partition result = [_deploy_func.remote( lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]] if is_list_like(func[tup[1]]) @@ -1079,28 +1081,12 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]), self._col_partitions[tup[0][0]]) for tup in part_ind_tuples] - return pd.Series(ray.get(result), index = func.keys()) - - - # for key in func: - # part, ind = self._col_metadata[key] + return pd.Series(ray.get(result), index=func.keys()) - # if not is_list_like(func[key]) and has_list: - # f = [func[key]] - # else: - # f = func[key] - - # result.append(_deploy_func.remote( - # lambda df: df.iloc[:, ind].apply(f), - # self._col_partitions[part])) - - # if has_list: - # return pd.concat(ray.get(result), axis=1) - # else: - # return pd.Series(ray.get(result), index=func.keys()) - - # TODO: change this to is_list_like - elif is_list_like(func) and axis==0: + elif is_list_like(func): + if axis == 1: + raise TypeError("(\"'list' object is not callable\", " + "'occurred at index {0}'".format(self.index[0])) rows = [] for function in func: if isinstance(function, compat.string_types): From 71f086925d5a755e4dd8fb491d0cede7676ae94b Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sun, 29 Apr 2018 14:18:01 -0700 Subject: [PATCH 12/17] small change --- python/ray/dataframe/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 11b2f9baa3b1..9f9b6bd51ee5 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1095,8 +1095,9 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, f = getattr(pd.DataFrame, function) else: f = function + #TODO: can these just be block_partitions? rows.append(pd.concat(ray.get(_map_partitions( - lambda df: f(df), self._col_partitions)), axis=1)) + lambda df: df.apply(f), self._col_partitions)), axis=1)) df = pd.concat(rows, axis=0) df.columns = self.columns df.index = [f_name if isinstance(f_name, compat.string_types) From 5402c5c4994bd51e0637a0bf71934cc2e248a95d Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 30 Apr 2018 13:24:42 -0700 Subject: [PATCH 13/17] improved list performance a lot --- python/ray/dataframe/dataframe.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 9f9b6bd51ee5..6a1da10ae27d 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1048,7 +1048,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ # TODO: improve performance # TODO: do axis checking - # TODO: call agg instead of reimplementing the list pary + # TODO: call agg instead of reimplementing the list part # TODO: return ray dataframes axis = pd.DataFrame()._get_axis_number(axis) @@ -1087,22 +1087,14 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if axis == 1: raise TypeError("(\"'list' object is not callable\", " "'occurred at index {0}'".format(self.index[0])) - rows = [] - for function in func: - if isinstance(function, compat.string_types): - if axis == 1: - kwds['axis'] = axis - f = getattr(pd.DataFrame, function) - else: - f = function - #TODO: can these just be block_partitions? - rows.append(pd.concat(ray.get(_map_partitions( - lambda df: df.apply(f), self._col_partitions)), axis=1)) - df = pd.concat(rows, axis=0) - df.columns = self.columns - df.index = [f_name if isinstance(f_name, compat.string_types) + # TODO: some checking on functions that return Series or Dataframe + new_cols = _map_partitions(lambda df: df.apply(func), + self._col_partitions) + new_index = [f_name if isinstance(f_name, compat.string_types) else f.__name__ for f_name in func] - return df + return DataFrame(col_partitions=new_cols, + columns=self.columns, + index=new_index) elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) From e9f1708ec4b7bc03dfc77f7a87f77bacefbd855c Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Tue, 1 May 2018 01:06:06 -0700 Subject: [PATCH 14/17] agg calls apply for lists --- python/ray/dataframe/dataframe.py | 43 ++++++++++++------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 6a1da10ae27d..09dc644fea1a 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -23,7 +23,6 @@ import numpy as np import ray import itertools -import functools import io import sys import re @@ -828,16 +827,7 @@ def _aggregate(self, arg, *args, **kwargs): "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") elif is_list_like(arg): - from .concat import concat - - x = [self._aggregate(func, *args, **kwargs) - for func in arg] - - new_dfs = [x[i] if not isinstance(x[i], pd.Series) - else pd.DataFrame(x[i], columns=[arg[i]]).T - for i in range(len(x))] - - return concat(new_dfs) + return self.apply(arg, axis=_axis, args=args, **kwargs) elif callable(arg): self._callable_function(arg, _axis, *args, **kwargs) else: @@ -1046,10 +1036,6 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, Returns: Series or DataFrame, depending on func. """ - # TODO: improve performance - # TODO: do axis checking - # TODO: call agg instead of reimplementing the list part - # TODO: return ray dataframes axis = pd.DataFrame()._get_axis_number(axis) if isinstance(func, compat.string_types): @@ -1058,40 +1044,45 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, return getattr(self, func)(*args, **kwds) elif isinstance(func, dict): if axis == 1: - raise TypeError("(\"'dict' object is not callable\", " + raise TypeError( + "(\"'dict' object is not callable\", " "'occurred at index {0}'".format(self.index[0])) - result = [] - has_list = list in map(type, func.values()) part_ind_tuples = [(self._col_metadata[key], key) for key in func] - + # tup[1] is the key of the dict # tup[0][0] is partition index # tup[0][1] is the index within the partition if has_list: + # if input dict has a list, the function to apply must wrap + # single functions in lists as well to get the desired output + # format result = [_deploy_func.remote( - lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]] - if is_list_like(func[tup[1]]) - else [func[tup[1]]]), + lambda df: df.iloc[:, tup[0][1]].apply( + func[tup[1]] if is_list_like(func[tup[1]]) + else [func[tup[1]]]), self._col_partitions[tup[0][0]]) for tup in part_ind_tuples] return pd.concat(ray.get(result), axis=1) else: result = [_deploy_func.remote( lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]), - self._col_partitions[tup[0][0]]) + self._col_partitions[tup[0][0]]) for tup in part_ind_tuples] return pd.Series(ray.get(result), index=func.keys()) elif is_list_like(func): if axis == 1: - raise TypeError("(\"'list' object is not callable\", " + raise TypeError( + "(\"'list' object is not callable\", " "'occurred at index {0}'".format(self.index[0])) # TODO: some checking on functions that return Series or Dataframe new_cols = _map_partitions(lambda df: df.apply(func), - self._col_partitions) + self._col_partitions) + + # resolve function names for the DataFrame index new_index = [f_name if isinstance(f_name, compat.string_types) - else f.__name__ for f_name in func] + else f_name.__name__ for f_name in func] return DataFrame(col_partitions=new_cols, columns=self.columns, index=new_index) From ffaeee4645eb4a733a45c665583465526e037705 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Tue, 1 May 2018 14:07:53 -0700 Subject: [PATCH 15/17] addressing comments on the PR --- python/ray/dataframe/dataframe.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 09dc644fea1a..3c545bef945c 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1047,28 +1047,29 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, raise TypeError( "(\"'dict' object is not callable\", " "'occurred at index {0}'".format(self.index[0])) + if len(self.columns) != len(set(self.columns)): + warnings.warn( + 'duplicate column names not supported with apply().', + FutureWarning, stacklevel=2) has_list = list in map(type, func.values()) part_ind_tuples = [(self._col_metadata[key], key) for key in func] - # tup[1] is the key of the dict - # tup[0][0] is partition index - # tup[0][1] is the index within the partition if has_list: # if input dict has a list, the function to apply must wrap # single functions in lists as well to get the desired output # format result = [_deploy_func.remote( - lambda df: df.iloc[:, tup[0][1]].apply( - func[tup[1]] if is_list_like(func[tup[1]]) - else [func[tup[1]]]), - self._col_partitions[tup[0][0]]) - for tup in part_ind_tuples] + lambda df: df.iloc[:, ind].apply( + func[key] if is_list_like(func[key]) + else [func[key]]), + self._col_partitions[part]) + for (part, ind), key in part_ind_tuples] return pd.concat(ray.get(result), axis=1) else: result = [_deploy_func.remote( - lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]), - self._col_partitions[tup[0][0]]) - for tup in part_ind_tuples] + lambda df: df.iloc[:, ind].apply(func[key]), + self._col_partitions[part]) + for (part, ind), key in part_ind_tuples] return pd.Series(ray.get(result), index=func.keys()) elif is_list_like(func): From 90c2b483bf1723ea92d56ca34afd4a8a00b51831 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Wed, 2 May 2018 23:46:56 -0700 Subject: [PATCH 16/17] col_metadata change --- python/ray/dataframe/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 3c545bef945c..e9763fb7018e 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1086,7 +1086,8 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, else f_name.__name__ for f_name in func] return DataFrame(col_partitions=new_cols, columns=self.columns, - index=new_index) + index=new_index, + col_metadata=self._col_metadata) elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) From 6025ae7e950e53095b21a8c97537a201cde5f102 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Thu, 3 May 2018 22:12:03 -0700 Subject: [PATCH 17/17] updated tests to expect TypeError where appropriate --- python/ray/dataframe/test/test_dataframe.py | 82 +++++++++------------ 1 file changed, 35 insertions(+), 47 deletions(-) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index b6d6414c6008..47468a68086b 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -302,25 +302,22 @@ def test_int_dataframe(): test_apply(ray_df, pandas_df, func, 1) test_aggregate(ray_df, pandas_df, func, 1) else: - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -464,25 +461,22 @@ def test_float_dataframe(): test_apply(ray_df, pandas_df, func, 1) test_aggregate(ray_df, pandas_df, func, 1) else: - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -632,17 +626,14 @@ def test_mixed_dtype_dataframe(): test_agg(ray_df, pandas_df, func, 0) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -782,25 +773,22 @@ def test_nan_dataframe(): test_apply(ray_df, pandas_df, func, 1) test_aggregate(ray_df, pandas_df, func, 1) else: - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) func = ['sum', lambda df: df.sum()] - with pytest.raises(NotImplementedError): - test_apply(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_aggregate(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): - test_agg(ray_df, pandas_df, func, 0) - with pytest.raises(NotImplementedError): + test_apply(ray_df, pandas_df, func, 0) + test_aggregate(ray_df, pandas_df, func, 0) + test_agg(ray_df, pandas_df, func, 0) + with pytest.raises(TypeError): test_apply(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_aggregate(ray_df, pandas_df, func, 1) - with pytest.raises(NotImplementedError): + with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) test_transform(ray_df, pandas_df) @@ -855,8 +843,8 @@ def test_comparison_inter_ops(op): ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) - ray_df_equals_pandas(getattr(ray_df, op)(ray_df2), - getattr(pandas_df, op)(pandas_df2)) + ray_df_equals_pandas(getattr(ray_df2, op)(ray_df2), + getattr(pandas_df2, op)(pandas_df2)) @pytest.fixture