Skip to content
85 changes: 75 additions & 10 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
is_bool_dtype,
is_list_like,
is_numeric_dtype,
is_timedelta64_dtype)
is_timedelta64_dtype,
_get_dtype_from_object)
from pandas.core.indexing import check_bool_indexer

import warnings
Expand Down Expand Up @@ -977,9 +978,42 @@ def assign(self, **kwargs):
"github.com/ray-project/ray.")

def astype(self, dtype, copy=True, errors='raise', **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if isinstance(dtype, dict):
if (not set(dtype.keys()).issubset(set(self.columns)) and
errors == 'raise'):
raise KeyError(
"Only a column name can be used for the key in"
"a dtype mappings argument.")
columns = list(dtype.keys())
col_idx = [(self.columns.get_loc(columns[i]), columns[i])
if columns[i] in self.columns
else (columns[i], columns[i])
for i in range(len(columns))]
new_dict = {}
for idx, key in col_idx:
new_dict[idx] = dtype[key]
new_rows = _map_partitions(lambda df, dt: df.astype(dtype=dt,
copy=True,
errors=errors,
**kwargs),
self._row_partitions, new_dict)
if copy:
return DataFrame(row_partitions=new_rows,
columns=self.columns,
index=self.index)
self._row_partitions = new_rows
else:
new_blocks = [_map_partitions(lambda d: d.astype(dtype=dtype,
copy=True,
errors=errors,
**kwargs),
block)
for block in self._block_partitions]
if copy:
return DataFrame(block_partitions=new_blocks,
columns=self.columns,
index=self.index)
self._block_partitions = new_blocks

def at_time(self, time, asof=False):
raise NotImplementedError(
Expand Down Expand Up @@ -2688,9 +2722,33 @@ def select(self, crit, axis=0):
"github.com/ray-project/ray.")

def select_dtypes(self, include=None, exclude=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
# Validates arguments for whether both include and exclude are None or
# if they are disjoint. Also invalidates string dtypes.
pd.DataFrame().select_dtypes(include, exclude)

if include and not is_list_like(include):
include = [include]
elif not include:
include = []

if exclude and not is_list_like(exclude):
exclude = [exclude]
elif not exclude:
exclude = []

sel = tuple(map(set, (include, exclude)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code (and below) is somewhat hard to read, and can be wrapped up into the if-else cases above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code using map in particular?


include, exclude = map(
lambda x: set(map(_get_dtype_from_object, x)), sel)

dtypes = self.dtypes
indicate = [i for i in range(len(dtypes))
if ((len(exclude) != 0 and any(map(
lambda x: issubclass(dtypes[i].type, x), exclude)))
or (len(include) != 0 and not any(map(
lambda x: issubclass(dtypes[i].type, x), include))))]

return self.drop(columns=self.columns[indicate], inplace=False)

def sem(self, axis=None, skipna=None, level=None, ddof=1,
numeric_only=None, **kwargs):
Expand Down Expand Up @@ -3317,9 +3375,16 @@ def __getattr__(self, key):
raise e

def __setitem__(self, key, value):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if not isinstance(key, str):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if key not in self.columns:
self.insert(loc=len(self.columns), column=key, value=value)
else:
loc = self.columns.get_loc(key)
self.__delitem__(key)
self.insert(loc=loc, column=key, value=value)

def __len__(self):
"""Gets the length of the dataframe.
Expand Down
49 changes: 41 additions & 8 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,28 @@ def test_assign():


def test_astype():
ray_df = create_test_dataframe()
td = TestData()
ray_df_frame = from_pandas(td.frame, 2)
our_df_casted = ray_df_frame.astype(np.int32)
expected_df_casted = pd.DataFrame(td.frame.values.astype(np.int32),
index=td.frame.index,
columns=td.frame.columns)

with pytest.raises(NotImplementedError):
ray_df.astype(None)
assert(ray_df_equals_pandas(our_df_casted, expected_df_casted))

our_df_casted = ray_df_frame.astype(np.float64)
expected_df_casted = pd.DataFrame(td.frame.values.astype(np.float64),
index=td.frame.index,
columns=td.frame.columns)

assert(ray_df_equals_pandas(our_df_casted, expected_df_casted))

our_df_casted = ray_df_frame.astype(str)
expected_df_casted = pd.DataFrame(td.frame.values.astype(str),
index=td.frame.index,
columns=td.frame.columns)

assert(ray_df_equals_pandas(our_df_casted, expected_df_casted))


def test_at_time():
Expand Down Expand Up @@ -2150,7 +2168,7 @@ def test_reindex():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.reindex()
ray_df.reindex(None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a sanity check test here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking through the code, was reindex dropped from this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, reindex did not have full functionality (axes, errors) so I took it out for the PR against master

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this line.



def test_reindex_axis():
Expand Down Expand Up @@ -2524,10 +2542,25 @@ def test_select():


def test_select_dtypes():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.select_dtypes()
df = pd.DataFrame({'test1': list('abc'),
'test2': np.arange(3, 6).astype('u1'),
'test3': np.arange(8.0, 11.0, dtype='float64'),
'test4': [True, False, True],
'test5': pd.date_range('now', periods=3).values,
'test6': list(range(5, 8))})
include = np.float, 'integer'
exclude = np.bool_,
rd = from_pandas(df, 2)
r = rd.select_dtypes(include=include, exclude=exclude)

e = df[["test2", "test3", "test6"]]
assert(ray_df_equals_pandas(r, e))

try:
rdf.DataFrame().select_dtypes()
assert(False)
except ValueError:
assert(True)


def test_sem():
Expand Down