Closed
29 commits
a28e092  Add parquet-cpp to gitignore (simon-mo, Feb 20, 2018)
9678cc4  Add read_csv and read_parquet (simon-mo, Feb 20, 2018)
184b544  Gitignore pytest_cache (simon-mo, Feb 20, 2018)
91a5d7b  Fix flake8 (simon-mo, Feb 20, 2018)
a6f27e6  Add io to __init__ (simon-mo, Feb 20, 2018)
9734308  Merge branch 'master' of https://github.com/ray-project/ray into df_io (simon-mo, Feb 21, 2018)
bba597c  Changing Index. Currently running tests, but so far untested. (devin-petersohn, Feb 22, 2018)
82f2bd7  Removing issue of reassigning DF in from_pandas (devin-petersohn, Feb 22, 2018)
48dc6fc  Fixing lint (devin-petersohn, Feb 22, 2018)
bf42a26  Fix bug (devin-petersohn, Feb 22, 2018)
f0b2069  Fix bug (devin-petersohn, Feb 22, 2018)
52e269a  Fix bug (devin-petersohn, Feb 22, 2018)
ac717b2  Better performance (devin-petersohn, Feb 22, 2018)
7efea34  Fixing index issue with sum (devin-petersohn, Feb 23, 2018)
15a5271  Address comments (simon-mo, Feb 23, 2018)
ef80f9f  Merge branch 'df_patch03' of https://github.com/devin-petersohn/ray i… (simon-mo, Feb 23, 2018)
32dbd9e  Update io with index (simon-mo, Feb 23, 2018)
c1d781e  Updating performance and implementation. Adding tests (devin-petersohn, Feb 24, 2018)
f5ee1bd  Fixing off-by-1 (devin-petersohn, Feb 24, 2018)
aa9aca8  Fix lint (devin-petersohn, Feb 24, 2018)
a268c40  Address Comments (simon-mo, Feb 24, 2018)
3f1d958  Merge branch 'df_patch03' of https://github.com/devin-petersohn/ray i… (simon-mo, Feb 24, 2018)
223f3ed  Make pop compatible with new to_pandas (simon-mo, Feb 24, 2018)
5e583e6  Format Code (simon-mo, Feb 24, 2018)
20fffcc  Merge branch 'master' of https://github.com/ray-project/ray into df_io (simon-mo, Feb 25, 2018)
634dba4  Cleanup some index issue (simon-mo, Feb 26, 2018)
9b69cfc  Bug fix: assigned reset_index back (simon-mo, Feb 26, 2018)
631e418  Merge branch 'master' into df_io (simon-mo, Feb 26, 2018)
d19514e  Remove unused debug line (simon-mo, Feb 26, 2018)
4 changes: 4 additions & 0 deletions .gitignore
@@ -12,6 +12,7 @@
 /src/thirdparty/boost_1_60_0/
 /src/thirdparty/catapult/
 /src/thirdparty/flatbuffers/
+/src/thirdparty/parquet-cpp
 
 # Files generated by flatc should be ignored
 /src/common/format/*.py
@@ -137,3 +138,6 @@ build
 /site/Gemfile.lock
 /site/.sass-cache
 /site/_site
+
+# Pytest Cache
+**/.pytest_cache
28 changes: 23 additions & 5 deletions python/ray/dataframe/__init__.py
@@ -2,9 +2,27 @@
 from __future__ import division
 from __future__ import print_function
 
-from .dataframe import DataFrame
-from .dataframe import from_pandas
-from .dataframe import to_pandas
-from .series import Series
+DEFAULT_NPARTITIONS = 10
 
-__all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"]
+
+def set_npartition_default(n):
+    global DEFAULT_NPARTITIONS
+    DEFAULT_NPARTITIONS = n
+
+
+def get_npartitions():
+    return DEFAULT_NPARTITIONS
+
+
+# We import these files after the two functions above
+# because they depend on npartitions.
+from .dataframe import DataFrame  # noqa: 402
+from .dataframe import from_pandas  # noqa: 402
+from .dataframe import to_pandas  # noqa: 402
+from .series import Series  # noqa: 402
+from .io import (read_csv, read_parquet)  # noqa: 402
+
+__all__ = [
+    "DataFrame", "from_pandas", "to_pandas", "Series", "read_csv",
+    "read_parquet"
+]
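The module-level default makes the partition count configurable, and the submodule imports are deferred below the two accessor functions because those modules read the value at import time. A minimal usage sketch, assuming this branch is installed; the CSV path is a placeholder:

```python
import ray
import ray.dataframe as rdf

ray.init()

# Override the default of 10 before building any distributed frames.
rdf.set_npartition_default(16)
assert rdf.get_npartitions() == 16

# read_csv/read_parquet (added in this PR) partition by this value.
df = rdf.read_csv("example.csv")  # "example.csv" is a placeholder path
```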
59 changes: 47 additions & 12 deletions python/ray/dataframe/dataframe.py
Expand Up @@ -373,16 +373,29 @@ def transpose(self, *args, **kwargs):
temp_index = [idx
for _ in range(len(self._df))
for idx in self.columns]

temp_columns = self.index
local_transpose = self._map_partitions(
lambda df: df.transpose(*args, **kwargs), index=temp_index)
local_transpose.columns = temp_columns

# Sum will collapse the NAs from the groupby
return local_transpose.reduce_by_index(
df = local_transpose.reduce_by_index(
lambda df: df.apply(lambda x: x), axis=1)

# Reassign the columns within partition to self.index.
# We have to use _depoly_func instead of _map_partition due to
# new_labels argument
def _reassign_columns(df, new_labels):
df.columns = new_labels
return df
df._df = [
_deploy_func.remote(
_reassign_columns,
part,
self.index) for part in df._df]

return df

T = property(transpose)

def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
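The `_reassign_columns` helper exists because `_map_partitions` maps a one-argument function over partitions, while this step also needs the `new_labels` argument. A minimal sketch of the same deploy-a-two-argument-function pattern using a plain Ray remote; `deploy` here stands in for the diff's `_deploy_func`, and the tiny partitions are illustrative:

```python
import pandas as pd
import ray

ray.init()

@ray.remote
def deploy(func, partition, arg):
    # Ship the function together with its extra argument to a worker.
    return func(partition, arg)

def reassign_columns(df, new_labels):
    df.columns = new_labels
    return df

parts = [pd.DataFrame([[1, 2]]), pd.DataFrame([[3, 4]])]
new_parts = ray.get(
    [deploy.remote(reassign_columns, p, ['x', 'y']) for p in parts])
print(new_parts[0].columns.tolist())  # ['x', 'y']
```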
@@ -563,9 +576,15 @@ def count(self, axis=0, level=None, numeric_only=False):
                       for _ in range(len(self._df))
                       for idx in self.columns]
 
-        return sum(ray.get(self._map_partitions(lambda df: df.count(
-            axis=axis, level=level, numeric_only=numeric_only
-        ), index=temp_index)._df))
+        collapsed_df = sum(
+            ray.get(
+                self._map_partitions(
+                    lambda df: df.count(
+                        axis=axis,
+                        level=level,
+                        numeric_only=numeric_only),
+                    index=temp_index)._df))
+        return collapsed_df
 
     def cov(self, min_periods=None):
         raise NotImplementedError("Not Yet implemented.")
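`count` materializes the per-partition counts and folds them with Python's built-in `sum`, which works because adding pandas Series aligns entries by label. A small pandas-only sketch of that collapse step:

```python
import pandas as pd

# Per-partition results of df.count(): one Series of counts per block.
part_counts = [pd.Series({'a': 2, 'b': 1}), pd.Series({'a': 3, 'b': 3})]

# sum() computes 0 + Series + Series; addition aligns on the label index.
total = sum(part_counts)
print(total)  # a: 5, b: 4
```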
@@ -865,7 +884,9 @@ def iterrows(self):
         iters = ray.get([
             _deploy_func.remote(
                 lambda df: list(df.iterrows()), part) for part in self._df])
-        return itertools.chain.from_iterable(iters)
+        iters = itertools.chain.from_iterable(iters)
+        series = map(lambda idx_series_tuple: idx_series_tuple[1], iters)
+        return zip(self.index, series)
 
     def items(self):
         """Iterator over (column name, Series) pairs.
@@ -884,6 +905,7 @@ def items(self):
         def concat_iters(iterables):
             for partitions in zip(*iterables):
                 series = pd.concat([_series for _, _series in partitions])
+                series.index = self.index
                 yield (series.name, series)
 
         return concat_iters(iters)
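Both `iterrows` and `items` now discard the partitions' local `pd.RangeIndex` and substitute the frame's global index. The trick is the same in both spots: chain the per-partition iterators, drop the local keys, and pair the values with the global labels. A standard-library sketch of the idea, with string placeholders for rows:

```python
import itertools

# Each partition yields (local_index, row) pairs; the local index
# restarts at 0 in every partition.
partition_iters = [
    iter([(0, 'row0'), (1, 'row1')]),  # partition 0
    iter([(0, 'row2')]),               # partition 1
]
global_index = ['a', 'b', 'c']

chained = itertools.chain.from_iterable(partition_iters)
rows = (row for _local_idx, row in chained)
print(list(zip(global_index, rows)))
# [('a', 'row0'), ('b', 'row1'), ('c', 'row2')]
```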
@@ -919,7 +941,20 @@ def itertuples(self, index=True, name='Pandas'):
             _deploy_func.remote(
                 lambda df: list(df.itertuples(index=index, name=name)),
                 part) for part in self._df])
-        return itertools.chain.from_iterable(iters)
+        iters = itertools.chain.from_iterable(iters)
+
+        def _replace_index(row_tuple, idx):
+            # We need to use try-except here because
+            # isinstance(row_tuple, namedtuple) won't work.
+            try:
+                row_tuple = row_tuple._replace(Index=idx)
+            except AttributeError:  # Tuple not namedtuple
+                row_tuple = (idx,) + row_tuple[1:]
+            return row_tuple
+
+        if index:
+            iters = itertools.starmap(_replace_index, zip(iters, self.index))
+        return iters
 
     def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
              sort=False):
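`itertuples` rewrites the `Index` field of each yielded row. An `isinstance` check is impossible because `namedtuple` is a class factory rather than a type, so the code duck-types instead: try `_replace`, and fall back to tuple slicing for the plain tuples pandas yields when `name=None`. A standalone sketch of that fallback:

```python
import itertools
from collections import namedtuple

Row = namedtuple('Pandas', ['Index', 'a'])

def replace_index(row, idx):
    try:
        return row._replace(Index=idx)  # namedtuple path
    except AttributeError:              # plain tuple (itertuples(name=None))
        return (idx,) + row[1:]

rows = [Row(Index=0, a=10), (1, 20)]    # mixed kinds, for illustration
print(list(itertools.starmap(replace_index, zip(rows, ['x', 'y']))))
# [Pandas(Index='x', a=10), ('y', 20)]
```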
@@ -1100,8 +1135,7 @@ def pop(self, item):
         popped = to_pandas(self._map_partitions(
             lambda df: df.pop(item)))
         self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df
-        self.columns = [col for col in self.columns if col != item]
-
+        self.columns = self.columns.drop(item)
         return popped
 
     def pow(self, other, axis='columns', level=None, fill_value=None):
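The `pop` fix swaps a list comprehension for `pd.Index.drop`, which keeps `self.columns` a proper `pd.Index` instead of silently degrading it to a plain list. A quick comparison of the two:

```python
import pandas as pd

cols = pd.Index(['a', 'b', 'c'])

as_list = [c for c in cols if c != 'b']  # plain list; Index methods are lost
as_index = cols.drop('b')                # Index(['a', 'c'], dtype='object')

print(type(as_list).__name__, type(as_index).__name__)  # list Index
```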
@@ -1949,13 +1983,14 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True):
     while len(temp_df) > chunksize:
         t_df = temp_df[:chunksize]
         lengths.append(len(t_df))
-        # reindex here because we want a pd.RangeIndex within the partitions.
-        # It is smaller and sometimes faster.
-        t_df.reindex()
+        # reset_index here because we want a pd.RangeIndex
+        # within the partitions. It is smaller and sometimes faster.
+        t_df = t_df.reset_index(drop=True)
         top = ray.put(t_df)
         dataframes.append(top)
         temp_df = temp_df[chunksize:]
     else:
+        temp_df = temp_df.reset_index(drop=True)
         dataframes.append(ray.put(temp_df))
         lengths.append(len(temp_df))

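`from_pandas` now calls `reset_index(drop=True)` on every chunk; the old `t_df.reindex()` was effectively a no-op whose result was never assigned. Each partition thus stores a compact `pd.RangeIndex` while the global index is tracked by the wrapper. A pandas-only sketch of the chunking step, with illustrative data:

```python
import pandas as pd

df = pd.DataFrame({'a': range(5)}, index=['v', 'w', 'x', 'y', 'z'])
chunksize = 2

global_index = df.index  # kept separately by the distributed frame
chunks = [df[i:i + chunksize].reset_index(drop=True)
          for i in range(0, len(df), chunksize)]

print(chunks[1].index)  # RangeIndex(start=0, stop=2, step=1)
```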