diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 41924d801bd7..b4bc8dda5dea 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -46,6 +46,7 @@ _correct_column_dtypes) from . import get_npartitions from .index_metadata import _IndexMetadata +from .iterator import PartitionIterator @_inherit_docstrings(pd.DataFrame) @@ -2328,50 +2329,53 @@ def iterrows(self): """Iterate over DataFrame rows as (index, Series) pairs. Note: - Generators can't be pickeled so from the remote function + Generators can't be pickled so from the remote function we expand the generator into a list before getting it. This is not that ideal. Returns: A generator that iterates over the rows of the frame. """ - def update_iterrow(series, i): - """Helper function to correct the columns + name of the Series.""" - series.index = self.columns - series.name = list(self.index)[i] - return series + index_iter = (self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))) - iters = ray.get([_deploy_func.remote( - lambda df: list(df.iterrows()), part) - for part in self._row_partitions]) - iters = itertools.chain.from_iterable(iters) - series = map(lambda s: update_iterrow(s[1][1], s[0]), enumerate(iters)) + def iterrow_helper(part): + df = ray.get(part) + df.columns = self.columns + df.index = next(index_iter) + return df.iterrows() - return zip(self.index, series) + partition_iterator = PartitionIterator(self._row_partitions, + iterrow_helper) + + for v in partition_iterator: + yield v def items(self): """Iterator over (column name, Series) pairs. Note: - Generators can't be pickeled so from the remote function + Generators can't be pickled so from the remote function we expand the generator into a list before getting it. This is not that ideal. Returns: A generator that iterates over the columns of the frame. """ - iters = ray.get([_deploy_func.remote( - lambda df: list(df.items()), part) - for part in self._row_partitions]) + col_iter = (self._col_metadata.partition_series(i).index + for i in range(len(self._col_partitions))) + + def items_helper(part): + df = ray.get(part) + df.columns = next(col_iter) + df.index = self.index + return df.items() - def concat_iters(iterables): - for partitions in enumerate(zip(*iterables)): - series = pd.concat([_series for _, _series in partitions[1]]) - series.index = self.index - series.name = list(self.columns)[partitions[0]] - yield (series.name, series) + partition_iterator = PartitionIterator(self._col_partitions, + items_helper) - return concat_iters(iters) + for v in partition_iterator: + yield v def iteritems(self): """Iterator over (column name, Series) pairs. @@ -2393,31 +2397,27 @@ def itertuples(self, index=True, name='Pandas'): name (string, default "Pandas"): The name of the returned namedtuples or None to return regular tuples. Note: - Generators can't be pickeled so from the remote function + Generators can't be pickled so from the remote function we expand the generator into a list before getting it. This is not that ideal. Returns: A tuple representing row data. See args for varying tuples. """ - iters = ray.get([ - _deploy_func.remote( - lambda df: list(df.itertuples(index=index, name=name)), - part) for part in self._row_partitions]) - iters = itertools.chain.from_iterable(iters) - - def _replace_index(row_tuple, idx): - # We need to use try-except here because - # isinstance(row_tuple, namedtuple) won't work. - try: - row_tuple = row_tuple._replace(Index=idx) - except AttributeError: # Tuple not namedtuple - row_tuple = (idx,) + row_tuple[1:] - return row_tuple + index_iter = (self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))) - if index: - iters = itertools.starmap(_replace_index, zip(iters, self.index)) - return iters + def itertuples_helper(part): + df = ray.get(part) + df.columns = self.columns + df.index = next(index_iter) + return df.itertuples(index=index, name=name) + + partition_iterator = PartitionIterator(self._row_partitions, + itertuples_helper) + + for v in partition_iterator: + yield v def join(self, other, on=None, how='left', lsuffix='', rsuffix='', sort=False): diff --git a/python/ray/dataframe/iterator.py b/python/ray/dataframe/iterator.py new file mode 100644 index 000000000000..b26d1cf465be --- /dev/null +++ b/python/ray/dataframe/iterator.py @@ -0,0 +1,29 @@ +from collections import Iterator + + +class PartitionIterator(Iterator): + def __init__(self, partitions, func): + """PartitionIterator class to define a generator on partitioned data + + Args: + partitions ([ObjectID]): Partitions to iterate over + func (callable): The function to get inner iterables from + each partition + """ + self.partitions = iter(partitions) + self.func = func + self.iter_cache = iter([]) + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + try: + return next(self.iter_cache) + except StopIteration: + next_partition = next(self.partitions) + self.iter_cache = self.func(next_partition) + return self.next()