2 changes: 1 addition & 1 deletion python/ray/dataframe/concat.py
@@ -50,7 +50,7 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
"Obj as dicts not implemented. To contribute to "
"Pandas on Ray, please visit github.com/ray-project/ray.")

axis = pandas.DataFrame()._get_axis_number(axis)
axis = DataFrame()._get_axis_number(axis)

if join not in ['inner', 'outer']:
raise ValueError("Only can inner (intersect) or outer (union) join the"
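Note: the one-line change above works because the Ray DataFrame now resolves axis arguments itself (see the helpers added in dataframe.py below). A minimal standalone sketch of that resolution, mirroring the _AXIS_* tables introduced in this PR; the function name resolve_axis is illustrative only, not part of the change:

_AXIS_ALIASES = {'rows': 0}
_AXIS_NUMBERS = {'index': 0, 'columns': 1}
_AXIS_NAMES = {0: 'index', 1: 'columns'}


def resolve_axis(axis):
    """Map 0, 1, 'index', 'columns' or the 'rows' alias to an axis number."""
    axis = _AXIS_ALIASES.get(axis, axis)
    if isinstance(axis, int):
        if axis in _AXIS_NAMES:
            return axis
    else:
        try:
            return _AXIS_NUMBERS[axis]
        except KeyError:
            pass
    raise ValueError("No axis named {0}".format(axis))


assert resolve_axis('columns') == 1
assert resolve_axis('rows') == 0
assert resolve_axis(0) == 0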
113 changes: 74 additions & 39 deletions python/ray/dataframe/dataframe.py
@@ -15,6 +15,7 @@
from pandas.core.dtypes.common import (
is_bool_dtype,
is_list_like,
is_integer,
is_numeric_dtype,
is_timedelta64_dtype,
_get_dtype_from_object)
@@ -167,6 +168,11 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
if self._dtypes_cache is None:
self._correct_dtypes()

axes = ['index', 'columns']
self._AXIS_ALIASES = {'rows': 0}
self._AXIS_NUMBERS = {a: i for i, a in enumerate(axes)}
self._AXIS_NAMES = dict(enumerate(axes))

def _get_row_partitions(self):
return [_blocks_to_row.remote(*part)
for part in self._block_partitions]
@@ -352,7 +358,7 @@ def _arithmetic_helper(self, remote_func, axis, level=None):
if level is not None:
raise NotImplementedError("Level not yet supported.")

axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
axis = self._get_axis_number(axis) if axis is not None \
else 0

oid_series = ray.get(_map_partitions(remote_func,
@@ -395,6 +401,36 @@ def _validate_eval_query(self, expr, **kwargs):
if 'parser' in kwargs and kwargs['parser'] == 'python':
raise NotImplementedError("'Not' nodes are not implemented.")

def _get_axis_number(self, axis):
axis = self._AXIS_ALIASES.get(axis, axis)
if is_integer(axis):
if axis in self._AXIS_NAMES:
return axis
else:
try:
return self._AXIS_NUMBERS[axis]
except KeyError:
pass
raise ValueError('No axis named {0} for object type {1}'
.format(axis, type(self)))

def _get_axis_name(self, axis):
axis = self._AXIS_ALIASES.get(axis, axis)
if isinstance(axis, string_types):
if axis in self._AXIS_NUMBERS:
return axis
else:
try:
return self._AXIS_NAMES[axis]
except KeyError:
pass
raise ValueError('No axis named {0} for object type {1}'
.format(axis, type(self)))

def _get_axis(self, axis):
name = self._get_axis_name(axis)
return getattr(self, name)

@property
def size(self):
"""Get the number of elements in the DataFrame.
@@ -628,7 +664,7 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
Returns:
A new DataFrame resulting from the groupby.
"""
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)
if callable(by):
by = by(self.index)
elif isinstance(by, compat.string_types):
@@ -782,7 +818,7 @@ def dropna(self, axis=0, how='any', thresh=None, subset=None,
inplace = validate_bool_kwarg(inplace, "inplace")

if is_list_like(axis):
axis = [pd.DataFrame()._get_axis_number(ax) for ax in axis]
axis = [self._get_axis_number(ax) for ax in axis]

result = self
# TODO(kunalgosar): this builds an intermediate dataframe,
@@ -799,24 +835,25 @@ def dropna(self, axis=0, how='any', thresh=None, subset=None,

return None

axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)
agg_axis = 1 - axis

if how is not None and how not in ['any', 'all']:
raise ValueError('invalid how option: %s' % how)
if how is None and thresh is None:
raise TypeError('must specify how or thresh')

indices = None
if subset is not None:
subset = set(subset)

if axis == 1:
subset = [item for item in self.index if item in subset]
else:
subset = [item for item in self.columns if item in subset]
ax = self._get_axis(agg_axis)
indices = ax.get_indexer_for(subset)
check = indices == -1
if check.any():
raise KeyError(list(np.compress(check, subset)))

def dropna_helper(df):
new_df = df.dropna(axis=axis, how=how, thresh=thresh,
subset=subset, inplace=False)
subset=indices, inplace=False)

if axis == 1:
new_index = new_df.columns
@@ -838,7 +875,7 @@ def dropna_helper(df):

# This flattens the 2d array to 1d
new_vals = [i for j in new_vals for i in j]
new_cols = self.columns[new_vals]
new_cols = self._get_axis(axis)[new_vals]

if not inplace:
return DataFrame(col_partitions=new_parts,
@@ -855,7 +892,7 @@ def dropna_helper(df):

# This flattens the 2d array to 1d
new_vals = [i for j in new_vals for i in j]
new_rows = self.index[new_vals]
new_rows = self._get_axis(axis)[new_vals]

if not inplace:
return DataFrame(row_partitions=new_parts,
@@ -888,7 +925,7 @@ def agg(self, func, axis=0, *args, **kwargs):
return self.aggregate(func, axis, *args, **kwargs)

def aggregate(self, func, axis=0, *args, **kwargs):
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

result = None

@@ -1007,7 +1044,7 @@ def agg_helper(df, arg, *args, **kwargs):
is_series = ray.get(is_series)
if all(is_series):
new_series = pd.concat(ray.get(new_parts))
new_series.index = self.columns if axis == 0 else self.index
new_series.index = self._get_axis(1 - axis)
return new_series
# This error is thrown when some of the partitions return Series and
# others return DataFrames. We do not allow mixed returns.
@@ -1128,7 +1165,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
Returns:
Series or DataFrame, depending on func.
"""
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

if isinstance(func, compat.string_types):
if axis == 1:
@@ -1225,10 +1262,10 @@ def astype(self, dtype, copy=True, errors='raise', **kwargs):
"Only a column name can be used for the key in"
"a dtype mappings argument.")
columns = list(dtype.keys())
col_idx = [(self.columns.get_loc(columns[i]), columns[i])
if columns[i] in self.columns
else (columns[i], columns[i])
for i in range(len(columns))]

col_idx = self.columns.get_indexer_for(columns)
col_idx = zip(col_idx, columns)

new_dict = {}
for idx, key in col_idx:
new_dict[idx] = dtype[key]
@@ -1376,7 +1413,7 @@ def cov(self, min_periods=None):
"github.com/ray-project/ray.")

def _cumulative_helper(self, func, axis):
axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
axis = self._get_axis_number(axis) if axis is not None \
else 0

if axis == 0:
@@ -1498,7 +1535,7 @@ def diff(self, periods=1, axis=0):
Returns:
DataFrame with the diff applied
"""
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)
partitions = (self._col_partitions if
axis == 0 else self._row_partitions)

@@ -1579,7 +1616,7 @@ def drop(self, labels=None, axis=0, index=None, columns=None, level=None,
if index is not None or columns is not None:
raise ValueError("Cannot specify both 'labels' and "
"'index'/'columns'")
axis = pd.DataFrame()._get_axis_name(axis)
axis = self._get_axis_name(axis)
axes = {axis: labels}
elif index is not None or columns is not None:
axes, _ = pd.DataFrame()._construct_axes_from_arguments((index,
@@ -1877,7 +1914,7 @@ def fillna(self, value=None, method=None, axis=None, inplace=False,

inplace = validate_bool_kwarg(inplace, 'inplace')

axis = pd.DataFrame()._get_axis_number(axis) \
axis = self._get_axis_number(axis) \
if axis is not None \
else 0

@@ -1973,8 +2010,8 @@ def filter(self, items=None, like=None, regex=None, axis=None):
if axis is None:
axis = 'columns' # This is the default info axis for dataframes

axis = pd.DataFrame()._get_axis_number(axis)
labels = self.columns if axis else self.index
axis = self._get_axis_number(axis)
labels = self._get_axis(axis)

if items is not None:
bool_arr = labels.isin(items)
@@ -2845,7 +2882,7 @@ def mode(self, axis=0, numeric_only=False):
Returns:
DataFrame: The mode of the DataFrame.
"""
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

def mode_helper(df):
mode_df = df.mode(axis=axis, numeric_only=numeric_only)
@@ -3155,7 +3192,7 @@ def quantile_helper(df, base_object):
return df.quantile(q=q, axis=axis, numeric_only=numeric_only,
interpolation=interpolation)

axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

if isinstance(q, (pd.Series, np.ndarray, pd.Index, list)):

@@ -3248,7 +3285,7 @@ def rank_helper(df):
na_option=na_option,
ascending=ascending, pct=pct)

axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

if (axis == 1):
new_cols = self.dtypes[self.dtypes.apply(
@@ -3557,7 +3594,7 @@ def sample(self, n=None, frac=None, replace=False, weights=None,
A new Dataframe
"""

axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
axis = self._get_axis_number(axis) if axis is not None \
else 0

if axis == 0:
@@ -3770,7 +3807,7 @@ def set_axis(self, labels, axis=0, inplace=None):
FutureWarning, stacklevel=2)
inplace = True
if inplace:
setattr(self, pd.DataFrame()._get_axis_name(axis), labels)
setattr(self, self._get_axis_name(axis), labels)
else:
obj = self.copy()
obj.set_axis(labels, axis=axis, inplace=True)
@@ -3919,7 +3956,7 @@ def sort_index(self, axis=0, level=None, ascending=True, inplace=False,
return self.sort_values(by, axis=axis, ascending=ascending,
inplace=inplace)

axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

args = (axis, level, ascending, False, kind, na_position,
sort_remaining)
@@ -3981,7 +4018,7 @@ def sort_values(self, by, axis=0, ascending=True, inplace=False,
A sorted DataFrame.
"""

axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

if not is_list_like(by):
by = [by]
@@ -4491,7 +4528,7 @@ def where(self, cond, other=np.nan, inplace=False, axis=None, level=None,
raise NotImplementedError("Multilevel Index not yet supported on "
"Pandas on Ray.")

axis = pd.DataFrame()._get_axis_number(axis) if axis is not None else 0
axis = self._get_axis_number(axis) if axis is not None else 0

cond = cond(self) if callable(cond) else cond

@@ -4635,9 +4672,7 @@ def _getitem_array(self, key):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = \
[i for i, item in enumerate(self.columns)
if item in set(columns)]
indices_for_rows = self.columns.get_indexer_for(columns)

new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
@@ -5103,7 +5138,7 @@ def _inter_df_op_helper(self, func, other, how, axis, level,
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

new_column_index = self.columns.join(other.columns, how=how)
new_index = self.index.join(other.index, how=how)
@@ -5131,7 +5166,7 @@ def _single_df_op_helper(self, func, other, axis, level):
if level is not None:
raise NotImplementedError("Multilevel index not yet supported "
"in Pandas on Ray")
axis = pd.DataFrame()._get_axis_number(axis)
axis = self._get_axis_number(axis)

if is_list_like(other):
new_index = self.index
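Note: several hunks above (astype, dropna, _getitem_array) replace per-label lookups and Python-level membership scans with a single Index.get_indexer_for call; the dropna path then hands the resulting positions to the per-block dropna and raises KeyError for labels that were not found. A plain-pandas sketch of that pattern (the variable names are illustrative, not from the PR):

import numpy as np
import pandas as pd

columns = pd.Index(list('ABCD'))

# One vectorized lookup instead of per-label get_loc calls.
positions = columns.get_indexer_for(['B', 'D'])
print(positions)                       # [1 3]

# Missing labels come back as -1; the new dropna code turns those into a
# KeyError that names exactly the labels that were not found.
subset = ['B', 'E']
indices = columns.get_indexer_for(subset)
check = indices == -1
if check.any():
    print('would raise KeyError:', list(np.compress(check, subset)))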
39 changes: 39 additions & 0 deletions python/ray/dataframe/test/test_dataframe.py
@@ -842,10 +842,15 @@ def test_dense_nan_df():
[np.nan, np.nan, np.nan, 5]],
columns=list('ABCD'))

column_subsets = [list('AD'), list('BC'), list('CD')]
row_subsets = [[0, 1], [0, 1, 2], [2, 0]]

test_dropna(ray_df, pd_df)
test_dropna_inplace(ray_df, pd_df)
test_dropna_multiple_axes(ray_df, pd_df)
test_dropna_multiple_axes_inplace(ray_df, pd_df)
test_dropna_subset(ray_df, pd_df, column_subsets, row_subsets)
test_dropna_subset_error(ray_df)


@pytest.fixture
@@ -1402,6 +1407,40 @@ def test_dropna_multiple_axes_inplace(ray_df, pd_df):
assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)


@pytest.fixture
def test_dropna_subset(ray_df, pd_df, column_subsets, row_subsets):
for subset in column_subsets:
assert ray_df_equals_pandas(
ray_df.dropna(how='all', subset=subset),
pd_df.dropna(how='all', subset=subset)
)

assert ray_df_equals_pandas(
ray_df.dropna(how='any', subset=subset),
pd_df.dropna(how='any', subset=subset)
)

for subset in row_subsets:
assert ray_df_equals_pandas(
ray_df.dropna(how='all', axis=1, subset=subset),
pd_df.dropna(how='all', axis=1, subset=subset)
)

assert ray_df_equals_pandas(
ray_df.dropna(how='any', axis=1, subset=subset),
pd_df.dropna(how='any', axis=1, subset=subset)
)


@pytest.fixture
def test_dropna_subset_error(ray_df):
with pytest.raises(KeyError):
ray_df.dropna(subset=list('EF'))

with pytest.raises(KeyError):
ray_df.dropna(axis=1, subset=[4, 5])


def test_duplicated():
ray_df = create_test_dataframe()

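Note: a short plain-pandas sketch of the user-facing behavior the new subset tests pin down; the tests assert that Pandas on Ray matches this:

import numpy as np
import pandas as pd

pd_df = pd.DataFrame([[np.nan, 2, np.nan, 0],
                      [3, 4, np.nan, 1],
                      [np.nan, np.nan, np.nan, 5]],
                     columns=list('ABCD'))

# axis=0 (default): subset names the columns to inspect for NaNs.
print(pd_df.dropna(how='any', subset=list('AD')))

# axis=1: subset names row labels instead.
print(pd_df.dropna(how='all', axis=1, subset=[0, 2]))

# Unknown labels raise KeyError, which test_dropna_subset_error now
# checks for the Ray implementation as well.
try:
    pd_df.dropna(subset=list('EF'))
except KeyError as exc:
    print('missing labels:', exc)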