Skip to content
107 changes: 93 additions & 14 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
is_bool_dtype,
is_list_like,
is_numeric_dtype,
is_timedelta64_dtype)
is_timedelta64_dtype,
_get_dtype_from_object)
from pandas.core.indexing import check_bool_indexer

import warnings
Expand Down Expand Up @@ -977,9 +978,34 @@ def assign(self, **kwargs):
"github.com/ray-project/ray.")

def astype(self, dtype, copy=True, errors='raise', **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if errors == 'raise':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Errors should not be ignored when errors == 'raise'.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kunalgosar What do you mean by your comment?

Copy link
Contributor

@kunalgosar kunalgosar Apr 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When errors == 'raise', these exceptions should be raised. When errors == 'ignore', they should be suppressed and self should be returned, as per the pandas documentation. Currently, the exceptions are suppressed inside this `if errors == 'raise'` branch.

try:
pd.DataFrame().astype(dtype)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check doesn't work on empty dataframes

except (ValueError, TypeError):
return self
if isinstance(dtype, dict):
new_rows = _map_partitions(lambda df: df.astype(dtype=dtype,
copy=True,
errors='ignore',
**kwargs),
self._row_partitions)
if copy:
return DataFrame(row_partitions=new_rows,
columns=self.columns,
index=self.index)
self._row_partitions = new_rows
else:
new_blocks = [_map_partitions(lambda d: d.astype(dtype=dtype,
copy=True,
errors='ignore',
**kwargs),
block)
for block in self._block_partitions]
if copy:
return DataFrame(block_partitions=new_blocks,
columns=self.columns,
index=self.index)
self._block_partitions = new_blocks

def at_time(self, time, asof=False):
raise NotImplementedError(
Expand Down Expand Up @@ -2418,9 +2444,31 @@ def rdiv(self, other, axis='columns', level=None, fill_value=None):
def reindex(self, labels=None, index=None, columns=None, axis=None,
method=None, copy=True, level=None, fill_value=np.nan,
limit=None, tolerance=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if not columns:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever columns is None, this will return an empty dataframe.

return DataFrame()
col_idx = [self.columns.get_loc(columns[i])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to handle cases where some of the passed in values are not in self.columns. Pandas fills this case with NaNs.

for i in range(len(columns))
if columns[i] in self.columns]

if not copy:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

axis = pd.DataFrame()._get_axis_number(axis) if (axis) else 0
if axis == 1 or columns:
def row_helper(df, col_idx):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the partitions currently have RangeIndex on both rows and columns, this function won't work. Refer to utils._reindex_helper and ask @devin-petersohn for details.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to get around that by translating the requested columns into their positional indices (col_idx, looked up with get_loc) and passing those to the helper instead of the column labels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this implementation of reindex for a future PR

df = df.reindex(columns=col_idx, copy=True)
return df
new_rows = _map_partitions(row_helper,
self._row_partitions, col_idx)
return DataFrame(row_partitions=new_rows,
columns=columns,
index=self.index)
else:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

def reindex_axis(self, labels, axis=0, method=None, level=None, copy=True,
limit=None, fill_value=np.nan):
Expand Down Expand Up @@ -2688,9 +2736,33 @@ def select(self, crit, axis=0):
"github.com/ray-project/ray.")

def select_dtypes(self, include=None, exclude=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
# Validates arguments for whether both include and exclude are None or
# if they are disjoint. Also invalidates string dtypes.
pd.DataFrame().select_dtypes(include, exclude)

if include and not is_list_like(include):
include = [include]
elif not include:
include = []

if exclude and not is_list_like(exclude):
exclude = [exclude]
elif not exclude:
exclude = []

sel = tuple(map(set, (include, exclude)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code (and below) is somewhat hard to read, and can be wrapped up into the if-else cases above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code using map in particular?


include, exclude = map(
lambda x: set(map(_get_dtype_from_object, x)), sel)

dtypes = self.dtypes
indicate = [i for i in range(len(dtypes))
if ((len(exclude) != 0 and any(map(
lambda x: issubclass(dtypes[i].type, x), exclude)))
or (len(include) != 0 and not any(map(
lambda x: issubclass(dtypes[i].type, x), include))))]

return self.drop(columns=self.columns[indicate], inplace=False)

def sem(self, axis=None, skipna=None, level=None, ddof=1,
numeric_only=None, **kwargs):
Expand Down Expand Up @@ -3260,7 +3332,7 @@ def _getitem_array(self, key):
new_parts = _map_partitions(lambda df: df[key],
self._col_partitions)
columns = self.columns
index = self.index[key]
index = self._col_metadata[key].index
Copy link
Contributor

@kunalgosar kunalgosar Apr 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change?


return DataFrame(col_partitions=new_parts,
columns=columns,
Expand Down Expand Up @@ -3317,9 +3389,16 @@ def __getattr__(self, key):
raise e

def __setitem__(self, key, value):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if not isinstance(key, str):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if key not in self.columns:
self.insert(loc=len(self.columns), column=key, value=value)
else:
loc = self.columns.get_loc(key)
self.__delitem__(key)
self.insert(loc=loc, column=key, value=value)

def __len__(self):
"""Gets the length of the dataframe.
Expand Down
58 changes: 47 additions & 11 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,28 @@ def test_assign():


def test_astype():
ray_df = create_test_dataframe()
td = TestData()
ray_df_frame = from_pandas(td.frame, 2)
our_df_casted = ray_df_frame.astype(np.int32)
expected_df_casted = pd.DataFrame(td.frame.values.astype(np.int32),
index=td.frame.index,
columns=td.frame.columns)

with pytest.raises(NotImplementedError):
ray_df.astype(None)
assert(ray_df_equals_pandas(our_df_casted, expected_df_casted))

our_df_casted = ray_df_frame.astype(np.float64)
expected_df_casted = pd.DataFrame(td.frame.values.astype(np.float64),
index=td.frame.index,
columns=td.frame.columns)

assert(ray_df_equals_pandas(our_df_casted, expected_df_casted))

our_df_casted = ray_df_frame.astype(str)
expected_df_casted = pd.DataFrame(td.frame.values.astype(str),
index=td.frame.index,
columns=td.frame.columns)

assert(ray_df_equals_pandas(our_df_casted, expected_df_casted))


def test_at_time():
Expand Down Expand Up @@ -2147,10 +2165,13 @@ def test_rdiv():


def test_reindex():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.reindex()
trd = rdf.DataFrame({'a': list('abc'),
'b': list(range(1, 4)),
'c': np.arange(3, 6).astype('u1'),
'd': np.arange(4.0, 7.0, dtype='float64'),
'e': [True, False, True],
'f': pd.date_range('now', periods=3).values})
assert(trd.reindex(columns=['a', 'b'])['b'].equals(trd['b']))


def test_reindex_axis():
Expand Down Expand Up @@ -2524,10 +2545,25 @@ def test_select():


def test_select_dtypes():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.select_dtypes()
df = pd.DataFrame({'test1': list('abc'),
'test2': np.arange(3, 6).astype('u1'),
'test3': np.arange(8.0, 11.0, dtype='float64'),
'test4': [True, False, True],
'test5': pd.date_range('now', periods=3).values,
'test6': list(range(5, 8))})
include = np.float, 'integer'
exclude = np.bool_,
rd = from_pandas(df, 2)
r = rd.select_dtypes(include=include, exclude=exclude)

e = df[["test2", "test3", "test6"]]
assert(ray_df_equals_pandas(r, e))

try:
rdf.DataFrame().select_dtypes()
assert(False)
except ValueError:
assert(True)


def test_sem():
Expand Down