diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 0668c3569581..4bc0997c3b61 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -9,7 +9,7 @@ from pandas._libs import lib from pandas.core.dtypes.cast import maybe_upcast_putmask from pandas import compat -from pandas.compat import lzip, string_types, cPickle as pkl +from pandas.compat import lzip, to_str, string_types, cPickle as pkl import pandas.core.common as com from pandas.core.dtypes.common import ( is_bool_dtype, @@ -756,7 +756,8 @@ def transpose(self, *args, **kwargs): T = property(transpose) - def dropna(self, axis, how, thresh=None, subset=[], inplace=False): + def dropna(self, axis=0, how='any', thresh=None, subset=None, + inplace=False): """Create a new DataFrame from the removed NA values from this one. Args: @@ -774,7 +775,94 @@ def dropna(self, axis, how, thresh=None, subset=[], inplace=False): If inplace is set to True, returns None, otherwise returns a new DataFrame with the dropna applied. """ - raise NotImplementedError("Not yet") + inplace = validate_bool_kwarg(inplace, "inplace") + + if is_list_like(axis): + axis = [pd.DataFrame()._get_axis_number(ax) for ax in axis] + + result = self + # TODO(kunalgosar): this builds an intermediate dataframe, + # which does unnecessary computation + for ax in axis: + result = result.dropna( + axis=ax, how=how, thresh=thresh, subset=subset) + if not inplace: + return result + + self._update_inplace(block_partitions=result._block_partitions, + columns=result.columns, + index=result.index) + + return None + + axis = pd.DataFrame()._get_axis_number(axis) + + if how is not None and how not in ['any', 'all']: + raise ValueError('invalid how option: %s' % how) + if how is None and thresh is None: + raise TypeError('must specify how or thresh') + + if subset is not None: + subset = set(subset) + + if axis == 1: + subset = [item for item in self.index if item in subset] + else: + subset = [item for item in self.columns if item in subset] + + def dropna_helper(df): + new_df = df.dropna(axis=axis, how=how, thresh=thresh, + subset=subset, inplace=False) + + if axis == 1: + new_index = new_df.columns + new_df.columns = pd.RangeIndex(0, len(new_df.columns)) + else: + new_index = new_df.index + new_df.reset_index(drop=True, inplace=True) + + return new_df, new_index + + parts = self._col_partitions if axis == 1 else self._row_partitions + result = [_deploy_func._submit(args=(dropna_helper, df), + num_return_vals=2) for df in parts] + new_parts, new_vals = [list(t) for t in zip(*result)] + + if axis == 1: + new_vals = [self._col_metadata.get_global_indices(i, vals) + for i, vals in enumerate(ray.get(new_vals))] + + # This flattens the 2d array to 1d + new_vals = [i for j in new_vals for i in j] + new_cols = self.columns[new_vals] + + if not inplace: + return DataFrame(col_partitions=new_parts, + columns=new_cols, + index=self.index) + + self._update_inplace(col_partitions=new_parts, + columns=new_cols, + index=self.index) + + else: + new_vals = [self._row_metadata.get_global_indices(i, vals) + for i, vals in enumerate(ray.get(new_vals))] + + # This flattens the 2d array to 1d + new_vals = [i for j in new_vals for i in j] + new_rows = self.index[new_vals] + + if not inplace: + return DataFrame(row_partitions=new_parts, + index=new_rows, + columns=self.columns) + + self._update_inplace(row_partitions=new_parts, + index=new_rows, + columns=self.columns) + + return None def add(self, other, axis='columns', level=None, fill_value=None): """Add this DataFrame to another or a scalar/list. @@ -1772,9 +1860,45 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, return new_obj def filter(self, items=None, like=None, regex=None, axis=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Subset rows or columns based on their labels + + Args: + items (list): list of labels to subset + like (string): retain labels where `arg in label == True` + regex (string): retain labels matching regex input + axis: axis to filter on + + Returns: + A new dataframe with the filter applied. + """ + nkw = com._count_not_none(items, like, regex) + if nkw > 1: + raise TypeError('Keyword arguments `items`, `like`, or `regex` ' + 'are mutually exclusive') + if nkw == 0: + raise TypeError('Must pass either `items`, `like`, or `regex`') + + if axis is None: + axis = 'columns' # This is the default info axis for dataframes + + axis = pd.DataFrame()._get_axis_number(axis) + labels = self.columns if axis else self.index + + if items is not None: + bool_arr = labels.isin(items) + elif like is not None: + def f(x): + return like in to_str(x) + bool_arr = labels.map(f).tolist() + else: + def f(x): + return matcher.search(to_str(x)) is not None + matcher = re.compile(regex) + bool_arr = labels.map(f).tolist() + + if not axis: + return self[bool_arr] + return self[self.columns[bool_arr]] def first(self, offset): raise NotImplementedError( @@ -3957,7 +4081,9 @@ def _getitem_array(self, key): index=index) else: columns = self._col_metadata[key].index - indices_for_rows = [col for col in self.col if col in set(columns)] + indices_for_rows = \ + [i for i, item in enumerate(self.columns) + if item in set(columns)] new_parts = [_deploy_func.remote( lambda df: df.__getitem__(indices_for_rows), diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 63d96202ff9c..50df6a22a00d 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -271,6 +271,13 @@ def insert(self, key, loc=None, partition=None, # Return inserted coordinate for callee return coord_to_insert + def get_global_indices(self, partition, index_within_partition_list): + total = 0 + for i in range(partition): + total += self._lengths[i] + + return [total + i for i in index_within_partition_list] + def squeeze(self, partition, index_within_partition): """Prepare a single coordinate for removal by "squeezing" the subsequent coordinates "up" one index within that partition. To be used diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index b6d6414c6008..51698e392b35 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -190,6 +190,11 @@ def test_int_dataframe(): 'col3', 'col4'] + filter_by = {'items': ['col1', 'col5'], + 'regex': '4$|3$', + 'like': 'col'} + + test_filter(ray_df, pandas_df, filter_by) test_roundtrip(ray_df, pandas_df) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -350,6 +355,11 @@ def test_float_dataframe(): 'col3', 'col4'] + filter_by = {'items': ['col1', 'col5'], + 'regex': '4$|3$', + 'like': 'col'} + + test_filter(ray_df, pandas_df, filter_by) test_roundtrip(ray_df, pandas_df) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -510,6 +520,11 @@ def test_mixed_dtype_dataframe(): 'col3', 'col4'] + filter_by = {'items': ['col1', 'col5'], + 'regex': '4$|3$', + 'like': 'col'} + + test_filter(ray_df, pandas_df, filter_by) test_roundtrip(ray_df, pandas_df) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -670,6 +685,11 @@ def test_nan_dataframe(): 'col3', 'col4'] + filter_by = {'items': ['col1', 'col5'], + 'regex': '4$|3$', + 'like': 'col'} + + test_filter(ray_df, pandas_df, filter_by) test_roundtrip(ray_df, pandas_df) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -806,6 +826,23 @@ def test_nan_dataframe(): test_transform(ray_df, pandas_df) +def test_dense_nan_df(): + ray_df = rdf.DataFrame([[np.nan, 2, np.nan, 0], + [3, 4, np.nan, 1], + [np.nan, np.nan, np.nan, 5]], + columns=list('ABCD')) + + pd_df = pd.DataFrame([[np.nan, 2, np.nan, 0], + [3, 4, np.nan, 1], + [np.nan, np.nan, np.nan, 5]], + columns=list('ABCD')) + + test_dropna(ray_df, pd_df) + test_dropna_inplace(ray_df, pd_df) + test_dropna_multiple_axes(ray_df, pd_df) + test_dropna_multiple_axes_inplace(ray_df, pd_df) + + @pytest.fixture def test_inter_df_math(op, simple=False): ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], @@ -1260,6 +1297,68 @@ def test_drop_duplicates(): ray_df.drop_duplicates() +@pytest.fixture +def test_dropna(ray_df, pd_df): + assert ray_df_equals_pandas(ray_df.dropna(axis=1, how='all'), + pd_df.dropna(axis=1, how='all')) + + assert ray_df_equals_pandas(ray_df.dropna(axis=1, how='any'), + pd_df.dropna(axis=1, how='any')) + + assert ray_df_equals_pandas(ray_df.dropna(axis=0, how='all'), + pd_df.dropna(axis=0, how='all')) + + assert ray_df_equals_pandas(ray_df.dropna(thresh=2), + pd_df.dropna(thresh=2)) + + +@pytest.fixture +def test_dropna_inplace(ray_df, pd_df): + ray_df = ray_df.copy() + pd_df = pd_df.copy() + + ray_df.dropna(thresh=2, inplace=True) + pd_df.dropna(thresh=2, inplace=True) + + assert ray_df_equals_pandas(ray_df, pd_df) + + ray_df.dropna(axis=1, how='any', inplace=True) + pd_df.dropna(axis=1, how='any', inplace=True) + + assert ray_df_equals_pandas(ray_df, pd_df) + + +@pytest.fixture +def test_dropna_multiple_axes(ray_df, pd_df): + assert ray_df_equals_pandas( + ray_df.dropna(how='all', axis=[0, 1]), + pd_df.dropna(how='all', axis=[0, 1]) + ) + assert ray_df_equals_pandas( + ray_df.dropna(how='all', axis=(0, 1)), + pd_df.dropna(how='all', axis=(0, 1)) + ) + + +@pytest.fixture +def test_dropna_multiple_axes_inplace(ray_df, pd_df): + ray_df_copy = ray_df.copy() + pd_df_copy = pd_df.copy() + + ray_df_copy.dropna(how='all', axis=[0, 1], inplace=True) + pd_df_copy.dropna(how='all', axis=[0, 1], inplace=True) + + assert ray_df_equals_pandas(ray_df_copy, pd_df_copy) + + ray_df_copy = ray_df.copy() + pd_df_copy = pd_df.copy() + + ray_df_copy.dropna(how='all', axis=(0, 1), inplace=True) + pd_df_copy.dropna(how='all', axis=(0, 1), inplace=True) + + assert ray_df_equals_pandas(ray_df_copy, pd_df_copy) + + def test_duplicated(): ray_df = create_test_dataframe() @@ -1755,11 +1854,16 @@ def test_fillna_datetime_columns(num_partitions=2): """ -def test_filter(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_filter(ray_df, pandas_df, by): + ray_df_equals_pandas(ray_df.filter(items=by['items']), + pandas_df.filter(items=by['items'])) - with pytest.raises(NotImplementedError): - ray_df.filter() + ray_df_equals_pandas(ray_df.filter(regex=by['regex']), + pandas_df.filter(regex=by['regex'])) + + ray_df_equals_pandas(ray_df.filter(like=by['like']), + pandas_df.filter(like=by['like'])) def test_first():