From afc29451fe5e8e5ab70ee7f360baf1d2e7cabacb Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Wed, 9 May 2018 17:38:02 -0700 Subject: [PATCH 1/5] fix iterrows --- python/ray/dataframe/dataframe.py | 27 ++++++++++++------------- python/ray/dataframe/iterator.py | 33 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 14 deletions(-) create mode 100644 python/ray/dataframe/iterator.py diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 41924d801bd7..5a5968f18204 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -46,6 +46,7 @@ _correct_column_dtypes) from . import get_npartitions from .index_metadata import _IndexMetadata +from .iterator import PartitionIterator @_inherit_docstrings(pd.DataFrame) @@ -2328,32 +2329,30 @@ def iterrows(self): """Iterate over DataFrame rows as (index, Series) pairs. Note: - Generators can't be pickeled so from the remote function + Generators can't be pickled so from the remote function we expand the generator into a list before getting it. This is not that ideal. Returns: A generator that iterates over the rows of the frame. """ - def update_iterrow(series, i): - """Helper function to correct the columns + name of the Series.""" - series.index = self.columns - series.name = list(self.index)[i] - return series + def iterrow_helper(part, i): + df = ray.get(part) + df.columns = self.columns + df.index = self._row_metadata.partition_series(i).index + return df.iterrows() - iters = ray.get([_deploy_func.remote( - lambda df: list(df.iterrows()), part) - for part in self._row_partitions]) - iters = itertools.chain.from_iterable(iters) - series = map(lambda s: update_iterrow(s[1][1], s[0]), enumerate(iters)) + partition_iterator = PartitionIterator(self._row_partitions, + iterrow_helper) - return zip(self.index, series) + for v in partition_iterator: + yield v def items(self): """Iterator over (column name, Series) pairs. Note: - Generators can't be pickeled so from the remote function + Generators can't be pickled so from the remote function we expand the generator into a list before getting it. This is not that ideal. @@ -2393,7 +2392,7 @@ def itertuples(self, index=True, name='Pandas'): name (string, default "Pandas"): The name of the returned namedtuples or None to return regular tuples. Note: - Generators can't be pickeled so from the remote function + Generators can't be pickled so from the remote function we expand the generator into a list before getting it. This is not that ideal. diff --git a/python/ray/dataframe/iterator.py b/python/ray/dataframe/iterator.py new file mode 100644 index 000000000000..876162e6be14 --- /dev/null +++ b/python/ray/dataframe/iterator.py @@ -0,0 +1,33 @@ +class PartitionIterator(object): + def __init__(self, partitions, func): + """PartitionIterator class to define a generator on partitioned data + + Args: + partitions ([ObjectID]): Partitions to iterate over + func (callable): The function to get inner iterables from + each partition + """ + self.partitions = partitions + self.curr_partition = -1 + self.func = func + self.iter_cache = iter([]) + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + try: + n = next(self.iter_cache) + return n + except StopIteration: + self.curr_partition += 1 + if self.curr_partition < len(self.partitions): + next_partition = self.partitions[self.curr_partition] + else: + raise StopIteration() + + self.iter_cache = self.func(next_partition, self.curr_partition) + return self.next() From 16f84490072a1b2bd476b94eba0ab5c9cad3cf6b Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Wed, 9 May 2018 18:06:41 -0700 Subject: [PATCH 2/5] make iteration methods performant --- python/ray/dataframe/dataframe.py | 46 +++++++++++++------------------ 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 5a5968f18204..8cce4c144788 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -2359,18 +2359,17 @@ def items(self): Returns: A generator that iterates over the columns of the frame. """ - iters = ray.get([_deploy_func.remote( - lambda df: list(df.items()), part) - for part in self._row_partitions]) + def items_helper(part, i): + df = ray.get(part) + df.columns = self._col_metadata.partition_series(i).index + df.index = self.index + return df.items() - def concat_iters(iterables): - for partitions in enumerate(zip(*iterables)): - series = pd.concat([_series for _, _series in partitions[1]]) - series.index = self.index - series.name = list(self.columns)[partitions[0]] - yield (series.name, series) + partition_iterator = PartitionIterator(self._col_partitions, + items_helper) - return concat_iters(iters) + for v in partition_iterator: + yield v def iteritems(self): """Iterator over (column name, Series) pairs. @@ -2399,24 +2398,17 @@ def itertuples(self, index=True, name='Pandas'): Returns: A tuple representing row data. See args for varying tuples. """ - iters = ray.get([ - _deploy_func.remote( - lambda df: list(df.itertuples(index=index, name=name)), - part) for part in self._row_partitions]) - iters = itertools.chain.from_iterable(iters) - - def _replace_index(row_tuple, idx): - # We need to use try-except here because - # isinstance(row_tuple, namedtuple) won't work. - try: - row_tuple = row_tuple._replace(Index=idx) - except AttributeError: # Tuple not namedtuple - row_tuple = (idx,) + row_tuple[1:] - return row_tuple + def itertuples_helper(part, i): + df = ray.get(part) + df.columns = self.columns + df.index = self._row_metadata.partition_series(i).index + return df.itertuples(index=index, name=name) - if index: - iters = itertools.starmap(_replace_index, zip(iters, self.index)) - return iters + partition_iterator = PartitionIterator(self._row_partitions, + itertuples_helper) + + for v in partition_iterator: + yield v def join(self, other, on=None, how='left', lsuffix='', rsuffix='', sort=False): From a473ac270bee64e6102f02ef2d771a4a282fb834 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Thu, 10 May 2018 12:45:35 -0700 Subject: [PATCH 3/5] resolving comments --- python/ray/dataframe/iterator.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/dataframe/iterator.py b/python/ray/dataframe/iterator.py index 876162e6be14..a8dc268874ee 100644 --- a/python/ray/dataframe/iterator.py +++ b/python/ray/dataframe/iterator.py @@ -1,4 +1,7 @@ -class PartitionIterator(object): +import collections + + +class PartitionIterator(collections.Iterator): def __init__(self, partitions, func): """PartitionIterator class to define a generator on partitioned data From 29d9bb2d4305aba5bd6bf934e50b9b2979610555 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Wed, 16 May 2018 15:48:29 -0700 Subject: [PATCH 4/5] remove indexing from iterator --- python/ray/dataframe/dataframe.py | 21 +++++++++++++++------ python/ray/dataframe/iterator.py | 19 ++++++------------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 8cce4c144788..43dc09d1800c 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -2336,10 +2336,13 @@ def iterrows(self): Returns: A generator that iterates over the rows of the frame. """ - def iterrow_helper(part, i): + index_iter = iter([self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))]) + + def iterrow_helper(part): df = ray.get(part) df.columns = self.columns - df.index = self._row_metadata.partition_series(i).index + df.index = next(index_iter) return df.iterrows() partition_iterator = PartitionIterator(self._row_partitions, @@ -2359,9 +2362,12 @@ def items(self): Returns: A generator that iterates over the columns of the frame. """ - def items_helper(part, i): + col_iter = iter([self._col_metadata.partition_series(i).index + for i in range(len(self._col_partitions))]) + + def items_helper(part): df = ray.get(part) - df.columns = self._col_metadata.partition_series(i).index + df.columns = next(col_iter) df.index = self.index return df.items() @@ -2398,10 +2404,13 @@ def itertuples(self, index=True, name='Pandas'): Returns: A tuple representing row data. See args for varying tuples. """ - def itertuples_helper(part, i): + index_iter = iter([self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))]) + + def itertuples_helper(part): df = ray.get(part) df.columns = self.columns - df.index = self._row_metadata.partition_series(i).index + df.index = next(index_iter) return df.itertuples(index=index, name=name) partition_iterator = PartitionIterator(self._row_partitions, diff --git a/python/ray/dataframe/iterator.py b/python/ray/dataframe/iterator.py index a8dc268874ee..b26d1cf465be 100644 --- a/python/ray/dataframe/iterator.py +++ b/python/ray/dataframe/iterator.py @@ -1,7 +1,7 @@ -import collections +from collections import Iterator -class PartitionIterator(collections.Iterator): +class PartitionIterator(Iterator): def __init__(self, partitions, func): """PartitionIterator class to define a generator on partitioned data @@ -10,8 +10,7 @@ def __init__(self, partitions, func): func (callable): The function to get inner iterables from each partition """ - self.partitions = partitions - self.curr_partition = -1 + self.partitions = iter(partitions) self.func = func self.iter_cache = iter([]) @@ -23,14 +22,8 @@ def __next__(self): def next(self): try: - n = next(self.iter_cache) - return n + return next(self.iter_cache) except StopIteration: - self.curr_partition += 1 - if self.curr_partition < len(self.partitions): - next_partition = self.partitions[self.curr_partition] - else: - raise StopIteration() - - self.iter_cache = self.func(next_partition, self.curr_partition) + next_partition = next(self.partitions) + self.iter_cache = self.func(next_partition) return self.next() From 6acfe91a1bd27a5c6bdf02ec8e2f381800897a71 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Wed, 16 May 2018 17:11:58 -0700 Subject: [PATCH 5/5] switch to iterator syntax --- python/ray/dataframe/dataframe.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 43dc09d1800c..b4bc8dda5dea 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -2336,8 +2336,8 @@ def iterrows(self): Returns: A generator that iterates over the rows of the frame. """ - index_iter = iter([self._row_metadata.partition_series(i).index - for i in range(len(self._row_partitions))]) + index_iter = (self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))) def iterrow_helper(part): df = ray.get(part) @@ -2362,8 +2362,8 @@ def items(self): Returns: A generator that iterates over the columns of the frame. """ - col_iter = iter([self._col_metadata.partition_series(i).index - for i in range(len(self._col_partitions))]) + col_iter = (self._col_metadata.partition_series(i).index + for i in range(len(self._col_partitions))) def items_helper(part): df = ray.get(part) @@ -2404,8 +2404,8 @@ def itertuples(self, index=True, name='Pandas'): Returns: A tuple representing row data. See args for varying tuples. """ - index_iter = iter([self._row_metadata.partition_series(i).index - for i in range(len(self._row_partitions))]) + index_iter = (self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))) def itertuples_helper(part): df = ray.get(part)