Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 41 additions & 41 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
_correct_column_dtypes)
from . import get_npartitions
from .index_metadata import _IndexMetadata
from .iterator import PartitionIterator


@_inherit_docstrings(pd.DataFrame)
Expand Down Expand Up @@ -2328,50 +2329,53 @@ def iterrows(self):
"""Iterate over DataFrame rows as (index, Series) pairs.

Note:
Generators can't be pickeled so from the remote function
Generators can't be pickled so from the remote function
we expand the generator into a list before getting it.
This is not that ideal.

Returns:
A generator that iterates over the rows of the frame.
"""
def update_iterrow(series, i):
"""Helper function to correct the columns + name of the Series."""
series.index = self.columns
series.name = list(self.index)[i]
return series
index_iter = (self._row_metadata.partition_series(i).index
for i in range(len(self._row_partitions)))

iters = ray.get([_deploy_func.remote(
lambda df: list(df.iterrows()), part)
for part in self._row_partitions])
iters = itertools.chain.from_iterable(iters)
series = map(lambda s: update_iterrow(s[1][1], s[0]), enumerate(iters))
def iterrow_helper(part):
df = ray.get(part)
df.columns = self.columns
df.index = next(index_iter)
return df.iterrows()

return zip(self.index, series)
partition_iterator = PartitionIterator(self._row_partitions,
iterrow_helper)

for v in partition_iterator:
yield v

def items(self):
"""Iterator over (column name, Series) pairs.

Note:
Generators can't be pickeled so from the remote function
Generators can't be pickled so from the remote function
we expand the generator into a list before getting it.
This is not that ideal.

Returns:
A generator that iterates over the columns of the frame.
"""
iters = ray.get([_deploy_func.remote(
lambda df: list(df.items()), part)
for part in self._row_partitions])
col_iter = (self._col_metadata.partition_series(i).index
for i in range(len(self._col_partitions)))

def items_helper(part):
df = ray.get(part)
df.columns = next(col_iter)
df.index = self.index
return df.items()

def concat_iters(iterables):
for partitions in enumerate(zip(*iterables)):
series = pd.concat([_series for _, _series in partitions[1]])
series.index = self.index
series.name = list(self.columns)[partitions[0]]
yield (series.name, series)
partition_iterator = PartitionIterator(self._col_partitions,
items_helper)

return concat_iters(iters)
for v in partition_iterator:
yield v

def iteritems(self):
"""Iterator over (column name, Series) pairs.
Expand All @@ -2393,31 +2397,27 @@ def itertuples(self, index=True, name='Pandas'):
name (string, default "Pandas"): The name of the returned
namedtuples or None to return regular tuples.
Note:
Generators can't be pickeled so from the remote function
Generators can't be pickled so from the remote function
we expand the generator into a list before getting it.
This is not that ideal.

Returns:
A tuple representing row data. See args for varying tuples.
"""
iters = ray.get([
_deploy_func.remote(
lambda df: list(df.itertuples(index=index, name=name)),
part) for part in self._row_partitions])
iters = itertools.chain.from_iterable(iters)

def _replace_index(row_tuple, idx):
# We need to use try-except here because
# isinstance(row_tuple, namedtuple) won't work.
try:
row_tuple = row_tuple._replace(Index=idx)
except AttributeError: # Tuple not namedtuple
row_tuple = (idx,) + row_tuple[1:]
return row_tuple
index_iter = (self._row_metadata.partition_series(i).index
for i in range(len(self._row_partitions)))

if index:
iters = itertools.starmap(_replace_index, zip(iters, self.index))
return iters
def itertuples_helper(part):
df = ray.get(part)
df.columns = self.columns
df.index = next(index_iter)
return df.itertuples(index=index, name=name)

partition_iterator = PartitionIterator(self._row_partitions,
itertuples_helper)

for v in partition_iterator:
yield v

def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
sort=False):
Expand Down
29 changes: 29 additions & 0 deletions python/ray/dataframe/iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
try:
    from collections.abc import Iterator
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Iterator


class PartitionIterator(Iterator):
    """Iterator that chains the inner iterables built from each partition.

    Partitions are consumed one at a time: ``func`` is applied to the next
    partition only after the inner iterator obtained from the previous
    partition has been exhausted.
    """

    def __init__(self, partitions, func):
        """PartitionIterator class to define a generator on partitioned data

        Args:
            partitions ([ObjectID]): Partitions to iterate over
            func (callable): The function to get inner iterables from
                each partition
        """
        self.partitions = iter(partitions)
        self.func = func
        # Start with an exhausted iterator so the first call to ``next``
        # immediately pulls the first partition.
        self.iter_cache = iter([])

    def __iter__(self):
        """Return the iterator itself."""
        return self

    def __next__(self):
        """Python 3 iterator protocol; delegates to ``next``."""
        return self.next()

    def next(self):
        """Return the next item, advancing to later partitions as needed.

        Returns:
            The next item of the current inner iterator, or the first item
            of the next partition that yields anything.

        Raises:
            StopIteration: When every partition has been exhausted.
        """
        # Loop rather than recurse: a long run of empty partitions must not
        # grow the call stack.
        while True:
            try:
                return next(self.iter_cache)
            except StopIteration:
                pass
            # ``next`` on the partition iterator raises StopIteration once no
            # partitions remain, which is what ends the overall iteration.
            next_partition = next(self.partitions)
            # ``iter`` lets ``func`` return any iterable, not only an
            # iterator/generator.
            self.iter_cache = iter(self.func(next_partition))