Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 128 additions & 7 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas import compat
from pandas.compat import lzip, string_types, cPickle as pkl
from pandas.compat import lzip, to_str, string_types, cPickle as pkl
import pandas.core.common as com
from pandas.core.dtypes.common import (
is_bool_dtype,
Expand Down Expand Up @@ -756,7 +756,8 @@ def transpose(self, *args, **kwargs):

T = property(transpose)

def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
def dropna(self, axis=0, how='any', thresh=None, subset=None,
inplace=False):
"""Create a new DataFrame from the removed NA values from this one.

Args:
Expand All @@ -774,7 +775,89 @@ def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
If inplace is set to True, returns None, otherwise returns a new
DataFrame with the dropna applied.
"""
raise NotImplementedError("Not yet")
if is_list_like(axis):
result = self
# TODO(kunalgosar): this builds an intermediate dataframe,
# which does unnecessary computation
for ax in axis:
result = result.dropna(
axis=ax, how=how, thresh=thresh, subset=subset)
if not inplace:
return result

return self._update_inplace(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't return self._update_inplace, it will return None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the docs, when inplace=True, the function should return false. I made this a bit clearer in the code that this is intentional.

block_partitions=result._block_partitions,
columns=result.columns,
index=result.index
)

axis = pd.DataFrame()._get_axis_number(axis)
inplace = validate_bool_kwarg(inplace, "inplace")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should move this up higher, before the is_list_like block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


if how is not None and how not in ['any', 'all']:
raise ValueError('invalid how option: %s' % how)
if how is None and thresh is None:
raise TypeError('must specify how or thresh')

if subset is not None:
subset = set(subset)

if axis == 1:
subset = [item for item in self.index if item in subset]
else:
subset = [item for item in self.columns if item in subset]

def dropna_helper(df):
new_df = df.dropna(axis=axis, how=how, thresh=thresh,
subset=subset, inplace=False)

if axis == 1:
new_index = new_df.columns
new_df.columns = pd.RangeIndex(0, len(new_df.columns))
else:
new_index = new_df.index
new_df.reset_index(drop=True, inplace=True)

return new_df, new_index

parts = self._col_partitions if axis == 1 else self._row_partitions
result = [_deploy_func._submit(args=(dropna_helper, df),
num_return_vals=2) for df in parts]
new_parts, new_vals = [list(t) for t in zip(*result)]

if axis == 1:
new_vals = [self._col_metadata.get_global_indices(i, vals)
for i, vals in enumerate(ray.get(new_vals))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can build the columns in a remote task. It might be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I move this code to a remote task, it would require serializing self._col_metadata._lengths and self.columns, which could potentially be expensive. The code would later block on a ray.get on the new columns generated remotely.

Given there would be a blocking operation either way, I think this is fine for now but can be revisited.

Copy link
Member

@devin-petersohn devin-petersohn May 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where the blocking operation occurs does matter. Here the main thread will be waiting until the entire dropna operation is complete. This happens whether you need the columns immediately or not. We should block when the user needs it, not when the user calls an operation. Given this can be an iteratively executed operation, we should not block on each call.


# This flattens the 2d array to 1d
new_vals = [i for j in new_vals for i in j]
new_cols = self.columns[new_vals]

if not inplace:
return DataFrame(col_partitions=new_parts,
columns=new_cols,
index=self.index)

self._update_inplace(col_partitions=new_parts,
columns=new_cols,
index=self.index)

else:
new_vals = [self._row_metadata.get_global_indices(i, vals)
for i, vals in enumerate(ray.get(new_vals))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here with the index.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.


# This flattens the 2d array to 1d
new_vals = [i for j in new_vals for i in j]
new_rows = self.index[new_vals]

if not inplace:
return DataFrame(row_partitions=new_parts,
index=new_rows,
columns=self.columns)

self._update_inplace(row_partitions=new_parts,
index=new_rows,
columns=self.columns)

def add(self, other, axis='columns', level=None, fill_value=None):
"""Add this DataFrame to another or a scalar/list.
Expand Down Expand Up @@ -1772,9 +1855,45 @@ def fillna(self, value=None, method=None, axis=None, inplace=False,
return new_obj

def filter(self, items=None, like=None, regex=None, axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Subset rows or columns based on their labels

Args:
items (list): list of labels to subset
like (string): retain labels where `arg in label == True`
regex (string): retain labels matching regex input
axis: axis to filter on

Returns:
A new dataframe with the filter applied.
"""
nkw = com._count_not_none(items, like, regex)
if nkw > 1:
raise TypeError('Keyword arguments `items`, `like`, or `regex` '
'are mutually exclusive')
if nkw == 0:
raise TypeError('Must pass either `items`, `like`, or `regex`')

if axis is None:
axis = 'columns' # This is the default info axis for dataframes

axis = pd.DataFrame()._get_axis_number(axis)
labels = self.columns if axis else self.index

if items is not None:
bool_arr = labels.isin(items)
elif like is not None:
def f(x):
return like in to_str(x)
bool_arr = labels.map(f).tolist()
else:
def f(x):
return matcher.search(to_str(x)) is not None
matcher = re.compile(regex)
bool_arr = labels.map(f).tolist()

if not axis:
return self[bool_arr]
return self[self.columns[bool_arr]]

def first(self, offset):
raise NotImplementedError(
Expand Down Expand Up @@ -3957,7 +4076,9 @@ def _getitem_array(self, key):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = [col for col in self.col if col in set(columns)]
indices_for_rows = \
[i for i, item in enumerate(self.columns)
if item in set(columns)]

new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
Expand Down
7 changes: 7 additions & 0 deletions python/ray/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ def insert(self, key, loc=None, partition=None,
# Return inserted coordinate for callee
return coord_to_insert

def get_global_indices(self, partition, index_within_partition_list):
total = 0
for i in range(partition):
total += self._lengths[i]

return [total + i for i in index_within_partition_list]

def squeeze(self, partition, index_within_partition):
"""Prepare a single coordinate for removal by "squeezing" the
subsequent coordinates "up" one index within that partition. To be used
Expand Down
112 changes: 108 additions & 4 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ def test_int_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -350,6 +355,11 @@ def test_float_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -510,6 +520,11 @@ def test_mixed_dtype_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -670,6 +685,11 @@ def test_nan_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -806,6 +826,23 @@ def test_nan_dataframe():
test_transform(ray_df, pandas_df)


def test_dense_nan_df():
ray_df = rdf.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5]],
columns=list('ABCD'))

pd_df = pd.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5]],
columns=list('ABCD'))

test_dropna(ray_df, pd_df)
test_dropna_inplace(ray_df, pd_df)
test_dropna_multiple_axes(ray_df, pd_df)
test_dropna_multiple_axes_inplace(ray_df, pd_df)


@pytest.fixture
def test_inter_df_math(op, simple=False):
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
Expand Down Expand Up @@ -1260,6 +1297,68 @@ def test_drop_duplicates():
ray_df.drop_duplicates()


@pytest.fixture
def test_dropna(ray_df, pd_df):
assert ray_df_equals_pandas(ray_df.dropna(axis=1, how='all'),
pd_df.dropna(axis=1, how='all'))

assert ray_df_equals_pandas(ray_df.dropna(axis=1, how='any'),
pd_df.dropna(axis=1, how='any'))

assert ray_df_equals_pandas(ray_df.dropna(axis=0, how='all'),
pd_df.dropna(axis=0, how='all'))

assert ray_df_equals_pandas(ray_df.dropna(thresh=2),
pd_df.dropna(thresh=2))


@pytest.fixture
def test_dropna_inplace(ray_df, pd_df):
ray_df = ray_df.copy()
pd_df = pd_df.copy()

ray_df.dropna(thresh=2, inplace=True)
pd_df.dropna(thresh=2, inplace=True)

assert ray_df_equals_pandas(ray_df, pd_df)

ray_df.dropna(axis=1, how='any', inplace=True)
pd_df.dropna(axis=1, how='any', inplace=True)

assert ray_df_equals_pandas(ray_df, pd_df)


@pytest.fixture
def test_dropna_multiple_axes(ray_df, pd_df):
assert ray_df_equals_pandas(
ray_df.dropna(how='all', axis=[0, 1]),
pd_df.dropna(how='all', axis=[0, 1])
)
assert ray_df_equals_pandas(
ray_df.dropna(how='all', axis=(0, 1)),
pd_df.dropna(how='all', axis=(0, 1))
)


@pytest.fixture
def test_dropna_multiple_axes_inplace(ray_df, pd_df):
ray_df_copy = ray_df.copy()
pd_df_copy = pd_df.copy()

ray_df_copy.dropna(how='all', axis=[0, 1], inplace=True)
pd_df_copy.dropna(how='all', axis=[0, 1], inplace=True)

assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)

ray_df_copy = ray_df.copy()
pd_df_copy = pd_df.copy()

ray_df_copy.dropna(how='all', axis=(0, 1), inplace=True)
pd_df_copy.dropna(how='all', axis=(0, 1), inplace=True)

assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)


def test_duplicated():
ray_df = create_test_dataframe()

Expand Down Expand Up @@ -1755,11 +1854,16 @@ def test_fillna_datetime_columns(num_partitions=2):
"""


def test_filter():
ray_df = create_test_dataframe()
@pytest.fixture
def test_filter(ray_df, pandas_df, by):
ray_df_equals_pandas(ray_df.filter(items=by['items']),
pandas_df.filter(items=by['items']))

with pytest.raises(NotImplementedError):
ray_df.filter()
ray_df_equals_pandas(ray_df.filter(regex=by['regex']),
pandas_df.filter(regex=by['regex']))

ray_df_equals_pandas(ray_df.filter(like=by['like']),
pandas_df.filter(like=by['like']))


def test_first():
Expand Down